Python-ElasticSearch客户端的封装(聚合查询、统计查询、全量数据)

chatgpt/2023/9/24 2:13:46

目录

  • ES Python客户端介绍
  • 封装代码
  • 测试代码
  • 参考


ES Python客户端介绍

官方提供了两个客户端elasticsearch、elasticsearch-dsl

pip install elasticsearch
pip install elasticsearch-dsl

第二个是对第一个的封装,类似ORM操作数据库,可以.filter、.groupby,个人感觉很鸡肋,star数也不多。平时使用的时候一般会在kibana上测试,然后直接把query拷贝过来获取更多数据,所以这里做下第一个的封装。

封装代码

  1. 封装后依然暴露了es,方便有特殊情况下使用
  2. index一般很少改动,就直接放到对象中了,可以使用set_index修改
  3. 常用的应该是get_doc和get_doc_scroll来获取少量和全量数据

代码测试时使用的是7.17.12版本,大于此版本可能由于官方改动出异常

pip install elasticsearch==7.17.12

es.py

import random
import string
from elasticsearch import Elasticsearch
from elasticsearch.helpers import bulk
from typing import List,Dictclass ESClient:def __init__(self, host="127.0.0.1",index="", http_auth = None):self.index = indexif http_auth is None:self.es = Elasticsearch(hosts=host)else:self.es = Elasticsearch(hosts=host, http_auth=http_auth)print("success to connect " + host)def close(self):self.es.close()# 设置索引def set_index(self,index:str):self.index = index# 创建索引def create_index(self, index_name: str, mappings=None):res = self.es.indices.create(index=index_name, mappings=mappings)return res# 删除索引def delete_index(self, index_name: str):res = self.es.indices.delete(index=index_name)return res# 获取索引def get_index(self, index_name: str):res = self.es.indices.get(index=index_name)return res# 创建文档(单个)def create_doc(self,body, _id=''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))):res = self.es.create(index=self.index, body=body, id=_id)return res# 创建文档(批量)def create_doc_bulk(self, docs: List[Dict]):actions = []for doc in docs:action = {"_index": self.index,"_op_type": "create","_id": ''.join(random.sample(string.ascii_letters+string.ascii_uppercase+string.digits,20))}for k,v in doc.items():action[k] = vactions.append(action)res = bulk(client=self.es, actions=actions)return res# 删除文档def delete_doc(self, doc_id):res = self.es.delete(index=self.index, id=doc_id)return res# 更新文档def update_doc(self, doc_id, doc:Dict):body = {"doc" : doc}res = self.es.update(index=self.index, id=doc_id, body=body)return res# 分页获取超过100000的文档def get_doc_scroll(self,query:Dict):res = self.es.search(index=self.index,size=10000,body=query,search_type="query_then_fetch",scroll="5m")data_list = []hits = res.get("hits")scroll_id = res.get('_scroll_id')total_value = 0# total 可能为Dict或intif isinstance(hits.get('total'),Dict):total_value= hits.get('total').get('value')else:total_value = hits.get('total')if total_value>0:for data in hits.get('hits'):data_list.append(data.get('_source'))return scroll_id,data_list# 通过scroll_id分页获取后序文档def get_doc_by_scroll_id(self,scroll_id):page = self.es.scroll(scroll_id=scroll_id,scroll="5m")data_list = []scroll_id = page.get('_scroll_id')for data in page.get('hits').get('hits'):data_list.append(data)return scroll_id,data_list# 清空scroll_id,防止服务端不够用def clear_scroll(self,scroll_id):self.es.clear_scroll(scroll_id)# 获取索引的hits内容(一般用于获取文档id、总数)def get_doc_all(self):res = self.es.search(index=self.index)return res['hits']# 获取一个文档def get_doc_by_id(self, id_):res = self.es.get(index=self.index, id=id_)return res["_source"]# 获取所有文档的_source内容(小于100000)def get_doc(self,query:Dict,size:int=100000):query['size'] = sizeres = self.es.search(index=self.index,body=query)data_list = []hits = res.get("hits")total_value = 0# total 可能为Dict或intif isinstance(hits.get('total'), Dict):total_value = hits.get('total').get('value')else:total_value = hits.get('total')if total_value > 0:for data in hits.get('hits'):data_list.append(data.get('_source'))return data_list# 聚合查询(分组条件名为group_by,返回buckets)def get_doc_agg(self, query):res = self.es.search(index=self.index, body=query)return res['aggregations']['group_by'].get('buckets')# 统计查询(统计条件为stats_by,返回最值、平均值等)def get_doc_stats(self,query):res = self.es.search(index=self.index,body=query)return res['aggregations']["stats_by"]

测试代码

