# 【Python】【笔记】Python 控制 ElasticSearch
# Notes on driving Elasticsearch from Python: create an index, bulk-insert
# documents, and run full-text searches.
#
# ##################### index creation #####################
# BUG FIX: in the original dump the `from elasticsearch import Elasticsearch`
# line was fused onto a comment banner, so the import never executed and
# `Elasticsearch()` below raised NameError.
import codecs  # stdlib; kept from the original file

from elasticsearch import Elasticsearch
from elasticsearch import helpers
from tqdm import tqdm  # progress bar

# Low-level ES client — the Python client only ships the low-level flavor.
# High-level vs low-level clients mainly differ in how queries are issued:
# the high-level one wraps queries in dedicated APIs, the low-level one
# takes raw query bodies.
es = Elasticsearch()
# 确保不会重复创建
def deleteInices(my_index):
    """Delete ``my_index`` if it already exists, so a fresh create starts clean.

    No-op when the index does not exist.  (Name keeps the original typo —
    callers elsewhere reference ``deleteInices``.)
    """
    # Original had a redundant `True and` prefix in this condition.
    if es.indices.exists(my_index):
        print("删除之前存在的")
        es.indices.delete(index=my_index)
def createIndex(my_index):
    """Create ``my_index`` with an integer id field and an ik_smart-analyzed
    text field.  ``ignore=400`` makes the call a no-op if the index exists.
    """
    # Since ES 7.x there is a single implicit `_doc` type, so the mapping
    # carries no doc-type level.
    mapping = {
        "my_id": {"type": "integer"},
        "my_word": {
            "type": "text",
            "analyzer": "ik_smart",
            "search_analyzer": "ik_smart",
        },
    }
    settings = {"mappings": {"properties": mapping}}
    es.indices.create(index=my_index, ignore=400, body=settings)
    print("创建index成功!")
#####################插入数据####################################
def getAllWords(path="para_and_test_split.txt"):
    """Load the corpus from *path*, one entry per line.

    Returns a list of ``(line_number, stripped_text)`` tuples, 0-indexed.
    Empty lines are kept (as empty strings), matching the original behavior.
    """
    # Iterate the file handle lazily instead of f.readlines() — avoids
    # materializing the whole file in memory before processing it.
    with open(path, "r", encoding="utf-8") as f:
        return [(i, line.strip()) for i, line in enumerate(f)]
def insertData(words, my_index, my_doc, one_bulk):
    """Bulk-insert ``(id, word)`` pairs into ``my_index``.

    Args:
        words: iterable of ``(my_id, my_word)`` tuples (see getAllWords).
        my_index: target index name.
        my_doc: document type placed in ``_type`` (``"_doc"`` on ES 7.x).
        one_bulk: number of actions per helpers.bulk() call.

    Uses helpers.bulk rather than one es.index() call per document —
    single-document inserts are far too slow for a whole corpus.
    """
    print("共需要插入%d条..." % len(words))
    pbar = tqdm(total=len(words))
    actions = []
    for my_id, word in words:
        actions.append(
            {
                "_index": my_index,
                "_type": my_doc,
                "_source": {"my_id": my_id, "my_word": word},
            }
        )
        # Flush as soon as a batch is full.
        if len(actions) >= one_bulk:
            helpers.bulk(es, actions)
            pbar.update(len(actions))
            actions = []
    if actions:
        # Flush the trailing partial batch.
        helpers.bulk(es, actions)
        # FIX: the original left this update commented out, so the progress
        # bar never reached 100% for the final partial bulk.
        pbar.update(len(actions))
        print('done2')
    pbar.close()
    print("插入数据完成!")
#####################检索数据####################################
def keywordSearch(keywords1, my_index):
    """Full-text search ``my_index`` for ``keywords1`` via the inverted index.

    Issues a ``match`` query against the ``my_word`` field and prints every
    hit document as returned by es.search().  (For result sets larger than
    the default page, helpers.scan with a scroll would be the alternative.)
    """
    query_body = {
        "query": {
            "match": {"my_word": keywords1},
        },
    }
    # Plain one-shot search against the index.
    res = es.search(index=my_index, body=query_body)
    for hit in res["hits"]["hits"]:
        print(hit)
def mainCreateIndex():
    """Drop any stale copy of the index, then create it fresh."""
    index_name = "word2vec_index"
    deleteInices(index_name)
    createIndex(index_name)
def mainInsert():
    """Load the corpus file and bulk-insert it into the index."""
    index_name = "word2vec_index"
    doc_type = "_doc"
    corpus = getAllWords(path="para_and_test_split.txt")
    insertData(corpus, index_name, doc_type, one_bulk=5000)
def mainSearch():
    """Run a sample full-text query against the index."""
    index_name = "word2vec_index"
    sample_query = "特朗普的支持率"
    keywordSearch(sample_query, index_name)
# Entry point: enable exactly one of the three stages below
# (create index -> insert data -> search).
if __name__ == '__main__':
    # mainCreateIndex()
    # mainInsert()
    mainSearch()
# Page: [1]  (forum pagination marker from the original dump, commented out
# so the file remains valid Python)