美团基于 Flink 的实时数仓平台建设新进展

​摘要:本文整理自美团实时数仓平台负责人姚冬阳在 Flink Forward Asia 2021 实时数仓专场的演讲。主要内容包括:

  1. 平台建设现状
  2. 遇到的问题及解决
  3. 未来规划

01平台建设现状

美团于 2018 年首次引入 Flink 实时计算引擎,当时的实时数仓概念还不太普及,平台只提供了 Flink Jar 任务的生命周期管理和监控报警。

2019 年,我们注意到实时计算的主要应用场景是解决离线数仓时效性低的问题。离线数仓已经比较成熟,通过 SQL 方式开发很简单,而数仓的实时部分主要通过 Flink DataStream API 来开发,门槛比较高,而且与离线数仓的开发方式相比较为割裂。因此,我们开始调研实时数仓的解决方案,目标是降低开发门槛,并尝试推广 FlinkSQL,最终将美团的实时数仓平台取名为 NAU。

2020 年,美团实时数仓平台正式上线。它向业务提供 FlinkSQL 作业开发入口,主要负责两个方面的工作:

  • 首先,将实时数仓常见的数据源与离线表概念对齐,用数据模型进行管理;
  • 其次,提供 FlinkSQL 开发配套的效率工具,比如校验和调试功能。

但是在实际推广过程中,我们发现业务在 FlinkSQL 的运维方面门槛依然比较高,因此,我们将接下来的工作重点转向了运维中心。

FlinkSQL 作业运维的痛点主要集中在两个方面:有状态 SQL 作业部署的断流问题和 SQL 作业的异常定位问题。为此,我们通过 Checkpoint 持久化和状态生成的异步化来解决第一个问题,并通过提供作业的自动诊断来解决第二个问题。目前,整个实时数仓的平台化建设已经初步完备,未来我们会在开发和运维能力上不断精细化,并且继续推动公司业务数仓架构的进化,比如流批生产的一体化、生产服务的一体化。

实时数仓目前已基本覆盖了公司的全部业务,为 100 多个业务团队提供了支持,比如美团优选、美团买菜、金融、骑行等业务。托管了 7000 多个实时数据模型,主要为 Kafka 表和 KV 表模型。线上运行 FlinkSQL 作业 4000+,新增的实时 SQL 作业占比已经达到 70% 以上。从数据上看,FlinkSQL 已经可以解决美团实时数仓大部分流处理的问题。

接下来以美团业务中的两个实时数仓生产链路为例,具体分享 FlinkSQL 的实际应用。

应用场景 1 是基于 FlinkSQL + OLAP 的实时生产链路。这个业务链路的实时数据源有两个,分别是业务 DB 的变更事件和业务服务的日志事件,这些事件首先会被收集到 Kafka 中,然后 DB 事件会按表名分发到新的 Kafka 中,DB 和日志的数据也会在这一层进行格式上的统一并完成实时数仓的 ODS 层。然后业务会使用 FlinkSQL 来清洗和关联 ODS 层的数据,生成实时数仓的主题宽表,最后写入 OLAP 查询引擎做实时分析。对于时效性要求不高的场景,部分业务还会在 OLAP 引擎上配置分钟级别的调度来减少相同查询的压力。

应用场景 2 与场景 1 的不同点在于,业务实时数仓的主题宽表数据并不是直接写入 OLAP 查询引擎,而是继续写入 Kafka,使用 FlinkSQL 做 APP 层的指标聚合,最终把预计算的指标数据写入 OLAP、DB 或 KV 这类应用层的存储。这种方式更适合对接数据服务,因为它兼顾了数据的时效性和高 QPS 的查询。

上图是实时数仓平台的架构,分为集成、开发、运维、治理、安全 5 个模块分别建设。

集成模块主要关注的是数据模型的管理,具体包括 Kafka 和 KV 两种模型管理,管理的内容有数据源的 schema 信息和连接信息等。

开发模块主要关注的是 FlinkSQL 转化业务需求,比如提供版本管理来记录业务需求的迭代过程,提供 FlinkSQL 的校验和调试,来确保开发的 SQL 正确表达了业务逻辑,支持业务使用自定义的 Flink UDF 函数和自定义的 Format 解析,让 FlinkSQL 可以扩展满足更多业务需求场景。

运维模块关注的是 SQL 作业的部署和运行时的监控。在监控方面,我们提供了 SQL 作业的监控报警、异常日志和作业诊断,能够帮助业务快速发现和定位作业的异常;部署方面,我们提供 SQL 作业的快照管理、AB 部署和参数调优,来帮助业务解决 SQL 作业变更时的问题。

治理模块关注的是实时数仓的数据质量、资源成本,通过建设实时数仓的 DQC 监控,帮助业务发现上游数据或产出数据的异常值/异常波动;通过链路血缘和资源计费,让业务可以量化实时数仓的生产成本,方便进行成本治理。

安全模块主要关注的是对数据流向的管控,提供数据源读写权限的管理和受限域机制,保证公司业务数据的安全性。

