Flink CDC 最佳实践(以 MySQL 为例)

chatgpt/2023/10/4 7:03:45

1. 准备工作

1.1 确认 MySQL binlog 模式

确认 MySQL 数据库的 binlog 模式是否为 ROW。可以在 MySQL 命令行中执行以下语句确认:

SHOW GLOBAL VARIABLES LIKE 'binlog_format';

如果返回结果中的 Value 字段为 ROW,则说明 binlog 模式为 ROW

1.2 下载并安装 Flink

下载并安装 Flink,可以参考官方文档进行安装。

2. 配置 Flink CDC

2.1 配置 MySQL 数据库连接信息

在 Flink 的配置文件 flink-conf.yaml 中添加 MySQL 数据库连接信息,例如:

# MySQL connection configuration
mysql.server-id: 12345
mysql.hostname: localhost
mysql.port: 3306
mysql.username: root
mysql.password: 123456
mysql.database-name: test

2.2 配置 CDC Job

在 Flink 的 CDC Job 配置文件 mysql-cdc.properties 中添加以下配置:

# Flink CDC Job Configuration
name: mysql-cdc-job
flink.parallelism: 1
flink.checkpoint.interval: 60000
flink.checkpoint.mode: EXACTLY_ONCE# MySQL CDC Source Configuration
debezium.transforms: unwrap
debezium.transforms.unwrap.type: io.debezium.transforms.ExtractNewRecordState
database.hostname: localhost
database.port: 3306
database.user: root
database.password: 123456
database.history.kafka.bootstrap.servers: localhost:9092
database.history.kafka.topic: mysql-cdc-history
database.server.id: 12345
database.server.name: test
database.whitelist: test.user

其中,name 为 CDC Job 的名称,flink.parallelism 为 Flink 的并行度,flink.checkpoint.interval 为 Flink 的 Checkpoint 时间间隔,flink.checkpoint.mode 为 Checkpoint 模式,此处设置为 EXACTLY_ONCE

debezium.transforms 为 Debezium 转换器的名称,此处设置为 unwrapdatabase.hostnamedatabase.portdatabase.userdatabase.password 分别为 MySQL 数据库的连接信息。database.history.kafka.bootstrap.servers 为 Kafka 的地址信息,database.history.kafka.topic 为 CDC 历史数据记录的 Kafka Topic。database.server.id 为 MySQL 的 Server ID,database.server.name 为 CDC Source 的名称,database.whitelist 为需要进行同步的 MySQL 表的名称。

步骤一:创建 MySQL 数据库

首先,需要在本地或云端创建 MySQL 数据库,并添加一个具有读写权限的用户。下面是一个创建名为 test_db 的数据库以及名为 flink_cdc_user 的用户的示例 SQL 代码:

CREATE DATABASE test_db;CREATE USER 'flink_cdc_user'@'%' IDENTIFIED BY 'password';GRANT ALL PRIVILEGES ON test_db.* TO 'flink_cdc_user'@'%';

步骤二:启动 Flink 集群

启动一个 Flink 集群以便运行 CDC 应用程序。可以使用 Flink 自带的 bin/start-cluster.sh 脚本启动 Flink 集群。确保 Flink 集群在运行时已经包含了 Kafka 和 MySQL 的依赖项。

步骤三:创建 MySQL 表和 CDC 表

在 MySQL 中,首先需要创建需要进行 CDC 的表和 CDC 表。CDC 表是一个系统表,它存储了需要捕获的更改数据。可以通过以下代码创建一个名为 test_table 的表以及与之关联的 CDC 表

