Yiwoqu 发表于 2021-5-18 17:02

【Python】【笔记】Python 控制ElasticSearch

#####################创建索引####################################
from elasticsearch import Elasticsearch
from tqdm import tqdm #进度条
from elasticsearch import helpers
import codecs

# 创建Es低级客户端 Python好像只有低级客户端
# 高级客户端和低级客户端的主要区别在查询
# 高级客户端查询通过调用高级API低级客户端查询通过常规的查询方式
es = Elasticsearch()

# 确保不会重复创建
def deleteInices(my_index):
    if True and es.indices.exists(my_index):
      print("删除之前存在的")
      es.indices.delete(index=my_index)


def createIndex(my_index):
    # 修改配置 7.x版本之后 my_doc默认_doc
    settings = \
      {
            "mappings": {
                  "properties": {
                        "my_id": {"type": "integer"},
                        "my_word": {"type": "text", "analyzer": "ik_smart", "search_analyzer": "ik_smart"}
                  }
                }
      }
    # create index
    es.indices.create(index=my_index, ignore=400, body=settings)
    print("创建index成功!")





#####################插入数据####################################

def getAllWords(path="para_and_test_split.txt"):
    #将数据从文件读出
    #文件格式:
    #
    words = []
    with open(path, "r", encoding="utf-8") as f:
      for i,item in enumerate(f.readlines()):
            words.append((i,item.strip()))
    return words

def insertData(words, my_index, my_doc, one_bulk):
    #插入数据
    #one_bulk表示一个bulk里装多少个
    body = []
    body_count = 0#记录body里面有多少个.
    #最后一个bulk可能没满one_bulk,但也要插入

    print("共需要插入%d条..."%len(words))
    pbar = tqdm(total=len(words))

    for id,word in words:
      data1 = { "my_id": id,
                  "my_word": word}
      every_body = \
      {
            "_index": my_index,
            "_type": my_doc,
            "_source": data1
      }

      if body_count<one_bulk:
            body.append(every_body)
            body_count+=1
      else:
            helpers.bulk(es, body) #还是要用bulk啊,不然太慢了
            pbar.update(one_bulk)
            body_count = 0
            body = []
            body.append(every_body)
            body_count+=1

    if len(body)>0:
      #如果body里面还有,则再插入一次(最后非整块的)
      helpers.bulk(es, body)
      # pbar.update(len(body))
      print('done2')

    pbar.close()
    #res = es.index(index=my_index,doc_type=my_doc,id=my_key_id,body=data1)#一条插入
    print("插入数据完成!")



#####################检索数据####################################

def keywordSearch(keywords1, my_index):
    # 根据keywords1来查找,倒排索引
    my_search1 = \
      {
            "query": {
                "match": {
                  "my_word": keywords1
                }
            }
      }
    # 直接查询
    res= es.search(index=my_index,body=my_search1)
    for line in res["hits"]["hits"]:
      print(line)
      # print(line["_source"]['my_word'])
    # total = res["hits"]["total"] #一共这么多个
    # print("共查询到%d条数据"%total)

    # helpers查询
    # es_result = helpers.scan(
    #   client=es,
    #   query=my_search1,
    #   scroll='10m',
    #   index=my_index,
    #   timeout='10m'
    # )
    # es_result = # 原始是生成器<generator object scan at 0x0000022E384697D8>
    # # print(es_result) #你可以直接打印查看
    # search_res = []
    # for item in es_result:
    #   tmp = item['_source']
    #   search_res.append((tmp['my_id'], tmp['my_word']))
    #   print(tmp['my_word'])
    # print("共查询到%d条数据" % len(es_result))

def mainCreateIndex():
    # 调用后创建index
    my_index = "word2vec_index"
    deleteInices(my_index)
    createIndex(my_index)

def mainInsert():
    # 调用后插入数据
    my_index = "word2vec_index"
    my_doc = "_doc"
    words = getAllWords(path="para_and_test_split.txt")
    insertData(words, my_index, my_doc, one_bulk=5000)

def mainSearch():
    # 调用后检索数据
    my_index = "word2vec_index"
    keywords1 = "特朗普的支持率"
    keywordSearch(keywords1, my_index)

if __name__ == '__main__':
    # mainCreateIndex()
    # mainInsert()
    mainSearch()
页: [1]
查看完整版本: 【Python】【笔记】Python 控制ElasticSearch