02遇到的问题及解决

在实际推广 FlinkSQL 的过程中,我们也面临了不少挑战。

2.1 双流关联大状态问题

首先是双流关联的大状态问题,FlinkSQL 的双流关联会保留左右流的历史数据来互相关联,需要关联的时间间隔越长,保存的历史数据就会越多,状态也就会越大。比如,要关联订单的下单事件和退款事件,并保证计算结果的正确性,需要考虑这两个事件发生的间隔,可能是一个月甚至更久。

上图左侧是一个双流关联的有状态 SQL 作业,图中的 Mem 和 Disk 组成了 SQL 作业的 TaskManager 节点,SQL 作业状态后端使用 RocksDB,状态持久化在 HDFS 文件系统上。一开始我们尝试把 SQL 作业的状态设置为保留一个月,但 SQL 作业会变得不稳定,出现内存超限、状态读取性能下降等问题,只能不断增加作业的 TM 数和内存大小来缓解。

即使这样,业务上仍然存在两个痛点。首先是关联数据初始化难,目前公司 Kafka 数据源对历史回溯有限制,因此业务不能构建出完整的历史状态,即使 Kafka 支持了更久的回溯,状态初始化的效率也依然是一个问题。其次,内存资源开销大,特别是当多个 SQL 作业关联相同的数据源时,需要为每个 SQL 作业都分配相应的内存资源,不同 SQL 作业间的状态是隔离的,作业间相同的关联数据不能复用。

解决方案对于上述问题,我们提出了冷热关联分离的解决方案。假设关联两天前的数据是相对低频的且状态回滚不会超过两天,那么可以定义两天前的数据为冷数据,两天之内的数据为热数据。

如上图所示,左侧的 SQL 作业通过设置状态保留时长,只保留 T+0 和 T+1 这两天的热数据,而 T+2 及更久以前的冷数据则通过批任务每天从 Hive 同步到外存 KV 中。关联时,若状态中的热数据不存在,则再通过访问外存 KV 来关联冷数据。右侧是另外一个 SQL 作业需要关联相同的数据源,它与左侧的 SQL 作业共享外层 KV 中的冷数据。

对于第一个痛点,因为状态控制在了两天内,SQL 作业上线时,关联数据初始化的数据量得到了控制。对于第二个痛点,因为两天前的大部分数据都保存在外层KV中,不同的 SQL 作业都可以查询外存 KV,从而可以节省大量内存资源。

2.2 SQL 变更状态恢复问题

第二个问题是有状态 SQL 逻辑变更后状态如何恢复?FlinkSQL 支持有状态的增量计算,状态是增量计算的历史累计,实际上业务需要修改逻辑的情况很多,上图右侧列出了一些常见的 SQL 变更情况,比如新增聚合指标、修改原指标口径、增加过滤条件、新增数据流关联、增加聚合维度等。

举个例子,业务增加了更多服务维度,在数据产品上就需要扩展分析的维度,因此也需要修改 FlinkSQL 增加聚合维度。但是上述 SQL 逻辑变化后却不能从之前的状态恢复,因为历史状态对于变更后的 SQL 不能保证其完整性,即使恢复后也不能百分百保证后续计算的正确性。这种情况下,业务为了保证数据的正确性,需要从历史回溯重新计算,回溯的过程会导致线上断流,但业务又不希望牺牲太多的时效性。

解决方案针对这个问题,我们给出了三种解决方案。

解法 1:双链路切换。此解法的关键是再搭建一条相同的实时链路作为备用链路,当变更有状态 SQL 时,可以在备用链路上做回溯,重新计算历史数据,回溯完成后先验证备用链路的结果数据,确保没问题后再在链路最下游的数据服务层切换读取的表,完成整个变更流程。

解法 2:旁路状态生成。与双链路切换不同点在于,这里变更的是链路上的单个作业,思路是临时启动一个旁路作业来回溯,构建出新逻辑的状态,验证数据完成后再重启线上作业,以此完成 SQL 和状态的同时切换。

解法 3:历史状态迁移,前两个方法的思路比较类似,都是基于历史数据重新计算,构建出新状态。但这个思路是基于历史状态迁移出新状态,这种方法构建出的新状态虽然不能保证完整性,但在某些情况下,业务也是可以接受的。目前我们通过改造 State Process API 支持在 SQL 算子及其上下游关系不变的情况下,允许 Join 和 Agg 算子来新增列。

上述三种方式各有优点,可以从普适性、资源成本、线上断流、等待时长四个维度来对以上三个解决方案进行横向比较。

普适性是指在保证数据正确的前提下支持的 SQL 变更范围,前两个方法都是重新计算,状态是完整的,因此比方案 3 的普适性更高。

资源成本是指完成 SQL 变更所需要的额外 Flink 或 Kafka 资源,方法 1 需要构建整条链路,需要更多的 Flink 和 Kafka 资源,因此成本最高。

