TiCDC 源码阅读(三)TiCDC 集群工作过程解析

news/2023/6/7 1:02:34

内容概要

TiCDC 是一款 TiDB 增量数据同步工具,通过拉取上游 TiKV 的数据变更日志,TiCDC 可以将数据解析为有序的行级变更数据输出到下游。

本文是 TiCDC 源码解读的第三篇,主要内容是讲述 TiCDC 集群的启动及基本工作过程,将从如下几个方面展开:

  1. TiCDC Server 启动过程,以及 Server / Capture / Owner / Processor Manager 概念和关系
  2. TiCDC Changefeed 创建过程
  3. Etcd 在 TiCDC 集群中的作用
  4. Owner 和 Processor Manager 概念介绍,以及 Owner 选举和切换过程
  5. Etcd Worker 在 TiCDC 中的作用

完整视频回顾

启动 TiCDC Server

启动一个 TiCDC Server 时,使用的命令如下,需要传入当前上游 TiDB 集群的 PD 地址。

cdc server --pd=http://127.0.0.1:2379

它会启动一个 TiCDC Server 运行实例,并且向 PD 的 ETCD Server 写入 TiCDC 相关的元数据,具体的 Key 如下:

/tidb/cdc/default/__cdc_meta__/capture/${capture_id}/tidb/cdc/default/__cdc_meta__/owner/${session_id}

第一个 Key 是 Capture Key,用于注册一个 TiCDC Server 上运行的 Capture 信息,每次启动一个 Capture 时都会写入相应的 Key 和 Value。

第二个 Key 是 Campaign Key,每个 Capture 都会注册这样一个 Key 用于竞选 Owner。第一个写入 Owner Key 的 Capture 将成为 Owner 节点。

Server 启动,经过了解析 Server 启动参数,验证参数合法性,然后创建并且运行 TiCDC Server。Server 运行的过程中,会启动多个运行线程。首先启动一个 Http Server 线程,对外提供 Http OpenAPI 访问能力。其次,会创建一系列运行在 Server 级别的资源,主要作用是辅助 Capture 线程运行。最重要的是创建并且运行 Capture 线程,它是 TiCDC Server 运行的主要功能提供者。

image.png

Capture 运行时,首先会将自己的 Capture Information 投入到 ETCD 中。然后启动两个线程,一个运行 ProcessorManager,负责所有 Processor 的管理工作。另外一个运行 campaignOwner,其内部会负责竞选 Owner,以及运行 Owner 职责。如下所示,TiCDC Server 启动之后,会创建一个 Capture 线程,而 Capture 在运行过程中又会创建 ProcessorManager 和 Owner 两个线程,各自负责不同的工作任务。

image (1).png

创建 TiCDC Changefeed

创建 changefeed 时使用的命令如下:

cdc changefeed create --server=http://127.0.0.1:8300 --sink-uri="blackhole://" --changefeed-id="blackhole-test"

其中的 server 参数标识了一个运行中的 TiCDC 节点,它记录了启动时候的 PD 地址。在创建 changefeed 时,server 会访问该 PD 内的 ETCD Server,写入一个 Changefeed 的元数据信息。

/tidb/cdc/default/default/changefeed/info/${changefeed_id}/tidb/cdc/default/default/changefeed/status/${changefeed_id}
  • 第一个 Key 标识了一个 Changefeed,包括该 Changefeed 的各种静态元数据信息,比如 changefeed-idsink-uri,以及一些其他标识运行时是为的数据。
  • 第二个 Key 标识了该 Changefeed 的运行时进度,主要是记录了 CheckpointResolvedTs 的推进情况,会不断地周期性地更新。

Etcd 的作用

image (2).png

ETCD 在整个 TiCDC 集群中承担了非常重要的元数据存储功能,它记录了 Capture 和 Changefeed 等重要信息。同时通过不断记录并且更新 Changefeed 的 Checkpoint 和 ResolvedTs,保证 Changefeed 能够稳步向前推进工作。从上图中我们可以知道,Capture 在启动的时候,自行将自己的元数据信息写入到 ETCD 中,在此之后,Changefeed 的创建,暂停,删除等操作,都是经由已经启动的 TiCDC Owner 来执行的,后者负责更新 ETCD。

Owner 选举和切换

一个 TiCDC 集群中可以存在着多个 TiCDC 节点,每个节点上都运行着一个 campaignOwner 线程,负责竞选 Owner,如果竞选成功,则履行 Owner 的工作职责。集群中只有一个节点会竞选成功,然后执行 Owner 的工作逻辑,其他节点上的该线程会阻塞在竞选 Owner 上。

image (3).png

TiCDC Owner 的选举过程是基于 ETCD Election 实现的。每个 Capture 在启动之后,会创建 ETCD Session,然后使用该 Session,调用 NewElection 方法,创建到 Owner Key /tidb/cdc/${ClusterID}/__cdc_meta/owner 的竞选,然后调用 Election.Campaign 开始竞选。基本的相关代码过程如下:

sess, err := concurrency.NewSession(etcdClient, ttl) // ttl is set to 10s
if err != nil {return err
}election := concurrency.NewElection(sess, key) // key is `/tidb/cdc/${ClusterID}/__cdc_meta/owner`if err := election.Campaign(ctx); err != nil {return err
}...

感兴趣的读者 ,可以通过 Capture.Run 方法作为入口,浏览这部分代码流程,加深对该过程的理解。在真实的集群运行过程中,多个 TiCDC 节点先后上线,在不同的时刻开始竞选 Owner,第一个向 ETCD 中写入 Owner Key 的实例将成为 Owner。如下图所示,TiCDC-0 在 t=1 时刻写入 Owner Key,将会成为 Owner,它在后续运行过程中如果遇到故障辞去了自己的 Ownership,那么 TiCDC-1 将会成为新的 Owner 节点。老旧的 Owner 节点重新上线,调用 Election.Campaign 方法重新竞选 Owner,循环往复。