CREATE TABLE test_db.test_table (id INT PRIMARY KEY,name VARCHAR(30),age INT,email VARCHAR(50)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;CREATE TABLE test_db.test_table_cdc (`database` VARCHAR(100),`table` VARCHAR(100),`type` VARCHAR(10),`ts` TIMESTAMP(3) NOT NULL DEFAULT CURRENT_TIMESTAMP(3) ON UPDATE CURRENT_TIMESTAMP(3),`before` JSON,`after` JSON
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_0900_ai_ci;

步骤四:编写 Flink CDC 应用程序

接下来,需要编写一个 Flink CDC 应用程序,以将 MySQL 表更改推送到 Kafka 主题中。可以使用 Flink 的 flink-connector-jdbc 库和 flink-connector-kafka 库来实现此目的。

以下是一个基本的 Flink CDC 应用程序的代码示例:

public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();env.setParallelism(1);Properties properties = new Properties();properties.setProperty("bootstrap.servers", "localhost:9092");properties.setProperty("group.id", "test-group");JdbcSource<RowData> source = JdbcSource.<RowData>builder().setDrivername("com.mysql.jdbc.Driver").setDBUrl("jdbc:mysql://localhost:3306/test_db").setUsername("flink_cdc_user").setPassword("password").setQuery("SELECT id, name, age, email FROM test_table").setRowTypeInfo(Types.ROW(Types.INT, Types.STRING, Types.INT, Types.STRING)).setFetchSize(1000).build();DataStream<RowData> stream = env.addSource(source);

以下是一个简单的示例运行及结果:

$ bin/flink run -c com.example.MyCDCJob ./my-cdc-job.jar --database.server=mysql.example.com --database.port=3306 --database.name=mydb --database.username=myuser --database.password=mypassword --table.name=mytable --debezium.plugin.name=mysql --debezium.plugin.property.version=1.3.1.Final
[INFO] Starting CDC process for table: mytable.
[INFO] Initializing CDC source...
[INFO] CDC source successfully initialized.
[INFO] Starting CDC source...
[INFO] CDC source successfully started.
[INFO] Adding CDC source to Flink job topology...
[INFO] CDC source successfully added to Flink job topology.
[INFO] Starting Flink job...
[INFO] Flink job started successfully.
[INFO] Change data for table: mytable.
[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 25}.
[INFO] Record key: {"id": 2}, record value: {"id": 2, "name": "Bob", "age": 30}.
[INFO] Record key: {"id": 3}, record value: {"id": 3, "name": "Charlie", "age": 35}.
[INFO] Change data for table: mytable.
[INFO] Record key: {"id": 1}, record value: {"id": 1, "name": "Alice", "age": 27}.

可以看到,当有数据变更时,Flink CDC Job 会输出变更的表名、记录的主键以及变更的数据。例如,在这个示例中,有一行记录的年龄字段从25变成了27。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.exyb.cn/news/show-5313499.html

如若内容造成侵权/违法违规/事实不符,请联系郑州代理记账网进行投诉反馈,一经查实,立即删除!

相关文章

小研究 - JVM 垃圾回收方式性能研究(二)

本文从几种JVM垃圾回收方式及原理出发&#xff0c;研究了在 SPEC jbb2015基准测试中不同垃圾回收方式对于JVM 性能的影响&#xff0c;并通过最终测试数据对比&#xff0c;给出了不同应用场景下如何选择垃圾回收策略的方法。 目录 3 几种垃圾回收器 3.1 串行回收器 3.2 并行回…

机器学习笔记之优化算法(五)线搜索方法(步长角度;非精确搜索;Armijo Condition)

机器学习笔记之优化算法——线搜索方法[步长角度&#xff0c;非精确搜索&#xff0c;Armijo Condition] 引言回顾&#xff1a;关于 f ( x k 1 ) ϕ ( α ) f(x_{k1}) \phi(\alpha) f(xk1​)ϕ(α)的一些特性非精确搜索近似求解最优步长的条件 Armijo Condition \text{Armijo…

线程池 LinkedBlockingQueue、ArrayBlockingQueue、SynchronousQueue 的区别是什么 分别有什么优缺点

LinkedBlockingQueue、ArrayBlockingQueue 和 SynchronousQueue 都是 Java 中常用的阻塞队列实现&#xff0c;在线程池等多线程场景中经常用于保存等待执行的任务。它们之间的区别和各自的优缺点如下&#xff1a; LinkedBlockingQueue: 是一个基于链表的阻塞队列&#xff0c;…

内网隧道代理技术(十五)之 Earthworm的使用(二级代理)

Earthworm的使用(二级代理) 本文紧接着上一篇文章继续讲解Earthworm工具的使用 (二级代理)正向连接 二级正向代理发生在如下的情况: 1、Web服务器在公网,黑客可以直接访问 2、B机器在内网,黑客不能直接访问 3、Web服务器可以访问内网机器B 4、内网机器B可以访问公司…

基于峰谷分时电价引导下的电动汽车充电负荷优化(matlab代码)

目录 1 主要内容 峰谷电价优化 电动汽车充电负荷变化 2 部分代码 3 程序结果 1 主要内容 该程序基本复现《基于峰谷分时电价引导下的电动汽车充电负荷优化》&#xff0c;代码主要做的是基于NSGA-II的电动汽车充电负荷优化&#xff0c;首先&#xff0c;在研究电动汽车用户充…

2024考研408-计算机网络 第二章-物理层学习笔记

文章目录 前言一、通信基础1.1、物理层基本概念1.1.1、认识物理层1.1.2、认识物理层的四种接口特性 1.2、数据通信基础知识1.2.1、典型的数据通信模型及相关术语1.2.2、数据通信相关术语1.2.3、设计数据通信系统要考虑的三个问题&#xff1a;问题1&#xff1a;采用单工通信/半双…

Ubuntu 22.04 安装nginx1.24.0

安装编译Nginx所需的依赖项&#xff1a; sudo apt update sudo apt install libgd-dev libpcre3 libpcre3-dev build-essential zlib1g-dev libssl-dev -y 下载Nginx 1.24.0源代码包&#xff1a; wget http://nginx.org/download/nginx-1.24.0.tar.gz解压源代码包&#xff1a…

数学建模学习(8):单目标和多目标规划

优化问题描述 优化 优化算法是指在满足一定条件下,在众多方案中或者参数中最优方案,或者参数值,以使得某个或者多个功能指标达到最优,或使得系统的某些性能指标达到最大值或者最小值 线性规划 线性规划是指目标函数和约束都是线性的情况 [x,fval]linprog(f,A,b,Aeq,Beq,LB,U…
推荐文章