Flink核心是一个流式的数据流执行引擎,其针对数据流的分布式计算提供了数据分布、数据通信以及容错机制等功能

Flink提供了诸多更高抽象层的API以便用户编写分布式任务:

DataSet API, 对静态数据进行批处理操作,将静态数据抽象成分布式的数据集,用户可以方便地使用Flink提供的各种操作符对分布式数据集进行处理,支持Java、Scala和PythonDataStream API,对数据流进行流处理操作,将流式的数据抽象成分布式的数据流,用户可以方便地对分布式数据流进行各种操作,支持Java和Scala

Table API,结构化数据进行查询操作,将结构化数据抽象成关系表,并通过类SQL的DSL对关系表进行各种查询操作,支持Java和Scala

对于一个流处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,然后立刻通过网络传输到下一个节点,由下一个节点继续处理

而对于一个批处理系统,其节点间数据传输的标准模型是:当一条数据被处理完成后,序列化到缓存中,并不会立刻通过网络传输到下一个节点,当缓存写满,就持久化到本地硬盘上,当所有数据都被处理完成后,才开始将处理后的数据通过网络传输到下一个节点

这两种数据传输模式是两个极端,对应的是流处理系统对低延迟的要求和批处理系统对高吞吐量的要求

Flink的执行引擎同时支持了这两种数据传输模型,Flink以固定的缓存块为单位进行网络数据传输,用户可以通过缓存块超时值指定缓存块的传输时机,超时值为0,则是流处理系统的标准模型,此时可以获得最低的处理延迟,缓存块的超时值为无限大,则Flink的数据传输方式类似上文所提到批处理系统的标准模型。

缓存块的超时阈值越小,则流处理数据的延迟越低,但吞吐量也会变低。根据超时阈值来灵活权衡系统延迟和吞吐量。Flink基于分布式快照与可部分重发的数据源实现了容错。

用户可自定义对整个Job进行快照的时间间隔,当任务失败时,Flink会将整个Job恢复到最近一次快照,并从数据源重发快照之后的数据。

按照用户自定义的快照间隔时间,flink会定时在数据源中插入快照标记的消息,快照消息和普通消息都在DAG中流动,但不会被用户定义的逻辑所处理,每一个快照消息都将其所在的数据流分成2部分:本次快照数据和下次快照数据。当操作符处理到快照标记消息,对自己的状态进行快照标记并缓存。操作符对自己的快照和状态可以是异步,增量操作,并不阻塞消息处理。当所有的终点操作符都收到快照标记信息并对自己的状态快照和存储后,整个分布式快照就完成了。同时通知数据源释放该快照标记消息之前的所有消息。若之后的节点崩溃等异常,就可以恢复分布式快照状态。并从数据源重发该快照以后的消息。

flink基于分布式快照实现了一次性。

 

目前大部分流处理系统来说,时间窗口一般是根据Task所在节点的本地时钟进行切分,

是可能无法满足某些应用需求,比如:

消息本身带有时间戳,用户希望按照消息本身的时间特性进行分段处理。

下一个节点时可能被切分到不同的时间窗口中,从而产生不符合预期的结果

Flink支持3种类型的时间窗口:

1.Operator Time。根据Task所在节点的本地时钟来切分的时间窗口

乱序的消息延迟很高会影响分布式系统的吞吐量和延迟。

其乱序的消息延迟不会很高,因此对Flink分布式系统的吞吐量和延迟的影响也会更小。

WaterMark标记所有小于该时间戳的消息都已流入

一个可能的优化措施是,对于聚合类的操作符,可以提前对部分消息进行聚合操作,当有属于该时间窗口的新消息流入时,基于之前的部分聚合结果继续计算,这样的话,只需缓存中间计算结果即可,无需缓存该时间窗口的所有消息

flink基于watermark实现了基于时间戳的全局排序:

排序操作:排序操作符缓存所有流入的消息,当接收到watermark时,对时间戳小于该watermark的消息进行排序,并发送到下一个节点。在此排序操作符中释放所有时间戳小于该watermark的消息,继续缓存流入的消息。等待下一次watermark触发下一次排序。

保证了其之后不会出现时间戳比它小的消息,因此可以保证排序的正确性。请注意:排序操作符有多个节点,只能保证每个节点流出的消息有序,节点之间的消息不能有序,要实现全局有序,则只能有一个排序操作符节点。

需要4个字节存储

需要48字节的空间来存储。对于大部分的大数据应用,内存都是稀缺资源,更有效率地内存存储,意味着CPU数据访问吞吐量更高,以及更少磁盘落地的存在。

分钟级的垃圾回收极大影响了Java应用的性能和可用性。