image (4).png

EtcdWorker 模块

EtcdWorker 是 TiCDC 内部一个非常重要的模块,它主要负责从 ETCD 中读取数据,映射到 TiCDC 内存中,然后驱动 Owner 和 ProcessorManager 运行。在具体的实现中,EtcdWorker 通过调用 ETCD Watch 接口,周期性地获取到所有和 TiCDC 相关的 Key 的变化情况,然后映射到其自身维护的 GlobalReactorState 结构体中,其定义如下所示,其中记录了 Capture,Changefeed,Owner 等信息。

type GlobalReactorState struct {ClusterID      stringOwner          map[string]struct{}Captures       map[model.CaptureID]*model.CaptureInfoUpstreams      map[model.UpstreamID]*model.UpstreamInfoChangefeeds    map[model.ChangeFeedID]*ChangefeedReactorState....
}

Owner 和 ProcessorManager 都是一个 Reactor 接口的实现,二者都借助 GlobalReactorState 提供的信息来推进工作进度。具体地,Owner 通过轮询每一个记录在 GlobalReactorState 中的 Changefeed,让每一个 Changefeed 都能够被稳步推进同步状态。同时也负责诸如 Pause / Resume / Remove 等和 Changefeed 的运行状态相关的工作。ProcessorManager 则轮询每一个 Processor,让它们能够及时更新自身的运行状态。

总结

以上就是本文的全部内容。希望读者能够理解如下几个问题:

  • TiCDC Server 启动,创建 Changefeed 和 ETCD 的交互过程。
  • EtcdWorker 如何读取 ETCD 数据并且驱动 Owner 和 Processor Manager 运行。
  • TiCDC Owner 的竞选和切换过程。

下一次我们将向大家介绍 TiCDC Changefeed 内部的 Scheduler 模块的工作原理。

本文来自互联网用户投稿,该文观点仅代表作者本人,不代表本站立场。本站仅提供信息存储空间服务,不拥有所有权,不承担相关法律责任。如若转载,请注明出处:http://www.exyb.cn/news/show-4555744.html

如若内容造成侵权/违法违规/事实不符,请联系郑州代理记账网进行投诉反馈,一经查实,立即删除!

相关文章

android测试环境与条件,测试环境与实际运行环境之间可能存在的差异有哪些

测试环境与实际运行环境之间可能存在的差异有哪些以下文字资料是由(历史新知网www.lishixinzhi.com)小编为大家搜集整理后发布的内容,让我们赶快一起来看一下吧!测试环境与实际运行环境之间可能存在的差异有哪些这属于测试风险的一部分。 最好的情况就是…

全网最全tcpdump和Wireshark抓包实践

tcpdump和Wireshark抓包实践1、wireshark基本使用1.1、如何捕获报文1.2、Wireshark 面板1.3、快捷方式工具栏1.5、设定时间显示格式1.6、数据包列表面板的标记符号1.7、文件操作1.9、抓包文件常见几种类型1.10、解析https报文,TSL报文1.11、wireshark Expert Inform…

C++游戏编程--图像界面+AI堵棋--实现简易三字棋小游戏

程序名称&#xff1a;井字棋 编译环境&#xff1a;Mictosoft Visual Studio 2019, EasyX_2021 作  者&#xff1a;代码骑士<1696297834qq.com> 最后修改&#xff1a;2021-11-19 目录 游戏示例&#xff1a; 第一步&#xff1a;二维数组定义棋盘。 第二步&#xff…

MSR 5660设备进行流量整形和带宽保证的实现案例

组网及说明现场客户使用MSR5660设备替换客户处原有华为的设备&#xff0c;之前华为的设备做了GTS流量整形和WFQ的带宽保证&#xff0c;需求见下面图片描述配置步骤一、针对G1/0/1口入方向进行本地优先级的映射&#xff1a;&#xff08;1&#xff09;对进入设备G1/0/1口的流量进…

ML之xgboost:利用xgboost算法(sklearn+GridSearchCV)训练mushroom蘑菇数据集(22+1,6513+1611)来预测蘑菇是否毒性(二分类预测)

ML之xgboost&#xff1a;利用xgboost算法(sklearnGridSearchCV)训练mushroom蘑菇数据集(221,65131611)来预测蘑菇是否毒性(二分类预测) 目录 输出结果 设计思路 核心代码 更多输出 输出结果 正在更新…… 设计思路 正在更新…… 核心代码 from sklearn.grid_search impo…

web3:同态加密

web3相关学习一并收录至该博客&#xff1a;web3学习博客目录大全 目录同态加密概念同态加密具体如何定义&#xff1f;主流同态加密算法原理乘法同态加密算法① RSA算法一些基本的数学知识RSA的具体过程秘钥的产生加密解密验证了 RSA 算法的乘法同态性java代码简单实现python代码…

GPIO 应用

应用层如何控制 GPIO&#xff0c; 譬如控制 GPIO 输出高电平、或输出低电平。应用层如何操控 GPIO与 LED 设备一样&#xff0c; GPIO 同样也是通过 sysfs 方式进行操控&#xff0c;进入到/sys/class/gpio 目录下。可以看到该目录下包含两个文件 export、 unexport 以及 5 个 gp…

8. 【Redisson源码】分布式信号量RSemaphore

目录 一、RSemaphore的使用 二、RSemaphore设置许可数量 三、RSemaphore的加锁流程 四、RSemaphore的解锁流程 【本篇文章基于redisson-3.17.6版本源码进行分析】 基于Redis的Redisson的分布式信号量RSemaphore采用了与java.util.concurrent.Semaphore相似的接口和用法。 …