1 建立连接
1.1 无用户密码
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'10.10.13.12','port':9200}])
1.2 有用户密码
from elasticsearch import Elasticsearch
es = Elasticsearch([{'host':'10.10.13.12','port':9200}], http_auth=('xiao', '123456'), timeout=3600)
2 添加记录
doc = {}
doc["xx"] = "xx"
# 需要存储为 UTC 时间
doc["timestamp"] = datetime.utcnow()
es_res = es.index(index=index.lower(), id=es_id, document=doc)
返回
{
u'_type': u'_doc', # 文档所在的类型名
u'_seq_no': 43,
u'_shards': { # _shards表示索引操作的复制过程的信息。
u'successful': 1, # 表示索引操作成功的分片副本数。
u'failed': 0, # 在副本分片上索引操作失败的情况下包含复制相关错误。
u'total': 2 # 指示应在其上执行索引操作的分片副本(主分片和副本分片)的数量。
},
u'_index': u'vm-unit_2024-03-18', # 文档所在的索引名
u'_version': 7, # 文档的版本
u'_primary_term': 3,
u'result': u'updated',
u'_id': u'c1db1115-3cc3-4602-a2a9-8c04aa94ed7f' # 本例子使用的 host uuid
}
2.1 字段说明
2.1.1 _seq_no (顺序号)
严格递增的顺序号,每个文档一个,Shard级别严格递增,保证后写入的Doc的_seq_no大于先写入的Doc的_seq_no。
任何类型的写操作,包括index、create、update和Delete,都会生成一个_seq_no。
2.1.2 _version (版本)
Elasticsearch中每个Doc都会有一个Version,该Version可以由用户指定,也可以由系统自动生成。如果是系统自动生成,那么每次Version都是递增1。
2.1.3 _primary_term (primary 编号)
_primary_term 也和 _seq_no 一样是一个整数,每当 Primary Shard 发生重新分配时,比如重启,Primary 选举等,_primary_term 会递增 1。
2.1.4 _id
Doc 的主键,在写入的时候,可以指定该Doc的ID值,如果不指定,则系统自动生成一个唯一的 UUID值。
3 检索数据
es.search(index='logstash-2015.08.20', q='http_status_code:5* AND server_name:"web1"', from_='124119')
常用参数
q - 查询指定匹配 使用 Lucene 查询语法
返回
{
"hits": {
"hits": [
{
"_score": 1.0,
"_type": "_doc",
"_id": "ede22935-b9a4-4460-966c-20ba8b992721",
"_source": {
"timestamp": "2022-08-15T17:15:14.775904",
"free_mem_size_in_mb": 44683,
"free_cpu_in_core": 1,
"host_ip": "xx.xx.xx.xx"
},
"_index": "sandbox_xxx_azone-xxx"
},
...
],
"total": {
"relation": "eq",
"value": 7
},
"max_score": 1.0
},
"_shards": {
"successful": 1,
"failed": 0,
"skipped": 0,
"total": 1
},
"took": 101,
"timed_out": false
}
3.1 字段说明
3.1.1 _source (原文)
Elasticsearch 中有一个重要的概念是 source,存储原始文档,也可以通过过滤设置只存储特定Field。
3.1.2 其他字段
· max_score为最大得分,代表文档匹配度;
3.2 Query DSL
3.2.1 查询所有数据
# 方式1:
es.search(index="index_name")
# 方式2:
body = {
"query":{
"match_all":{}
}
}
es.search(index="index_name", body=body)
3.2.2 等于查询,term与terms
# term: 查询 xx = “xx”
body = {
"query":{
"term":{
"name":"python"
}
}
}
# 查询name="python"的所有数据
es.search(index="index_name", body=body)
# terms: 查询 xx = “xx” 或 xx = “yy”
body = {
"query":{
"terms":{
"name":["ios","android"]
}
}
}
# 查询出name="ios"或name="android"的所有数据
es.search(index="index_name", body=body)
3.2.4 包含查询,match与multi_match
# match: 匹配name包含"python"关键字的数据
body = {
"query":{
"match":{
"name":"python"
}
}
}
# 查询name包含python关键字的数据
es.search(index="index_name",body=body)
# multi_match: 在name和addr里匹配包含深圳关键字的数据
body = {
"query":{
"multi_match":{
"query":"深圳",
"fields":["name", "addr"]
}
}
}
# 查询name和addr包含"深圳"关键字的数据
es.search(index="index_name",body=body)
3.2.4 ids
body = {
"query":{
"ids":{
"values":[
"1","2"
]
}
}
}
# 搜索出id为1或2的所有数据
es.search(index="index_name",body=body)
3.2.5 复合查询bool
bool有3类查询关系,must(都满足),should(其中一个满足),must_not(都不满足)
body = {
"query":{
"bool":{
"must":[
{
"term":{
"name":"python"
}
},
{
"term":{
"age":18
}
}
]
}
}
}
# 获取name="python"并且age=18的所有数据
es.search(index="index_name",body=body)
3.2.6 切片式查询
body = {
"query":{
"match_all":{}
}
"from":2 # 从第二条数据开始
"size":4 # 获取4条数据
}
# 从第2条数据开始,获取4条数据
es.search(index="index_name",body=body)
3.2.7 范围查询
body = {
"query":{
"range":{
"age":{
"gte":18, # >=18
"lte":30 # <=30
}
}
}
}
# 查询18<=age<=30的所有数据
es.search(index="index_name",body=body)
3.2.8 前缀查询
body = {
"query":{
"prefix":{
"name":"p"
}
}
}
# 查询前缀为"赵"的所有数据
es.search(index="index_name",body=body)
3.2.9 通配符查询
body = {
"query":{
"wildcard":{
"name":"*id"
}
}
}
# 查询name以id为后缀的所有数据
es.search(index="index_name",body=body)
3.2.10 排序
body = {
"query":{
"match_all":{}
}
"sort":{
"age":{ # 根据age字段升序排序
"order":"asc" # asc升序,desc降序
}
}
}
# 多字段排序,注意顺序!写在前面的优先排序
body = {
"query":{
"match_all":{}
}
"sort":[{
"age":{ # 先根据age字段升序排序
"order":"asc" # asc升序,desc降序
}
},{
"name":{ # 后根据name字段升序排序
"order":"asc" # asc升序,desc降序
}
}],
}
3.2.11 filter_path, 响应过滤
# 只需要获取_id数据,多个条件用逗号隔开
es.search(index="index_name",filter_path=["hits.hits._id"])
# 获取所有数据
es.search(index="index_name",filter_path=["hits.hits._*"])
3.2.12 count, 执行查询并获取该查询的匹配数
# 获取数据量
es.count(index="index_name")
3.2.13 度量类聚合
获取最小值
body = {
"query":{
"match_all":{}
},
"aggs":{ # 聚合查询
"min_age":{ # 最小值的key
"min":{ # 最小
"field":"age" # 查询"age"的最小值
}
}
}
}
# 搜索所有数据,并获取age最小的值
es.search(index="index_name",body=body)
获取最大值
body = {
"query":{
"match_all":{}
},
"aggs":{ # 聚合查询
"max_age":{ # 最大值的key
"max":{ # 最大
"field":"age" # 查询"age"的最大值
}
}
}
}
# 搜索所有数据,并获取age最大的值
es.search(index="index_name",body=body)
获取和
body = {
"query":{
"match_all":{}
},
"aggs":{ # 聚合查询
"sum_age":{ # 和的key
"sum":{ # 和
"field":"age" # 获取所有age的和
}
}
}
}
# 搜索所有数据,并获取所有age的和
es.search(index="index_name",body=body)
例子
body = {
"query":{
"bool":{
"must":[
{
"term":{
"env_type": "online"
}
}
]
}
},
"aggs":{
"cpu_alloc_sum":{
"sum":{
"field":"allocated_cpu_in_core"
}
},
"cpu_total_sum":{
"sum":{
"field":"total_cpu_in_core"
}
},
"mem_alloc_sum":{
"sum":{
"field":"allocated_mem_size_in_mb"
}
},
"mem_total_sum":{
"sum":{
"field":"total_mem_size_in_mb"
}
},
"disk_alloc_sum":{
"sum":{
"field":"allocated_disk_size_in_gb"
}
},
"disk_total_sum":{
"sum":{
"field":"total_disk_size_in_gb"
}
},
}
}
res = es.search(index=index, body=body, size=0)
"""
res = {
"hits": {
"hits": [],
"total": {
"relation": "eq",
"value": 536
},
"max_score": null
},
"_shards": {
"successful": 1,
"failed": 0,
"skipped": 0,
"total": 1
},
"took": 2,
"aggregations": {
"disk_total_sum": {
"value": 241200.0
},
"mem_alloc_sum": {
"value": 17860717.0
},
"cpu_alloc_sum": {
"value": 2209.0
},
"cpu_total_sum": {
"value": 4288.0
},
"disk_alloc_sum": {
"value": 110868.0
},
"mem_total_sum": {
"value": 28944000.0
}
},
"timed_out": false
}
"""
获取平均值
body = {
"query":{
"match_all":{}
},
"aggs":{ # 聚合查询
"avg_age":{ # 平均值的key
"sum":{ # 平均值
"field":"age" # 获取所有age的平均值
}
}
}
}
# 搜索所有数据,获取所有age的平均值
es.search(index="index_name",body=body)
3.2.14 from、size
from:从“第几条”开始查询
size:查询多少条
body = {
"query":{
"match_all":{}
},
"size":"50",
"from":"0"
}
3.2.15 指定查询的返回结果
指定需要返回的字段,类似sql语句的
select order_id,status,created_at from 表 where uid=469 order by cteated_at desc limit10
{
"_source":["order_id","status","created_at"],
"sort":{
"created_at":"desc"
},
"from":0,
"size":10,
"query": {
"term": {
"uid":"469"
}
}
}
3.3 注意事项
3.3.1 数值范围查询 range
range 中有 4 个字段:(g 是 greater 的缩写,t 是 than 的缩写,l 是 less 的缩写,e 是 equal 的缩写)可以单个或者多个一起使用
3.3.2 should 和 terms 的区别
should 可以连接多个不同的字段
{
"size":0,
"track_total_hits":true,
"query":{
"bool":{
"should":[
{"match":{"invoice.invoice_type":"026"}},
{"match":{"buyer.title":"高灯"}}
]
}
}
}
terms只能连接一个字段的多个值
{
"size":0,
"track_total_hits":true,
"query":{
"terms":{
"red_status":[2,5,8]
}
}
}
3.3.3 时间范围
(1) 方式一:
GET website/_search
{
"query": {
"range":{
"post_date": {
"gte": "now-1d/d", // 当前时间的上一天, 四舍五入到最近的一天
"lt": "now/d" // 当前时间, 四舍五入到最近的一天
}
}
}
}
Elasticsearch 中时间可以表示为now
, 也就是系统当前时间, 也可以是以||
结尾的日期字符串表示.
在日期之后, 可以选择一个或多个数学表达式:
下面是Elasticsearch支持数学表达式的时间单位:
说明: 假设系统当前时间now = 2018-10-01 12:00:00
:
now+1h
: now的毫秒值 + 1小时, 结果是: 2018-10-01 13:00:00
.
now-1h
: now的毫秒值 - 1小时, 结果是: 2018-10-01 11:00:00
.
now-1h/d
: now的毫秒值 - 1小时, 然后四舍五入到最近的一天的起始, 结果是: 2018-10-01 00:00:00
.
2018.10.01||+1M/d
: 2018-10-01
的毫秒值 + 1月, 再四舍五入到最近一天的起始, 结果是: 2018-11-01 00:00:00
方式二:
4 强制刷新
es.indices.refresh(index="test-index")
我们知道 ES 的索引数据是写入到磁盘上的。但这个过程是分阶段实现的,因为 IO 的操作是比较费时的。
当一个文档进入 ES 的初期, 文档是被存储到内存里的,默认经过 1s 之后, 会被写入文件系统缓存,这样该文档就可以被搜索到了,注意,此时该索引数据被没有最终写入到磁盘上。如果你对这 1s 的时间间隔还不满意, 调用 _refresh 就可以立即实现内存->文件系统缓存, 从而使文档可以立即被搜索到。
所以 refresh 实现的是文档数据从内存到文件系统缓存的过程。