摘要:本文整理自大健云仓基础架构负责人、Flink CDC Maintainer 龚中强在 5 月 21 日 Flink CDC Meetup 的演讲。主要内容包括:
- 引入 Flink CDC 的背景
- 现今内部落地的业务场景
- 未来内部推广及平台化建设
- 社区合作
一、引入 Flink CDC 的背景
公司引入 CDC 技术,主要基于以下四个角色的需求:
- 物流科学家:需要库存、销售订单、物流账单等数据用于做分析。
- 开发:需要同步其他业务系统的基本信息。
- 财务:希望财务数据能够实时传送到财务系统,而不是月结前才能看到。
- 老板:需要数据大屏,通过大屏查看公司的业务和运营情况。
CDC 是数据捕获变更的技术。广义上来说,但凡能够捕获数据变更的技术,都能被称为 CDC。但通常我们说的 CDC 技术主要面向数据库的变更。
CDC 的实现方式主要有两种,分别是基于查询和基于日志:
- 基于查询:查询后插入、更新到数据库即可,无须数据库的特殊配置以及账号权限。它的实时性基于查询频率决定,只能通过提高查询频率来保证实时性,而这必然会对 DB 造成巨大压力。此外,因为是基于查询,所以它无法捕获两次查询之间数据的变更记录,也就无法保证数据的一致性。
- 基于日志:通过实时消费数据的变更日志实现,因此实时性很高。而且不会对 DB 造成很大的影响,也能够保证数据的一致性,因为数据库会将所有数据的变动记录在变更日志中。通过对日志的消费,即可明确知道数据的变化过程。它的缺点是实现相对复杂,因为不同数据库的变动日志实现不一样,格式、开启方式以及特殊权限都不一样,需要针对每一种数据库做相应的适配开发。
正如 Flink 的宣言 “实时即未来”,在如今的大背景下,实时性是亟待解决的重要问题。因此,我们将主流 CDC 基于日志的技术做了对比,如上图所示:
- 数据源:Flink CDC 除了对传统的关系型数据库做到了很好的支持外,对文档型、NewSQL(TiDB、Oceanbase) 等当下流行的数据库都能够支持;Debezium 对数据库的支持相对没有那么广泛,但是对主流的关系型数据库都做到了很好的支撑;Canal 和 OGG 只支持单一的数据源。
- 断点续传:四种技术都能够支持。
- 同步模式:除了 Canal 只支持增量,其他技术均支持全量 + 增量的方式。而全量 + 增量的方式意味着第一次上线时全量到增量的切换过程全部可以通过 CDC 技术实现,无须人为地通过全量的任务加上增量的 job 去实现全量 + 增量数据的读取。
- 活跃度:Flink CDC 拥有非常活跃的社区,资料丰富,官方也提供了详尽的教程以及快速上手教程;Debezium 社区也相当活跃,但资料大多是英文的;Canal 的用户基数特别大,资料也相对较多,但社区活跃度一般;OGG 是 Oracle 的大数据套件,需要付费,只有官方资料。
- 开发难度:Flink CDC 依靠 Flink SQL 和 Flink DataStream 两种开发模式,尤其是 Flink SQL,通过非常简单的 SQL 即可完成数据同步任务的开发,开发上手尤为简单;Debezium 需要自己解析采集到的数据变更日志进行单独处理,Canal 亦是如此。
- 运行环境依赖:Flink CDC 是以 Flink 作为引擎,Debezium通常是将 Kafka connector 作为运行容器;而 Canal 和 OGG 都是单独运行。
- 下游丰富程度:Flink CDC 依靠 Flink 非常活跃的周边以及丰富的生态,能够打通丰富的下游,对普通的关系型数据库以及大数据存储引擎 Iceberg、ClickHouse、Hudi 等都做了很好的支持;Debezium 有 Kafka JDBC connector, 支持 MySQL 、Oracle 、SqlServer;Canal 只能直接消费数据或将其输出到 MQ 中进行下游的消费;OGG 因为是官方套件,下游丰富程度不佳。
二、现今内部落地的业务场景
- 2018 年之前,大健云仓数据同步的方式为:通过多数据应用定时同步系统之间的数据。
- 2020 年之后,随着跨境业务的飞速发展,多数据源应用经常打满 DB 影响在线应用,同时定时任务的执行顺序管理混乱。
- 因此, 2021 年我们开始调研选型 CDC 技术,搭建了小型试验场景,进行小规模的试验。
- 2022 年,上线了基于 Flink CDC 实现的 LDSS 系统库存场景同步功能。
- 未来,我们希望依托 Flink CDC 打造数据同步平台,通过界面的开发和配置完成同步任务的开发、测试和上线,能够全程在线管理同步任务的整个生命周期。
LDSS 库存管理的业务场景主要有以下四种:
- 仓储部门:要求仓库的库存容量和商品品类分布合理,库存容量方面,需要留一些 buffer 以防突如其来的入库单导致爆仓;商品品类方面,季节性的商品库存分配不合理导致热点问题,这必将给仓库的管理带来巨大挑战。
- 平台客户:希望订单处理及时,货物能够快速、精准地交到客户手上。
- 物流部门:希望能够提升物流效率,降低物流成本,高效利用有限的运力。
- 决策部门:希望 LDSS 系统能够对在何时何地新建仓库提供科学的建议。
上图为 LDSS 库存管理分单场景架构图。
首先,通过多数据源同步的应用向下拉取仓储系统、平台系统以及内部 ERP 系统数据,将所需数据抽取到 LDSS 系统的数据库中,以支撑 LDSS 系统订单、库存、物流三大模块的业务功能。
其次,需要产品信息、订单信息以及仓库信息才能进行有效的分单决策。多数据源定时同步任务基于 JDBC 查询,通过时间做筛选,同步变更的数据到 LDSS 系统中。LDSS 系统基于这些数据做分单决策,以获得最优解。
定时任务同步的代码,首先需要定义定时任务、定义定时任务的类、执行方法以及执行间隔。
上图左侧为定时任务的定义,右侧是定时任务的逻辑开发。首先,打开 Oracle 数据库进行查询,然后 upsert 到 MySQL 数据库,即完成了定时任务的开发。此处以接近原生 JDBC 的查询方式,将数据依次塞到对应的数据库表中,开发逻辑十分繁琐,也容易出现 bug。
因此,我们基于 Flink CDC 对其进行了改造。
上图为基于 Flink CDC 实现的实时同步场景,唯一的变化是将此前的多数据源同步应用程序换成了 Flink CDC 。
首先,通过 SqlServer CDC、MySQL CDC、Oracle CDC 分别连接抽取对应仓储平台、 ERP 系统数据库的表数据,然后通过 Flink 提供的 JDBC connector 写入到 LDSS 系统的 MySQL 数据库中。能够通过 SqlServer CDC、MySQL CDC、Oracle CDC 将异构数据源转化为统一的 Flink 内部类型,再往下游写。
此架构相比于之前的架构,对业务系统没有侵入性,而且实现较为简单。
我们引入了 MySQL CDC 和 SqlServer CDC 分别连接 B2B 平台的 MySQL 数据库以及仓储系统的 SqlServer 数据库,然后将抽取到的数据通过 JDBC Connector 写入到 LDSS 系统的 MySQL 数据库。
通过以上改造,得益于 Flink CDC 赋予其实时的能力,不需要管理繁杂的定时任务。
基于 Flink CDC 同步代码的实现分为以下三步:
- 第一步,定义源表 —— 需要同步的表;
- 第二步,定义目标表 —— 需要写入数据的目标表;
- 第三步,通过 insert select 语句,即可完成 CDC 同步任务的开发。
上述开发模式非常简单,逻辑清晰。此外,依托 Flink CDC 的同步任务和 Flink 架构,还获得了失败重试、分布式、高可用、全量增量一致性切换等特性。
三、未来内部推广及平台化建设
上图为平台架构图。
左侧 source 是由 Flink CDC + Flink 提供的源端,能够通过丰富的源端抽取数据,通过数据平台上的开发写入到目标端。目标端又依托于 Flink 的强大生态,能够很好地支撑数据湖、关系型数据库、MQ 等。
Flink 目前有两种运行方式,一种是国内比较流行的 Flink on Yarn,另一种是 Flink on Kubernets。中间部分的数据平台向下管理 Flink 集群,以向上支撑 SQL 在线开发、任务开发、血缘管理、任务提交、在线 Notebook 开发、权限和配置以及对任务性能的监控和告警,同时也能够对数据源做到很好的管理。
数据同步的需求在公司内部特别旺盛,需要通过平台来提高开发效率,加快交付速度。而且平台化之后,可以统一公司内部的数据同步技术,收拢同步技术栈,减少维护成本。
平台化的目标如下:
- 能够很好地管理数据源、表等元信息;
- 任务的整个生命周期都可以在平台上完成;
- 实现任务的性能观测以及告警;
- 简化开发,快速上手,业务开发人员经过简单培训即可上手开发同步任务。
平台化能带来以下三个方面的收益:
- 收拢数据同步任务,统一来管理;
- 平台管理维护同步任务的全生命周期;
- 专门的团队负责,团队能够专注前沿的数据集成技术。
有了平台之后,即可快速落地应用更多的业务场景。
- 实时数仓:希望通过 Flink CDC 以支持更多实时数仓的业务场景,借助 Flink 强大的计算能力做一些数据库的物化视图。将计算从 DB 里解脱出来,通过 Flink 的外部计算再重新写回数据库,以加速平台应用的报表、统计、分析等实时应用场景。
- 实时应用:Flink CDC 能够从 DB 层捕获变更,因此可以通过 Flink CDC 实时更新搜索引擎中的内容,实时向财务系统推送财务和核算数据。因为大部分财务系统的数据都需要业务系统通过跑定时任务以及经过大量关联、聚合、分组等操作才能计算出来,再推送到财务系统中。而借助 Flink CDC 强大的数据捕获能力,再加上 Flink 的计算能力,将这些数据实时地推送到核算系统和财务系统,就能够及时发现业务的问题,减少公司的损失。
- 缓存:通过 Flink CDC,能够构建一个脱离于传统的应用之外的实时缓存,对于在线应用的性能有极大的提升。
有了平台的助力,相信 Flink CDC 能够在公司内部更好地释放它的能力。
上图展示了 SqlServer CDC 的原理。
社区同学使用了当前版本的 SqlServer CDC 后,主要反馈的问题有以下三个:
- 快照过程中锁表:锁表操作对于 DBA 和在线应用都是不可忍受的, DBA 无法接受数据库被夯住,同时也会影响在线应用。
- 快照过程中不能 checkpoint:不能 checkpoint 就意味着快照过程中一旦失败,只能重新开始跑快照过程,这对于大表非常不友好。
- 快照过程只支持单并发:千万级、上亿级的大表,在单并发的情况下需要同步十几甚至几十个小时,极大束缚了 SqlServer CDC 的应用场景。
我们针对上述问题做了实践和改进,参考社区 2.0 版本 MySQL CDC 并发无锁算法的思想,对 SqlServer CDC 进行了优化,最终实现了快照过程中无锁,实现一致性快照;快照过程中支持 checkpoint ;快照过程中支持并发,加速快照过程。在大表同步的情况下,并发优势尤为明显。
但是由于 2.2 版本社区将 MySQL 的并发无锁思想抽象成了统一公共的框架,SqlServer CDC 需要重新适配这套通用框架后才能贡献给社区。
提问&解答
Q1需要开启 SqlServer 自己的 CDC 吗?
是的,SqlServer CDC 的功能就是基于 SqlServer 数据库自己的 CDC 特性实现的。
Q2物化视图通过什么方式去刷新定时任务触发器?
通过 Flink CDC 将需要生成物化视图的 SQL 放在 Flink 里运行,通过原表的变动触发计算,然后同步到物化视图表里。
Q3平台化是怎么做的?
平台化参考了社区众多的开源项目以及优秀的开源平台,比如 StreamX、Dlink 等优秀的开源项目。
Q4SqlServer CDC 在消费 transaction log 时有瓶颈吗?
SqlServer 并没有直接消费 log,其原理是 SqlServer capture process 去匹配 log 内哪些表开启了 CDC ,然后将这些表从日志里捞到开启 CDC 表的变更数据,再转插到 change table 里,最后通过开启 CDC 之后数据库生成的 CDC query function 获取到数据的变更。
Q5Flink CDC 高可用如何保障同步任务过多或密集处理方案?
Flink 的高可用依赖于 Flink 特性比如 checkpoint 等来保证。同步任务过多或处理方案密集的情况,建议使用多套 Flink 下游集群,然后根据同步的实时性区分对待,将任务发布到相应的集群中。
Q6中间需要 Kafka 吗?
取决于同步任务或数仓架构是否需要将中间数据做 Kafka 落地。
Q7一个数据库中有多张表,可以放到一个任务里运行吗?
取决于开发方式。如果是 SQL 的开发方式,要实现一次性写多表只能通过多个任务。但 Flink CDC 提供了另外一种比较高阶的开发方式 DataStream ,可以将多表放到一个任务里运行。
Q8Flink CDC 支持读取 Oracle 从库的日志吗?
目前还无法实现。
Q9通过 CDC 同步后两个端的数据质量如何监控,如何比对?
目前只能通过定时抽样来做数据质量的检查,数据质量问题一直是业内比较棘手的问题。
Q10大健云仓用的什么调度系统?系统如何与 Flink CDC 集合?
使用 XXL Job 作为分布式的任务调度,CDC 没有用到定时任务。
Q11如果采集增删表,SqlServer CDC 需要重启吗?
SqlServer CDC 目前不支持动态加表的功能。
Q12同步任务会影响系统性能吗?
基于 CDC 做同步任务肯定会影响系统性能,尤其是快照过程对数据库会有影响,进而影响应用系统。社区将来会做限流、对所有 connector 做并发无锁的实现,都是为了扩大 CDC 的应用场景以及易用性。
Q13全量和增量的 savepoint 怎么处理?
(未通过并发无锁框架实现的连接器)全量过程中不可以触发 savepoint,增量过程中如果需要停机发布,可通过 savepoint 恢复任务。
Q14CDC 同步数据到 Kafka ,而 Kafka 里面存的是 Binlog ,如何保存历史数据和实时数据?
将 CDC 同步的数据全部 Sync 到 Kafka,保留的数据取决于 Kafka log 的清理策略,可以全部保留。
Q15CDC 会对 Binlog 的日志操作类型进行过滤吗?会影响效率吗?
即使有过滤操作,对性能影响也不大。
Q16CDC 读 MySQL 初始化快照阶段,多个程序读不同的表会有程序报错无法获取锁表的权限,这是什么原因?
建议先查看 MySQL CDC 是不是使用老的方式实现,可以尝试新版本的并发无锁实现。
Q17MySQL 上亿大表全量和增量如何衔接?
建议阅读雪尽老师在 2.0 的相关博客,非常简单清晰地介绍了并发无锁如何实现一致性快照,完成全量和增量的切换。