线上断流指的是在变更过程中导致下游数据延迟的时长,方法 1 是在数据服务层做切换,几乎没有断流;方法 2 的断流时长取决于作业从状态恢复的速度;方法 3 除了状态恢复,还需要考虑状态迁移的速度。

等待时长指的是完成整个变更流程需要的时间,前两个方法都需要重新计算,因此比方法 3 的等待时间更长。

上图是方法 2 的平台自动化流程。流程分为七个阶段,变更流程执行的时间较长,可能需要几十分钟,通过流程条以及图中每个阶段的执行日志可以让用户感受到变更的进度和状态。我们还为用户做了自动化指标检查,比如在第 2 个阶段的旁路数据回溯中,我们会检查作业消费 Kafka 的积压指标,来判断回溯是否完成,完成后自动制作新逻辑状态。再比如在第 6 个阶段,原作业从旁路作业启动时会比较 Kafka Offset 指标来比较两个作业的消费进度,确保线上作业重启后不会少发数据。

2.3 FlinkSQL 调试繁琐问题

遇到的第 3 个问题是 FlinkSQL 调试繁琐,操作步骤多,业务需要创建额外的作业和 Kafka,还要将导出的结果进行存储。此外,输入构造复杂,为了针对性地调试某种输入场景,业务需要写代码来构建消息并写入数据源,甚至需要对多个不同数据源消息到来的顺序进行控制。上图左侧可以看到,为了做 FlinkSQL 调试,需要手动搭建一条与线上隔离的调试链路,然后写入 Mock 数据。

解决方案‍

针对上述问题的解法是:基于文件调试一键化。首先业务在 Web 端可以在线编辑 Mock 数据,Mock 数据是有界的消息序列,它的初始化可以先从线上抽样,然后再由业务进行修改。业务构建完 Mock 数据后,会将 SQL 作业的 Mock 数据持久化到右侧的 S3 文件对象系统上。业务在 Web 端点击调试,左侧发起的调试任务会在与线上隔离的服务器上单进程执行,执行时会从 S3 获取之前上传的 Mock 数据,而且可以根据 Mock 数据指定的多源消息之间的到达顺序和消息之间的发送间隔来执行,执行完成后会将输出结果也持久化到 S3,最后在 Web 端查询 S3 呈现给业务。

更多情况下业务不需要修改 Mock 数据,只需要做抽样和执行两步操作。另外我们也支持了一些调试的高级功能,比如支持控制消息的顺序和间隔。

上图是基于以上解法的调试工具。业务会为 SQL 作业创建多个测试用例,其中包括了 Source 的 Mock 数据和 Sink 的预期结果。执行调试后,会检查所有测试用例的通过情况,通过的条件是要保证结果流 Merge 之后的表与预期表数据一致。

2.4 SQL 作业异常定位问题

第 4 个问题是 FlinkSQL 作业的异常定位。作业异常是指作业消费 Kafka 出现了积压,为了解决这个问题,需要定位出产生积压的原因。而定位原因时,归因的路径比较复杂,排查门槛比较高。另外由于归因的路径缺少系统化的沉淀,定位花费的时间也比较长。随着 SQL 作业的数量越来越多,如果完全依赖人工排查,工作量将会非常巨大。解决方案

针对上述为的解决方法是实现 SQL 作业的自动化异常诊断。通过 Flink Reporter 上报 SQL 作业的运行指标,并持久化到 TSDB 中用于历史查询。同时也会持久化 SQL 作业的运行日志,报警服务会根据规则监控 SQL 作业上报的 Kafka Offset 指标,当消费的 Offset 落后于生产的 Offset 时,会判定位作业发生消费积压,然后发出报警并下发异常事件,诊断服务会监听报警服务的异常事件。

异常发生时,根据异常时间窗口内作业日志和作业指标分析异常原因,诊断服务可以通过增加规则来沉淀人工排查的经验。比如发生了 Restart,就会从日志中根据关键字来提取异常信息,未发生 Restart 则会根据反压指标找出瓶颈节点,然后结合 GC 指标、数据倾斜、火焰图等来分析瓶颈的原因,最后提出调优建议。

上图展示了诊断出业务消息脏数据的例子。图中的运行概况一栏会给出 SQL 作业在每个时间检查点的诊断情况,绿色表明运行正常,红色表明作业存在异常,通过这个时间线可以清楚看到异常发生的时间点。诊断结果栏中可以看到异常的原因、详情和建议。比如在这个事例中,原因是业务消息存在脏数据,在详情中可以看到导致作业异常的原始消息内容,在建议中会提示业务配置脏数据的处理策略。

03未来规划

未来,美团实时数仓平台的规划主要包括以下两个方面。

  • 首先,是流批一体开发运维,我们即将在实时数仓平台集成数据湖存储,并开放 FlinkSQL 的批作业,在存储和计算层都做到流批统一,提高工作效率。
  • 其次,是作业的自动调优,继续提升作业诊断的准确率以及作业重启的效率。​
 
友情链接
鄂ICP备19019357号-22