I have recently been looking into chunjun, a stable, easy-to-use, high-performance data integration framework that unifies batch and stream processing. I have been using it for data extraction tests, and writing the same JSON jobs over and over felt like a real waste of time, so I decided to write a script that generates the JSON automatically.
1. Designing the template
The template is designed in Excel and records the parameters of each task, one task per row: the MySQL host IP, port, database and table, the Hive database and table, the Hive data storage path, and so on.
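The sheet itself is not reproduced here, but the column order can be inferred from the indices the code below reads from each row. A minimal sketch of one row under that assumption, reusing the sample values from the test run in section 4 (note that xlrd actually returns numeric cells as floats, which is why the code later casts the ports with int()):

# Assumed template column order (indices as read from each row by the code below):
#  0 MySQL host          1 MySQL port          2 MySQL user           3 MySQL password
#  4 MySQL database      5 MySQL table         6 HDFS NameNode host   7 HDFS NameNode port
#  8 Hive database       9 Hive table         10 partition flag ("1" = partitioned)
# 11 partition field names (comma-separated)  12 partition values (comma-separated)
# 13 Hive warehouse path 14 writeMode         15 fileType
sample_row = ["192.168.233.130", 3306, "root", "000000", "test", "stu",
              "192.168.233.130", 8020, "stg", "stu_no_part",
              "0", "", "", "/user/hive/warehouse/", "append", "text"]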
2. Writing the Python code
2.1. Reading the tasks from the template
def readList_extract_Info():
    """Read all task rows from the template.
    :return: list of all tasks
    """
    # Skip the header row (row 0)
    row_count = table.nrows - 1
    for row_item in range(row_count):
        count = row_item + 1
        list = table.row_values(count)
        job_list.append(list)
    return job_list
2.2. Checking table partitions and generating the add-partition SQL
chunjun, like DataX, does not appear to support dynamic partitions, so the partitions have to be created before the data extraction runs. The script therefore also generates the add-partition statements automatically.
def create_partition(job_list):
    sql_list = []
    # Loop over every task
    for i in range(len(job_list)):
        # Build the "add partition" SQL
        sql = "alter table " + job_list[i][8] + "." + job_list[i][9] + " add if not exists partition("
        # Only partitioned tables (flag "1") get partitions created from the values in the task
        if job_list[i][10] == "1":
            # Partition field names
            partN_list = job_list[i][11].split(",")
            # Partition field values
            partV_list = job_list[i][12].split(",")
            # The number of partition fields must match the number of values
            if len(partN_list) == len(partV_list):
                for item in range(len(partN_list)):
                    # Join field name and value, e.g. dt="2023",time="2024"
                    partName = partN_list[item] + "=\"" + partV_list[item] + "\" "
                    if item == len(partN_list) - 1:
                        sql = sql + partName + ");"
                    else:
                        sql = sql + partName + ","
                sql_list.append(sql)
                print(job_list[i][9] + "----> add-partition SQL created! ----> " + sql)
            else:
                print("Partition field and value counts do not match; no partition added")
        else:
            print(job_list[i][9] + "----> not a partitioned table")
    # Create the output directory if it does not exist
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    # Write the add-partition SQL statements to a file
    with open(os.path.join(output_path, "create_partition_sql.sql"), "w", encoding='UTF-8') as f:
        for i in sql_list:
            f.write(i + "\n")
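As a quick illustration of what this produces, here is a hypothetical single-task call (the stg.stu table and the dt/time partitions are made up, mirroring the dt="2023",time="2024" example in the comments above):

# Hypothetical task row: only the fields create_partition() looks at are filled in
demo_task = [""] * 16
demo_task[8], demo_task[9] = "stg", "stu"    # Hive database and table
demo_task[10], demo_task[11], demo_task[12] = "1", "dt,time", "2023,2024"
create_partition([demo_task])
# create_partition_sql.sql then contains the statement (whitespace exactly as concatenated):
# alter table stg.stu add if not exists partition(dt="2023" ,time="2024" );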
2.3. Getting a MySQL connection
def get_connection(mysql_host, mysql_port, mysql_user, mysql_passwd):
    return pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, passwd=mysql_passwd)
2.4. Fetching the source table's metadata
Fetch the column names and column types of the source table:
def get_mysql_meta(database, table, mysql_host, mysql_port, mysql_user, mysql_passwd):
    """Fetch the column names and data types of a MySQL table."""
    connection = get_connection(mysql_host, mysql_port, mysql_user, mysql_passwd)
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS " \
          "WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall
Next, convert the columns into the array-of-maps form we want. Note: under Python 3, map() returns an iterator, so it has to be wrapped in list(), otherwise an exception is thrown later when the result is written to JSON.
def get_mysql_columns(database, table, mysql_host, mysql_port, mysql_user, mysql_passwd):
    return list(map(lambda x: {"name": x[0], "type": x[1]},
                    get_mysql_meta(database, table, mysql_host, mysql_port, mysql_user, mysql_passwd)))
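To make the shape of the data concrete: for the stu table from the sample run in section 4 (columns id int and name varchar), the two functions return roughly the following. The host and credentials here are just the ones from that test environment:

meta = get_mysql_meta("test", "stu", "192.168.233.130", 3306, "root", "000000")
print(meta)     # (('id', 'int'), ('name', 'varchar'))

columns = get_mysql_columns("test", "stu", "192.168.233.130", 3306, "root", "000000")
print(columns)  # [{'name': 'id', 'type': 'int'}, {'name': 'name', 'type': 'varchar'}]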
2.5. Converting the source column types
The same note applies here: wrap map() in list() under Python 3. The columns are again converted into the array-of-maps form, this time with each MySQL type mapped to its Hive counterpart:
def get_hive_columns(database, table, mysql_host, mysql_port, mysql_user, mysql_passwd):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table, mysql_host, mysql_port, mysql_user, mysql_passwd)
    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))
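For the writer side, the same sample table mapped through type_mapping gives exactly the Hive column list that appears in the generated JSON in section 4:

hive_columns = get_hive_columns("test", "stu", "192.168.233.130", 3306, "root", "000000")
print(hive_columns)  # [{'name': 'id', 'type': 'bigint'}, {'name': 'name', 'type': 'string'}]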
2.6. Generating the JSON job file
def generate_json(list):
    # Default the Hive warehouse path if it is not filled in
    if len(list[13]) == 0:
        list[13] = "/user/hive/warehouse/"
    # Make sure the Hive warehouse path ends with "/"
    if list[13][-1] != "/":
        list[13] += "/"
    # Build the HDFS storage path, e.g. /user/hive/warehouse/stg.db/stu/
    path = list[13] + list[8] + ".db/" + list[9] + "/"
    # For partitioned tables, append every partition to the path
    if list[10] == "1":
        partN_list = list[11].split(",")
        partV_list = list[12].split(",")
        if len(partN_list) == len(partV_list):
            # e.g. /user/hive/warehouse/stg.db/stu/dt=2023/time=2024/
            for item in range(len(partN_list)):
                partName = partN_list[item] + "=" + partV_list[item] + "/"
                path = path + partName
    job = {
        "job": {
            "setting": {
                "speed": {"channel": 1},
                "errorLimit": {"record": 0, "percentage": 0.02}
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": list[2],
                        "password": list[3],
                        "column": get_mysql_columns(list[4], list[5], list[0], int(list[1]), list[2], list[3]),
                        "splitPk": "",
                        "connection": [{
                            "table": [list[5]],
                            "jdbcUrl": ["jdbc:mysql://" + list[0] + ":" + str(int(list[1])) + "/" + list[4]]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + list[6] + ":" + str(int(list[7])),
                        "fileType": list[15],
                        "path": path,
                        # "fileName": source_table,
                        "column": get_hive_columns(list[4], list[5], list[0], int(list[1]), list[2], list[3]),
                        "writeMode": list[14],
                        "fieldDelimiter": "\t"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([list[8], list[9], "json"])), "w", encoding='UTF-8') as f:
        json.dump(job, f)
    print("Source table: " + list[5] + " extracted to target table: " + list[9] +
          " [chunjun json script created under " + output_path + "]")
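Reusing the hypothetical sample_row sketched in section 1 (and assuming the MySQL instance is reachable, since the column lists are read live), a single call produces the stg.stu_no_part.json shown in section 4:

# Hypothetical call; writes stg.stu_no_part.json into output_path
generate_json(sample_row)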
3. The complete script
# coding=utf-8
import json
import getopt
import os
import sys
import pymysql
import xlrd

# Open the template workbook (模板.xlsx = "template.xlsx")
data = xlrd.open_workbook("F:\\模板.xlsx")
# Get the first sheet
table = data.sheet_by_index(0)
job_list = []
# Target directory for the add-partition file and the json scripts; change it to suit your environment
output_path = "F:\\"


def readList_extract_Info():
    """Read all task rows from the template.
    :return: list of all tasks
    """
    # Skip the header row (row 0)
    row_count = table.nrows - 1
    for row_item in range(row_count):
        count = row_item + 1
        list = table.row_values(count)
        job_list.append(list)
    return job_list


def create_partition(job_list):
    sql_list = []
    # Loop over every task
    for i in range(len(job_list)):
        # Build the "add partition" SQL
        sql = "alter table " + job_list[i][8] + "." + job_list[i][9] + " add if not exists partition("
        # Only partitioned tables (flag "1") get partitions created from the values in the task
        if job_list[i][10] == "1":
            # Partition field names
            partN_list = job_list[i][11].split(",")
            # Partition field values
            partV_list = job_list[i][12].split(",")
            # The number of partition fields must match the number of values
            if len(partN_list) == len(partV_list):
                for item in range(len(partN_list)):
                    # Join field name and value, e.g. dt="2023",time="2024"
                    partName = partN_list[item] + "=\"" + partV_list[item] + "\" "
                    if item == len(partN_list) - 1:
                        sql = sql + partName + ");"
                    else:
                        sql = sql + partName + ","
                sql_list.append(sql)
                print(job_list[i][9] + "----> add-partition SQL created! ----> " + sql)
            else:
                print("Partition field and value counts do not match; no partition added")
        else:
            print(job_list[i][9] + "----> not a partitioned table")
    # Create the output directory if it does not exist
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    # Write the add-partition SQL statements to a file
    with open(os.path.join(output_path, "create_partition_sql.sql"), "w", encoding='UTF-8') as f:
        for i in sql_list:
            f.write(i + "\n")


def get_connection(mysql_host, mysql_port, mysql_user, mysql_passwd):
    """Create a MySQL connection."""
    return pymysql.connect(host=mysql_host, port=mysql_port, user=mysql_user, passwd=mysql_passwd)


def get_mysql_meta(database, table, mysql_host, mysql_port, mysql_user, mysql_passwd):
    """Fetch the column names and data types of a MySQL table."""
    connection = get_connection(mysql_host, mysql_port, mysql_user, mysql_passwd)
    cursor = connection.cursor()
    sql = "SELECT COLUMN_NAME,DATA_TYPE from information_schema.COLUMNS " \
          "WHERE TABLE_SCHEMA=%s AND TABLE_NAME=%s ORDER BY ORDINAL_POSITION"
    cursor.execute(sql, [database, table])
    fetchall = cursor.fetchall()
    cursor.close()
    connection.close()
    return fetchall


def get_mysql_columns(database, table, mysql_host, mysql_port, mysql_user, mysql_passwd):
    """Get the MySQL table columns as a list of {"name", "type"} maps."""
    return list(map(lambda x: {"name": x[0], "type": x[1]},
                    get_mysql_meta(database, table, mysql_host, mysql_port, mysql_user, mysql_passwd)))


def get_hive_columns(database, table, mysql_host, mysql_port, mysql_user, mysql_passwd):
    def type_mapping(mysql_type):
        mappings = {
            "bigint": "bigint",
            "int": "bigint",
            "smallint": "bigint",
            "tinyint": "bigint",
            "decimal": "string",
            "double": "double",
            "float": "float",
            "binary": "string",
            "char": "string",
            "varchar": "string",
            "datetime": "string",
            "time": "string",
            "timestamp": "string",
            "date": "string",
            "text": "string"
        }
        return mappings[mysql_type]

    meta = get_mysql_meta(database, table, mysql_host, mysql_port, mysql_user, mysql_passwd)
    return list(map(lambda x: {"name": x[0], "type": type_mapping(x[1].lower())}, meta))


def generate_json(list):
    # Default the Hive warehouse path if it is not filled in
    if len(list[13]) == 0:
        list[13] = "/user/hive/warehouse/"
    # Make sure the Hive warehouse path ends with "/"
    if list[13][-1] != "/":
        list[13] += "/"
    # Build the HDFS storage path, e.g. /user/hive/warehouse/stg.db/stu/
    path = list[13] + list[8] + ".db/" + list[9] + "/"
    # For partitioned tables, append every partition to the path
    if list[10] == "1":
        partN_list = list[11].split(",")
        partV_list = list[12].split(",")
        if len(partN_list) == len(partV_list):
            # e.g. /user/hive/warehouse/stg.db/stu/dt=2023/time=2024/
            for item in range(len(partN_list)):
                partName = partN_list[item] + "=" + partV_list[item] + "/"
                path = path + partName
    job = {
        "job": {
            "setting": {
                "speed": {"channel": 1},
                "errorLimit": {"record": 0, "percentage": 0.02}
            },
            "content": [{
                "reader": {
                    "name": "mysqlreader",
                    "parameter": {
                        "username": list[2],
                        "password": list[3],
                        "column": get_mysql_columns(list[4], list[5], list[0], int(list[1]), list[2], list[3]),
                        "splitPk": "",
                        "connection": [{
                            "table": [list[5]],
                            "jdbcUrl": ["jdbc:mysql://" + list[0] + ":" + str(int(list[1])) + "/" + list[4]]
                        }]
                    }
                },
                "writer": {
                    "name": "hdfswriter",
                    "parameter": {
                        "defaultFS": "hdfs://" + list[6] + ":" + str(int(list[7])),
                        "fileType": list[15],
                        "path": path,
                        # "fileName": source_table,
                        "column": get_hive_columns(list[4], list[5], list[0], int(list[1]), list[2], list[3]),
                        "writeMode": list[14],
                        "fieldDelimiter": "\t"
                    }
                }
            }]
        }
    }
    if not os.path.exists(output_path):
        os.makedirs(output_path)
    with open(os.path.join(output_path, ".".join([list[8], list[9], "json"])), "w", encoding='UTF-8') as f:
        json.dump(job, f)
    print("Source table: " + list[5] + " extracted to target table: " + list[9] +
          " [chunjun json script created under " + output_path + "]")


if __name__ == '__main__':
    job_list = readList_extract_Info()
    create_partition(job_list)
    for i in job_list:
        generate_json(i)
4. Results
Running the script prints a line of progress per task and produces the following files:
create_partition_sql.sql
stg.stu_no_part.json
The JSON formatted with Notepad++:
{"job": {"setting": {"speed": {"channel": 1},"errorLimit": {"record": 0,"percentage": 0.02}},"content": [{"reader": {"name": "mysqlreader","parameter": {"username": "root","password": "000000","column": [{"name": "id","type": "int"}, {"name": "name","type": "varchar"}],"splitPk": "","connection": [{"table": ["stu"],"jdbcUrl": ["jdbc:mysql://192.168.233.130:3306/test"]}]}},"writer": {"name": "hdfswriter","parameter": {"defaultFS": "hdfs://192.168.233.130:8020","fileType": "text","path": "/user/hive/warehouse/stg.db/stu_no_part/","column": [{"name": "id","type": "bigint"}, {"name": "name","type": "string"}],"writeMode": "append","fieldDelimiter": "\t"}}}]}
}