最近一段时间处理的业务数据量的增长和处理流程的复杂性,导致flink实时作业不仅仅局限于结果正确,更多的精力放在了性能优化上,接下来对近期CheckPoint时长优化进行总结。
我们都知道checkpoint的重要性,整体过程如下:

  1. JobManager 向 Source 算子发送 Barrier ,初始化
    Checkpoint,即JM向Source发起Trigger操作;
  2. 各个Source 算子一旦收到 Barrier 之后,开始Init自身的State,并同时向下游发送 Barrier;
  3. 下游算子收到 Barrier 后,进行 Barrier Alignment
    处理,且若有多个input时并且收到所有的input的Barrier才会开始做Init
    State,同时继续往下游发送Bassier,直到sink算子
    (1.10之前需要Barrier对齐,1.11以后可以选择UnAlignment);
  4. 算子做自身的CP时,分为同步和异步,
    同步阶段的 Snapshot操作:
    a.对state做深拷贝。
    b.将写操作封装在异步的FutureTask中;
    异步阶段的 Snapshot:
    a.执行同步阶段创建的FutureTask
    b.向Checkpoint Coordinator发送ACK响应;
  5. 各个算子(或者说Task) 做完 Checkpoint 之后,再上报 JobManager,JM收到所有算子的ACK,则认为这次CP
    完成了。

下面分别介绍几种遇到的CP超时问题。

第一种、计算量大,CPU密集性,导致TM内线程一直在processElement,而没有时间做CP

代表性作业为算法指标-用户偏好的计算,需要对用户在商城的曝光、点击、订单、出价、上下滑等所有事件进行比例计算,并且对各个偏好值进行比例计算,事件时间范围为近24小时。等于说每来一条数据,都需要对用户近24小时内所有的行为事件进行分类汇总,求比例,再汇总,再求比例,而QPS是1500,24小时1.5亿的累积数据,逻辑处理的算子根本无法将接收到的数据在合适的时间内计算完毕,这里还有个有趣的现象,为了提高处理性能,我将并行度翻倍,结果checkpoint的时间反而更长了,原因是Source的并行度也增加后,读取源数据的速度更快了~

Flink-跟着问题读源码:CheckPoint超时问题总结
从图片中可以看到source、sink的cp时间都很快,只有处理节点的‘End to End Duration’时间特别长,其他的‘Checkpoint Duration (Sync)’、‘Checkpoint Duration (Async)’时间都很短,都为几百毫秒。
Flink-跟着问题读源码:CheckPoint超时问题总结

那么怎么办呢?这里我也反思了自己的实现逻辑,实时计算中,flink是流引擎,正确的使用姿势应该是对每一条数据进行实时处理,而不应该对较长历史时间范围内的历史数据进行批处理,如果每条数据来还需要对历史数据重新计算计算,那么就不符合flink的定位。所以和算法同学商议后,将实现逻辑进行修改,进行批流分开计算,比如离线数据每半个小时进行一次计算,而实时计算只需要计算最近半小时内的数据即可。

第二种、数据倾斜

代表性作业对手机的uuid(设备编号)进行keyby,结果导致subtask的state大小差异一倍,两种方法,第一,两阶段聚合;第二,重新设置并行度,改变KeyGroup的分布
Flink-跟着问题读源码:CheckPoint超时问题总结

第三种、‘Checkpoint Duration (Async)’时间最长

Flink-跟着问题读源码:CheckPoint超时问题总结

当StateSize达到200M以上,Async的时间会超过1min。
Flink-跟着问题读源码:CheckPoint超时问题总结

这种情况特别少见,因为RocksDb State的异步阶段做的事情主要是将本地KV数据库里的增量State写到HDFS上,如果flink配置了增量chekcPoint是不太可能出现单个作业异步处理特别慢的现象。因此猜测是由于TM出现频繁FGC,导致线程根本没有足够的时间片去处理。
结果也确实如此,jstat -gcutil pid 1s,发现每4秒一个fgc。
dump分析

  1. jmap -dump:format=b,file=jconsole.dump PID
  2. ./ParseHeapDump.sh jconsole.dump org.eclipse.mat.api:suspects
    org.eclipse.mat.api:overview org.eclipse.mat.api:top_components

Flink-跟着问题读源码:CheckPoint超时问题总结

Flink-跟着问题读源码:CheckPoint超时问题总结
还有个有趣的现象是出现FGC时,反压机制会无法生效,在‘BackPressure’界面会一片空白~
解决方法,keyby的key过多,要么减少key的数量,要么加大TM的内存。

相关文章: