背景
最近我们有个需求,需要去同步上游数据,并进行数据同步,数据清洗,数据整合,数据推送的功能;要求在接收到上游客户端的请求,才执行该操作;我们这里分享该功能实现,包括踩的一些坑(异步操作时sqoop推数失败的情况);
需求分析
需求介绍: 接收到请求之后,执行一系列的脚本,实现数据同步,数据整合,数据推送的功能,并及时告知下游系统,让下游系统做后续的操作;
方案设计
框架选型:Python的tornado框架,版本5.1
实现逻辑:采用tornado的异步架构,做成web,分为两个接口,这里简称接口1和接口2;
接口1主要是接收第一次请求,接收到请求后,直接和请求端断掉连接(使用tornado的self.finish函数可实现),并异步执行数据同步,数据清洗,数据整合,数据推送的功能;
接口2主要是告知请求端当前接口1的执行状态;
代码实现
#coding:utf8from tornado.web import RequestHandler
from util import commonutil
import logging
import json
import configparser
from pyhive import hive
from elasticsearch import Elasticsearch
import requests
import time
import tornado.gen
import subprocess
from tornado.process import Subprocess
import tornado.gen@tornado.gen.coroutine
def run_command(command):''':param cmd::return:'''process = Subprocess([command],stdout=Subprocess.STREAM,stderr=Subprocess.STREAM,shell=True)out, err = yield [process.stdout.read_until_close(),process.stderr.read_until_close()]raise tornado.gen.Return((out,err))class Class1(RequestHandler):async def post(self, *args, **kwargs):jd = self.request.body.decode("utf-8")jsonData = json.loads(jd)loggers.info("==数据同步整合重跑==")self.set_header('Content-Type','application/json;charset=UTF-8')result = {}try:mysql_conn = commonutil.connect_mysql(auto_ip,auto_port,auto_db,auto_user,auto_passwd)cursor = mysql_conn.cursor() result['status'] = 0result['msg'] = '参数检查正常,正在进行理财重跑'self.finish(json.dumps(result))os.chdir("需要执行的脚本目录")print("executing-------------")cmd = 'python3 script.py {} {} {}'.format(mainDir,data_dt,prdnbrs)print(cmd)r_cmd_res,r_cmd_err = await run_command(cmd)os.chdir(mainDir)except Exception as ex:os.chdir(mainDir)loggers.info(ex)raise exdef get(self,*args,**kwargs):pass
注:1.这里只是提供了class类,需要使用时在main.py中注册即可;
2.采用self.finish的方法,可以和请求端断掉连接,并可以进行后续的操作;
3.script.py是一个封装的代码,里面包含了数据同步,数据整合,数据导出等所有的功能,其中数据同步和数据导出是采用sqoop进行处理的;
4.我们在生产上部署过程中,遇到了一个特别坑的问题,这种问题并不是每次都会出现,因此找了好久才发现这个问题,问题的大致情况是执行到数据同步时就会出现程序阻塞的情况,一致卡在那,自己执行不了,还会影响其他的程序执行;经过多测测试,我们发现是nohup的问题;
大家可以参考如下的的文章:https://blog.csdn.net/haha_keke/article/details/122218675
我们在linux服务器上部署tornado服务的时候,是采用
nohup python3 main.py &
的方式进行部署的,这种方式在执行过程中会出现阻塞,尤其是sqoop抽数这种和外部有交互的情况;
总结
采用tornado异步的方式部署一个web服务,用于接收客户端的请求,并执行python脚本;