欢迎来到cool的博客
7

Music box

Click to Start

点击头像播放音乐
新博客链接

Golang查询Elasticsearch

本文主要讲 golang 连接 es 的常用操作

1.下载依赖库

$ go get github.com/olivere/elastic
操作代码如下:

es.go

package es
 
import (
    "context"
    "fmt"
    "github.com/olivere/elastic"
    "strconv"
    "time"
)
 
var host = []string{
    "http://10.42.0.121:9200/",
    "http://10.42.0.122:9200/",
    "http://10.42.0.123:9200/",
}
 
var client *elastic.Client
 
//初始化
func init() {
    var err error
    client, err = elastic.NewClient(elastic.SetURL(host...))
    if err != nil {
        fmt.Printf("create client failed, err: %v", err)
    }
}
 
//ping 连接测试
func PingNode() {
    start := time.Now()
 
    info, code, err := client.Ping(host[0]).Do(context.Background())
    if err != nil {
        fmt.Printf("ping es failed, err: %v", err)
    }
 
    duration := time.Since(start)
    fmt.Printf("cost time: %v\n", duration)
    fmt.Printf("Elasticsearch returned with code %d and version %s\n", code, info.Version.Number)
}
 
//校验 index 是否存在
func IndexExists(index ...string) bool {
    exists, err := client.IndexExists(index...).Do(context.Background())
    if err != nil {
        fmt.Printf("%v\n", err)
    }
    return exists
}
 
//创建 index
func CreateIndex(index, mapping string) bool {
    result, err := client.CreateIndex(index).BodyString(mapping).Do(context.Background())
    if err != nil {
        fmt.Printf("create index failed, err: %v\n", err)
    }
    return result.Acknowledged
}
 
//删除 index
func DelIndex(index... string) bool {
    response, err := client.DeleteIndex(index...).Do(context.Background())
    if err != nil {
        fmt.Printf("delete index failed, err: %v\n", err)
    }
    return response.Acknowledged
}
 
//批量插入
func Batch(index string, type_ string, datas... interface{})  {
 
    bulkRequest := client.Bulk()
    for i, data := range datas {
        doc := elastic.NewBulkIndexRequest().Index(index).Type(type_).Id(strconv.Itoa(i)).Doc(data)
        bulkRequest = bulkRequest.Add(doc)
    }
 
    response, err := bulkRequest.Do(context.TODO())
    if err != nil {
        panic(err)
    }
    failed := response.Failed()
    iter := len(failed)
    fmt.Printf("error: %v, %v\n", response.Errors,  iter)
}
 
//获取指定 Id 的文档
func GetDoc(index, id string) []byte {
    temp := client.Get().Index(index).Id(id)
    get, err := temp.Do(context.Background())
    if err != nil {
        panic(err)
    }
    if get.Found {
        fmt.Printf("Got document %s in version %d from index %s, type %s\n", get.Id, get.Version, get.Index, get.Type)
    }
    source, err := get.Source.MarshalJSON()
    if err != nil {
        fmt.Printf("byte convert string failed, err: %v", err)
    }
    return source
}
 
//term 查询
func TermQuery(index, type_, fieldName, fieldValue string) *elastic.SearchResult {
    query := elastic.NewTermQuery(fieldName, fieldValue)
    //_ = elastic.NewQueryStringQuery(fieldValue) //关键字查询
 
    searchResult, err := client.Search().
        Index(index).Type(type_).
        Query(query).
        From(0).Size(10).
        Pretty(true).
        Do(context.Background())
 
    if err != nil {
        panic(err)
    }
    fmt.Printf("query cost %d millisecond.\n", searchResult.TookInMillis)
 
    return searchResult
}
 
func Search(index, type_ string) *elastic.SearchResult {
    boolQuery := elastic.NewBoolQuery()
    boolQuery.Must(elastic.NewMatchQuery("user", "Jame10"))
    boolQuery.Filter(elastic.NewRangeQuery("age").Gt("30"))
    searchResult, err := client.Search(index).
        Type(type_).Query(boolQuery).Pretty(true).Do(context.Background())
    if err != nil {
        panic(err)
    }
 
    return searchResult
}
 
func AggsSearch(index, type_ string) {
 
    minAgg := elastic.NewMinAggregation().Field("age")
    rangeAgg := elastic.NewRangeAggregation().Field("age").AddRange(0,30).AddRange(30,60).Gt(60)
 
 
    build := client.Search(index).Type(type_).Pretty(true)
 
    minResult, err := build.Aggregation("minAgg", minAgg).Do(context.Background())
    rangeResult, err := build.Aggregation("rangeAgg", rangeAgg).Do(context.Background())
    if err != nil {
        panic(err)
    }
 
    minAggRes, _ := minResult.Aggregations.Min("minAgg")
    fmt.Printf("min: %v\n", *minAggRes.Value)
 
    rangeAggRes, _ := rangeResult.Aggregations.Range("rangeAgg")
    for _, item := range rangeAggRes.Buckets {
        fmt.Printf("key: %s, value: %v\n", item.Key, item.DocCount)
    }
    
}
 
测试类 es_test.go

package es
 
import (
    "encoding/json"
    "fmt"
    "github.com/olivere/elastic"
    "reflect"
    "testing"
    "time"
)
 
type Tweet struct {
    User     string                `json:"user"`
    Age      int                   `json:"age"`
    Message  string                `json:"message"`
    Retweets int                   `json:"retweets"`
    Image    string                `json:"image,omitempty"`
    Created  time.Time             `json:"created,omitempty"`
    Tags     []string              `json:"tags,omitempty"`
    Location string                `json:"location,omitempty"`
    Suggest  *elastic.SuggestField `json:"suggest_field,omitempty"`
}
 
var mapping = `{
    "settings":{
        "number_of_shards": 3,
        "number_of_replicas": 1
    },
    "mappings":{
        "doc":{
            "properties":{
                "user":{
                    "type":"keyword"
                },
                "age":{
                    "type": "integer"
                },
                "message":{
                    "type":"text",
                    "store": true,
                    "fielddata": true
                },
                "image":{
                    "type":"keyword"
                },
                "created":{
                    "type":"date"
                },
                "tags":{
                    "type":"keyword"
                },
                "location":{
                    "type":"geo_point"
                },
                "suggest_field":{
                    "type":"completion"
                }
            }
        }
    }
}`
 
func TestPingNode(t *testing.T) {
    PingNode()
}
 
func TestIndexExists(t *testing.T) {
    result := IndexExists("car_source", "test")
    fmt.Println("all index exists: ", result)
}
 
func TestDeleteIndex(t *testing.T) {
    result := DelIndex("twitter")
    fmt.Println("all index deleted: ", result)
}
 
func TestCreateIndex(t *testing.T) {
    result := CreateIndex("twitter", mapping)
    fmt.Println("mapping created: ", result)
}
 
func TestBatch(t *testing.T) {
    tweet1 := Tweet{User: "Jame1",Age: 23, Message: "Take One", Retweets: 1, Created: time.Now()}
    tweet2 := Tweet{User: "Jame2",Age: 32, Message: "Take Two", Retweets: 0, Created: time.Now()}
    tweet3 := Tweet{User: "Jame3",Age: 32, Message: "Take Three", Retweets: 0, Created: time.Now()}
    Batch("twitter", "doc", tweet1, tweet2, tweet3)
}
 
func TestGetDoc(t *testing.T) {
    var tweet Tweet
    data := GetDoc("twitter", "1")
    if err := json.Unmarshal(data, &tweet); err == nil {
        fmt.Printf("data: %v\n", tweet)
    }
}
 
func TestTermQuery(t *testing.T) {
    var tweet Tweet
    result := TermQuery("twitter", "doc", "user", "Take Two")
    //获得数据, 方法一
    for _, item := range result.Each(reflect.TypeOf(tweet)) {
        if t, ok := item.(Tweet); ok {
            fmt.Printf("tweet : %v\n", t)
        }
    }
    //获得数据, 方法二
    fmt.Println("num of raws: ", result.Hits.TotalHits)
    if result.Hits.TotalHits > 0 {
        for _, hit := range result.Hits.Hits {
            err := json.Unmarshal(*hit.Source, &tweet)
            if err != nil {
                fmt.Printf("source convert json failed, err: %v\n", err)
            }
            fmt.Printf("data: %v\n", tweet)
        }
    }
}
 
func TestSearch(t *testing.T) {
    result := Search("twitter", "doc")
    var tweet Tweet
    for _, item := range result.Each(reflect.TypeOf(tweet)) {
        if t, ok := item.(Tweet); ok {
            fmt.Printf("tweet : %v\n", t)
        }
    }
}
 
func TestAggsSearch(t *testing.T) {
    AggsSearch("twitter", "doc")
}
————————————————
原文链接:https://blog.csdn.net/qq_21835703/article/details/87364904

返回列表