import unittest
from es import ESClientcli = ESClient(host="http://10.28.144.3:9200",http_auth=["elastic","changeme"])
def test_create_index():res = cli.create_index(index_name="test")print(res)def test_delete_index():res = cli.delete_index(index_name="test")print(res)def test_get_index():res = cli.get_index(index_name="test")print(res)def test_set_index():cli.set_index(index="test")def test_create_doc():body = {"name": "lady_killer9","age": 19}res = cli.create_doc(body=body)print(res)def test_create_doc_bulk():from copy import deepcopybody = {"name": "lady_killer9"}users = []for i in range(100001):tmp = deepcopy(body)tmp["age"] = iusers.append(tmp)res = cli.create_doc_bulk(docs=users)print(res)def test_get_doc_all():res = cli.get_doc_all()print(res)def test_get_doc_by_id():res = cli.get_doc_by_id("jHALXDQaENQZPM4C9EUt")print(res)def test_get_doc():query = {"query": {"match_all": {}}}res = cli.get_doc(query=query,size=20)print(res)def test_update_doc():body={"name": "lady_killer_after_update"}res = cli.update_doc(doc_id="jHALXDQaENQZPM4C9EUt",doc=body)print(res)def test_delete_doc():res = cli.delete_doc(doc_id="jHALXDQaENQZPM4C9EUt")print(res)def test_get_doc_agg():query = {"aggs": {"group_by": {"terms": {"field": "age"}}}}res = cli.get_doc_agg(query=query)print(res)def test_get_doc_stats():query = {"aggs": {"stats_by": {"stats": {"field": "age"}}}}res = cli.get_doc_stats(query=query)print(res)def test_get_doc_scroll():query = {"query": {"match_all": {}}}scroll_id,data_list = cli.get_doc_scroll(query=query)res = []while data_list:res.extend(data_list)scroll_id,data_list = cli.get_doc_by_scroll_id(scroll_id=scroll_id)print(len(res))if __name__ == '__main__':# test_delete_index()test_create_index()test_get_index()# test_set_index()# test_create_doc()# test_create_doc_bulk()# test_get_doc_all()# test_update_doc()# test_get_doc_by_id()# test_get_doc()# test_delete_doc()# test_get_doc_agg()# test_get_doc_stats()# test_get_doc_scroll()cli.close()

测试截图
在这里插入图片描述
更多python相关内容:【python总结】python学习框架梳理

本人b站账号:一路狂飚的蜗牛

有问题请下方评论,转载请注明出处,并附有原文链接,谢谢!如有侵权,请及时联系。如果您感觉有所收获,自愿打赏,可选择支付宝18833895206(小于),您的支持是我不断更新的动力。

参考

github-elasticsearch
github-elasticsearch-dsl

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.exyb.cn/news/show-5312804.html

如若内容造成侵权/违法违规/事实不符,请联系郑州代理记账网进行投诉反馈,一经查实,立即删除!

相关文章

kernel32.dll如何修复,快速解决kernel32.dll缺失的方法

Kernel32.dll是Windows操作系统中一个重要的系统文件,对于系统的正常运行至关重要。然而,由于各种原因,用户可能会遇到kernel32.dll文件的缺失问题。今天小编就来给大家详细的介绍一下kernel32.dll这个文件,并且详细的介绍一下ker…

c++防火墙代码

以下是一个简单的基于C的防火墙代码示例&#xff1a; #include <iostream> #include <string> #include <vector>using namespace std;class Firewall { private:vector<string> allowed_ips; public:Firewall(vector<string> allowed_ips) {…

设计模式大白话——装饰者模式

装饰者模式 文章目录 装饰者模式一、概述二、应用场景三、代码示例四、小结 一、概述 ​ 装饰者模式&#xff0c;此模式最核心之处在于装饰二字&#xff0c;之所以需要装饰&#xff0c;是因为基础的功能无法满足需求&#xff0c;并且装饰是临时的&#xff0c;并不是永久的&…

IPIDEA参展ChinaJoy!探索未来创新科技的峰会之旅

中国最大的国际数码互动娱乐展会ChinaJoy即将于7月28日在上海举行&#xff0c;届时将聚集全球来自22个国家和地区的领先科技公司、创业者和技术专家&#xff0c;为参观者呈现一系列引人入胜的展览和活动。而IPIDEA作为参展商之一&#xff0c;将为参观者带来一场关于数字科技的奇…

一文了解 Android 车机如何处理中控的旋钮输入?

前言 上篇文章《从实体按键看 Android 车载的自定义事件机制》带大家了解了 Android 车机支持自定义输入的机制 CustomInputService。事实上&#xff0c;除了支持自定义事件&#xff0c;对于中控上常见的音量控制、焦点控制的旋钮事件&#xff0c;Android 车机也是支持的。 那…

【Java】javax.websocket

javax.websocket import org.springframework.context.annotation.Bean; import org.springframework.context.annotation.Configuration; import org.springframework.web.socket.server.standard.ServerEndpointExporter;Configuration public class WebSocketConfig {Bean

Qt 3. QSerialPortInfo显示串口信息在QTextEdit显示

//ex2.cpp #include "ex2.h" #include "ui_ex2.h" #include <QtSerialPort/QtSerialPort>int static cnt 0;Ex2::Ex2(QWidget *parent): QDialog(parent), ui(new Ui::Ex2) {ui->setupUi(this); }Ex2::~Ex2() {delete ui; }void Ex2::on_pushBu

QT基于TCP协议实现数据传输以及波形绘制——安卓APP及Windows程序双版本

文章代码有非常非常之详细的解析&#xff01;&#xff01;&#xff01;诸位可放心食用 这个玩意我做了两个&#xff0c;一个是安卓app&#xff0c;一个是Windows程序。代码并非全部都是由我从无到有实现&#xff0c;只是实现了我想要的功能。多亏了巨人的肩膀&#xff0c;开源…
推荐文章