Elasticsearch py

1 建立连接

1.1 无用户密码

from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'10.10.13.12','port':9200}])

1.2 有用户密码

from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'10.10.13.12','port':9200}], http_auth=('xiao', '123456'), timeout=3600)

2 添加记录

doc = {}
doc["xx"] = "xx"
# 需要存储为 UTC 时间
doc["timestamp"] = datetime.utcnow()
es_res = es.index(index=index.lower(), id=es_id, document=doc)

返回

{
    u'_type': u'_doc',                                # 文档所在的类型名
    u'_seq_no': 43,
    u'_shards': {                                     # _shards表示索引操作的复制过程的信息。
        u'successful': 1,                             # 表示索引操作成功的分片副本数。
        u'failed': 0,                                 # 在副本分片上索引操作失败的情况下包含复制相关错误。
        u'total': 2                                   # 指示应在其上执行索引操作的分片副本(主分片和副本分片)的数量。
    },
    u'_index': u'vm-unit_2024-03-18',                 # 文档所在的索引名
    u'_version': 7,                                   # 文档的版本
    u'_primary_term': 3,
    u'result': u'updated',
    u'_id': u'c1db1115-3cc3-4602-a2a9-8c04aa94ed7f'   # 本例子使用的 host uuid
}

2.1 字段说明

2.1.1 _seq_no (顺序号)

严格递增的顺序号,每个文档一个,Shard级别严格递增,保证后写入的Doc的_seq_no大于先写入的Doc的_seq_no。

任何类型的写操作,包括index、create、update和Delete,都会生成一个_seq_no。

2.1.2 _version (版本)

Elasticsearch中每个Doc都会有一个Version,该Version可以由用户指定,也可以由系统自动生成。如果是系统自动生成,那么每次Version都是递增1。

2.1.3 _primary_term (primary 编号)

_primary_term 也和 _seq_no 一样是一个整数,每当 Primary Shard 发生重新分配时,比如重启,Primary 选举等,_primary_term 会递增 1。

2.1.4 _id

Doc 的主键,在写入的时候,可以指定该Doc的ID值,如果不指定,则系统自动生成一个唯一的 UUID值。

3 检索数据

es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')

常用参数

  • index - 索引名

  • q - 查询指定匹配 使用 Lucene 查询语法

  • from_ - 查询起始点 默认0

  • doc_type - 文档类型

  • size - 指定查询条数 默认10

  • field - 指定字段 逗号分隔

  • sort - 排序 字段:asc/desc

  • body - 使用Query DSL

  • scroll - 滚动查询

返回

{
    "hits": {
        "hits": [
            {
                "_score": 1.0,
                "_type": "_doc",
                "_id": "ede22935-b9a4-4460-966c-20ba8b992721",
                "_source": {
                    "timestamp": "2022-08-15T17:15:14.775904",
                    "free_mem_size_in_mb": 44683,
                    "free_cpu_in_core": 1,
                    "host_ip": "xx.xx.xx.xx"
                },
                "_index": "sandbox_xxx_azone-xxx"
            },
            ...
        ],
        "total": {
            "relation": "eq",
            "value": 7
        },
        "max_score": 1.0
    },
    "_shards": {
        "successful": 1,
        "failed": 0,
        "skipped": 0,
        "total": 1
    },
    "took": 101,
    "timed_out": false
}

3.1 字段说明

3.1.1 _source (原文)

Elasticsearch 中有一个重要的概念是 source,存储原始文档,也可以通过过滤设置只存储特定Field。

3.1.2 其他字段

  1. · took代表搜索执行时间(单位:毫秒);

  2. · total代表本次搜索命中的文档数量;

  3. · max_score为最大得分,代表文档匹配度;

  4. · hits为搜索命中的结果列表,默认为10条。

3.2 Query DSL

3.2.1 查询所有数据

# 方式1:
es.search(index="index_name")

# 方式2:
body = {
    "query":{
        "match_all":{}
    }
}
es.search(index="index_name", body=body)

3.2.2 等于查询,term与terms

# term: 查询 xx = “xx”
body = {
    "query":{
        "term":{
            "name":"python"
        }
    }
}
# 查询name="python"的所有数据
es.search(index="index_name", body=body)

# terms: 查询 xx = “xx” 或 xx = “yy”
body = {
    "query":{
        "terms":{
            "name":["ios","android"]
        }
    }
}

# 查询出name="ios"或name="android"的所有数据
es.search(index="index_name", body=body)

3.2.4 包含查询,match与multi_match

# match: 匹配name包含"python"关键字的数据
body = {
    "query":{
        "match":{
            "name":"python"
        }
    }
}
# 查询name包含python关键字的数据
es.search(index="index_name",body=body)

# multi_match: 在name和addr里匹配包含深圳关键字的数据
body = {
    "query":{
        "multi_match":{
            "query":"深圳",
            "fields":["name", "addr"]
        }
    }
}
# 查询name和addr包含"深圳"关键字的数据
es.search(index="index_name",body=body)

3.2.4 ids

body = {
    "query":{
        "ids":{
            "values":[
                "1","2"
            ]
        }
    }
}
# 搜索出id为1或2的所有数据
es.search(index="index_name",body=body)

3.2.5 复合查询bool

bool有3类查询关系,must(都满足),should(其中一个满足),must_not(都不满足)

body = {
    "query":{
        "bool":{
            "must":[
                {
                    "term":{
                        "name":"python"
                    }
                },
                {
                    "term":{
                        "age":18
                    }
                }
            ]
        }
    }
}
# 获取name="python"并且age=18的所有数据
es.search(index="index_name",body=body)

3.2.6 切片式查询

body = {
    "query":{
        "match_all":{}
    }
    "from":2    # 从第二条数据开始
    "size":4    # 获取4条数据
}
# 从第2条数据开始,获取4条数据
es.search(index="index_name",body=body)

3.2.7 范围查询

body = {
    "query":{
        "range":{
            "age":{
                "gte":18,       # >=18
                "lte":30        # <=30
            }
        }
    }
}
# 查询18<=age<=30的所有数据
es.search(index="index_name",body=body)

3.2.8 前缀查询

body = {
    "query":{
        "prefix":{
            "name":"p"
        }
    }
}

# 查询前缀为"赵"的所有数据
es.search(index="index_name",body=body)

3.2.9 通配符查询

body = {
    "query":{
        "wildcard":{
            "name":"*id"
        }
    }
}
# 查询name以id为后缀的所有数据
es.search(index="index_name",body=body)

3.2.10 排序

body = {
    "query":{
        "match_all":{}
    }
    "sort":{
        "age":{                 # 根据age字段升序排序
            "order":"asc"       # asc升序,desc降序
        }
    }
}

# 多字段排序,注意顺序!写在前面的优先排序
body = {
    "query":{
        "match_all":{}
    }
    "sort":[{
        "age":{                # 先根据age字段升序排序
            "order":"asc"      # asc升序,desc降序
        }
    },{
        "name":{               # 后根据name字段升序排序
            "order":"asc"      # asc升序,desc降序
        }
    }],
}

3.2.11 filter_path, 响应过滤

# 只需要获取_id数据,多个条件用逗号隔开
es.search(index="index_name",filter_path=["hits.hits._id"])

# 获取所有数据
es.search(index="index_name",filter_path=["hits.hits._*"])

3.2.12 count, 执行查询并获取该查询的匹配数

# 获取数据量
es.count(index="index_name")

3.2.13 度量类聚合

获取最小值

body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "min_age":{                 # 最小值的key
            "min":{                 # 最小
                "field":"age"       # 查询"age"的最小值
            }
        }
    }
}
# 搜索所有数据,并获取age最小的值
es.search(index="index_name",body=body)

获取最大值

body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "max_age":{                 # 最大值的key
            "max":{                 # 最大
                "field":"age"       # 查询"age"的最大值
            }
        }
    }
}
# 搜索所有数据,并获取age最大的值
es.search(index="index_name",body=body)

获取和

body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "sum_age":{                 # 和的key
            "sum":{                 # 和
                "field":"age"       # 获取所有age的和
            }
        }
    }
}
# 搜索所有数据,并获取所有age的和
es.search(index="index_name",body=body)

例子

body = {
    "query":{
        "bool":{
            "must":[
                {
                    "term":{
                        "env_type": "online"
                    }
                }
            ]
        }
    },
    "aggs":{
        "cpu_alloc_sum":{
            "sum":{
                "field":"allocated_cpu_in_core"
            }
        },
        "cpu_total_sum":{
            "sum":{
                "field":"total_cpu_in_core"
            }
        },
        "mem_alloc_sum":{
            "sum":{
                "field":"allocated_mem_size_in_mb"
            }
        },
        "mem_total_sum":{
            "sum":{
                "field":"total_mem_size_in_mb"
            }
        },
        "disk_alloc_sum":{
            "sum":{
                "field":"allocated_disk_size_in_gb"
            }
        },
        "disk_total_sum":{
            "sum":{
                "field":"total_disk_size_in_gb"
            }
        },
    }
}
res = es.search(index=index, body=body, size=0)
"""
res = {
    "hits": {
        "hits": [],
        "total": {
            "relation": "eq",
            "value": 536
        },
        "max_score": null
    },
    "_shards": {
        "successful": 1,
        "failed": 0,
        "skipped": 0,
        "total": 1
    },
    "took": 2,
    "aggregations": {
        "disk_total_sum": {
            "value": 241200.0
        },
        "mem_alloc_sum": {
            "value": 17860717.0
        },
        "cpu_alloc_sum": {
            "value": 2209.0
        },
        "cpu_total_sum": {
            "value": 4288.0
        },
        "disk_alloc_sum": {
            "value": 110868.0
        },
        "mem_total_sum": {
            "value": 28944000.0
        }
    },
    "timed_out": false
}
"""

获取平均值

body = {
    "query":{
        "match_all":{}
    },
    "aggs":{                        # 聚合查询
        "avg_age":{                 # 平均值的key
            "sum":{                 # 平均值
                "field":"age"       # 获取所有age的平均值
            }
        }
    }
}
# 搜索所有数据,获取所有age的平均值
es.search(index="index_name",body=body)

3.2.14 from、size

from:从“第几条”开始查询
size:查询多少条
body = {
    "query":{
        "match_all":{}
    },
    "size":"50",
    "from":"0"
}

3.2.15 指定查询的返回结果

指定需要返回的字段,类似sql语句的

select order_id,status,created_at from 表 where uid=469 order by cteated_at desc limit10
{
    "_source":["order_id","status","created_at"],
    "sort":{
        "created_at":"desc"
    },
    "from":0,
    "size":10,
	"query": {
		"term": {
			"uid":"469"
		}
	}
}

3.3 注意事项

3.3.1 数值范围查询 range

range 中有 4 个字段:(g 是 greater 的缩写,t 是 than 的缩写,l 是 less 的缩写,e 是 equal 的缩写)可以单个或者多个一起使用

  • gt : 大于 >

  • lt : < 小于

  • gte : <= 大于等于

  • lte :<= 小于等于

3.3.2 should 和 terms 的区别

should 可以连接多个不同的字段

{
    "size":0,
    "track_total_hits":true,
    "query":{
     "bool":{
                "should":[
                    {"match":{"invoice.invoice_type":"026"}},
                    {"match":{"buyer.title":"高灯"}}
                ]
            }
    }
}

terms只能连接一个字段的多个值

{
    "size":0,
    "track_total_hits":true,
    "query":{
        "terms":{
            "red_status":[2,5,8]
        }
    }
}

3.3.3 时间范围

(1) 方式一:

GET website/_search
{
    "query": {
        "range":{
            "post_date": {
            	"gte": "now-1d/d",	// 当前时间的上一天, 四舍五入到最近的一天
            	"lt":  "now/d"		// 当前时间, 四舍五入到最近的一天
        	}
        }
    }
}

Elasticsearch 中时间可以表示为now, 也就是系统当前时间, 也可以是以||结尾的日期字符串表示.

在日期之后, 可以选择一个或多个数学表达式:

  • +1h —— 加1小时;

  • -1d —— 减1天;

  • /d —— 四舍五入到最近的一天.

下面是Elasticsearch支持数学表达式的时间单位:

表达式
含义
表达式
含义

y

M

w

星期

d

h

小时

H

小时

m

分钟

s

说明: 假设系统当前时间now = 2018-10-01 12:00:00 :

  • now+1h: now的毫秒值 + 1小时, 结果是: 2018-10-01 13:00:00.

  • now-1h: now的毫秒值 - 1小时, 结果是: 2018-10-01 11:00:00.

  • now-1h/d: now的毫秒值 - 1小时, 然后四舍五入到最近的一天的起始, 结果是: 2018-10-01 00:00:00.

  • 2018.10.01||+1M/d: 2018-10-01的毫秒值 + 1月, 再四舍五入到最近一天的起始, 结果是: 2018-11-01 00:00:00

方式二:

2022-08-18T01:18:01

4 强制刷新

es.indices.refresh(index="test-index")

我们知道 ES 的索引数据是写入到磁盘上的。但这个过程是分阶段实现的,因为 IO 的操作是比较费时的。

当一个文档进入 ES 的初期, 文档是被存储到内存里的,默认经过 1s 之后, 会被写入文件系统缓存,这样该文档就可以被搜索到了,注意,此时该索引数据被没有最终写入到磁盘上。如果你对这 1s 的时间间隔还不满意, 调用 _refresh 就可以立即实现内存->文件系统缓存, 从而使文档可以立即被搜索到。

所以 refresh 实现的是文档数据从内存到文件系统缓存的过程。

Last updated