这也远远不够:

 

为了解决以上提到的问题,高性能分布式计算框架通常需要以下技术:

Flink的处理策略:

定制的序列化工具,显式内存管理的前提步骤就是序列化,用的序列化框架,如Java默认使用java.io.Serializable

这种方式效率最高。

计算密集的数据结构和算法,直接操作序列化后的二进制数据,而不是将对象反序列化后再进行操作。

么访问Key时的Cache命中率会大大提高。

只保存一份对象Schema信息,节省大量的存储空间

也达到了和Hadoop类似的序列化效率。

由TypeInformation类表示,这个类有诸多具体实现类

例如

任意Java基本类型(装包或未装包)和String类型

BasicArrayTypeInfo任意Java基本类型数组(装包或未装包)和String数组

3.WritableTypeInfo任意Hadoop的Writable接口的实现类

Java Tuple实现

Scala tuples)

PojoTypeInfo任意的POJO (Java or Scala),Java对象的所有成员变量,要么是public修饰符定义,要么有getter/setter方法

7.GenericTypeInfo任意无法匹配之前几种类型的类。

Flink皆可以自动生成对应的TypeSerializer定制序列化工具,非常有效率地对数据集进行序列化和反序列化

使用Kryo进行序列化和反序列化

进行compare、hash等操作

TypeSerializer、TypeComparator,如图6所示:

实现自己的序列化工具。

JDK8的G1算法改善了JVM垃圾回收的效率和可用范围

通过JVM进行内存管理的话,OutOfMemoryError也是一个很难解决的问题

在JVM内存管理中,Java对象有潜在的碎片化存储问题

Flink将内存分为3个部分,每个部分都有不同用途:

1.Network buffers: 一些以32KB Byte数组为单位buffer,主要被网络模块用于数据的网络传输,基于Netty的网络传输

2.Memory Manager pool大量以32KB Byte数组为单位的内存池,所有的运行时算法(例如Sort/Shuffle/Join)都从这个内存池申请内存并将序列化后的数据存储其中,结束后释放回内存池。通常会配置为最大的一块内存,

3. Remaining (Free) Heap主要留给UDF中用户自己创建的Java对象,由JVM管理。Flink也不鼓励用户在UDF中缓存很多数据。。 Remaining Heap的内存虽然由JVM管理,但是由于其主要用来存储用户处理的流式数据,生命周期非常短,速度很快的Minor GC就会全部回收掉,一般不会触发Full GC

在Flink中,内存池由多个MemorySegment组成,每个MemorySegment代表一块连续的内存,底层存储是byte[],默认32KB大小。

MemorySegment提供了根据偏移量访问数据的各种方法,如get/put int、long、float、double等,MemorySegment之间数据拷贝等方法和java.nio.ByteBuffer类似。

于Flink的数据结构,通常包括多个向内存池申请的MemeorySegment,所有要存入的对象通过TypeSerializer序列化之后,将二进制数据存储在MemorySegment中,在取出时通过TypeSerializer反序列化

数据结构通过MemorySegment提供的set/get方法访问具体的二进制数据

Flink这种看起来比较复杂的内存管理方式带来的好处主要有:

1.二进制的数据存储大大提高了数据存储密度,节省了存储空间。所有的运行时数据结构和算法只能通过内存池申请内存,保证了其使用的内存大小是固定的,不会因为运行时数据结构和算法而发生OOM

Flink当前的内存管理在最底层是基于byte[],

flink排序算法的实现:

1.将待排序的数据经过序列化存储在两个不同的MemorySegment集中,数据全部的序列化值存放于其中一个MemorySegment集中。数据序列化后的Key和指向第一个MemorySegment集中值的指针存放于第二个MemorySegment集中。对第二个MemorySegment集中的Key进行排序,如需交换Key位置,只需交换对应的Key+Pointer的位置,第一个MemorySegment集中的数据无需改变。 当比较两个Key大小时,TypeComparator提供了直接基于二进制数据的对比方法,无需反序列化任何数据。排序完成后,访问数据时,按照第二个MemorySegment集中Key的顺序访问,并通过Pointer值找到数据在第一个MemorySegment集中的位置,通过TypeSerializer反序列化成Java对象返回。

flink流处理内容

通过Key和Full data分离存储的方式尽量将被操作的数据最小化,提高Cache命中的概率,从而提高CPU的吞吐量。 移动数据时,只需移动Key+Pointer,而无须移动数据本身,大大减少了内存拷贝的数据量。 TypeComparator直接基于二进制数据进行操作,节省了反序列化的时间。

DataSet API级别的执行计划优化器,原生的迭代操作符等,

 

相关文章: