Outline:
- Overview and usage of Async IO
- How Async IO works
I. Async IO Overview
1. What is Async IO
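Most of us know the difference between synchronous and asynchronous IO. So what is Async IO in Flink, and how is it used? Many streaming applications need to interact with external systems, for example to query a database. If we send request a to the external system and wait for its result before doing anything else, that is the synchronous mode. For better throughput and latency, we can instead send requests a, b and c concurrently and process whichever response arrives first. This avoids needless waiting between consecutive requests, and it is exactly what Async IO does. The feature was contributed to the Flink community by Alibaba and introduced in Flink 1.2.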
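The difference between blocking on each reply and handling whichever reply arrives first can be sketched in plain Java with CompletableFuture. This is only an illustration of the idea, not Flink code; the class name, the request names and the simulated replies are made up for the example.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.CompletableFuture;

public class CompletionOrderDemo {

    // Sends every request without waiting, then lets the "external system" reply in
    // completionOrder. Each reply is handled by its own callback as soon as it arrives.
    public static List<String> handleInCompletionOrder(List<String> sent, List<String> completionOrder) {
        List<String> handled = new ArrayList<>();
        Map<String, CompletableFuture<String>> inFlight = new LinkedHashMap<>();
        for (String request : sent) {
            CompletableFuture<String> reply = new CompletableFuture<>();
            reply.thenAccept(handled::add);   // callback: process the reply when it arrives
            inFlight.put(request, reply);
        }
        // Simulated replies; with a real client these would arrive on I/O threads.
        for (String request : completionOrder) {
            inFlight.get(request).complete("reply-" + request);
        }
        return handled;
    }

    public static void main(String[] args) {
        // Sent as a, b, c; answered as b, c, a.
        System.out.println(handleInCompletionOrder(List.of("a", "b", "c"), List.of("b", "c", "a")));
        // [reply-b, reply-c, reply-a]
    }
}
```

Because each request registers a callback instead of blocking, reply b is processed as soon as it arrives even though a was sent first.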
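2. Advantages of Async IO
In Flink, one way to raise throughput is to increase parallelism, for example the parallelism of a MapFunction. But this means more subtasks, more network resources, more connections, more buffers and so on.
Asynchronous interaction with the database, by contrast, lets a single parallel function instance handle many requests concurrently and receive their responses concurrently, reusing the same resources. The waiting time can then overlap with sending other requests and receiving their responses; at the very least, it is amortized over multiple requests. In most cases this leads to higher streaming throughput.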
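A rough, self-contained way to see the amortized waiting is to time N blocking calls issued one after another against the same N calls kept in flight at once. The lookup method below is a hypothetical stand-in for a database query with a fixed latency, and the thread pool plays the role of an asynchronous client, much like the thread pool in the official example that follows; actual timings vary by machine.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AmortizedWaitDemo {

    // Hypothetical blocking "database call" that takes latencyMs to answer.
    static int lookup(int key, long latencyMs) {
        try {
            Thread.sleep(latencyMs);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return key * 10;
    }

    // Synchronous style: one request at a time, so the waits add up.
    public static long syncMillis(int requests, long latencyMs) {
        long start = System.nanoTime();
        for (int i = 0; i < requests; i++) {
            lookup(i, latencyMs);
        }
        return (System.nanoTime() - start) / 1_000_000;
    }

    // Asynchronous style: all requests are in flight together, so the waits overlap.
    public static long asyncMillis(int requests, long latencyMs) {
        ExecutorService pool = Executors.newFixedThreadPool(requests);
        long start = System.nanoTime();
        List<CompletableFuture<Integer>> inFlight = new ArrayList<>();
        for (int i = 0; i < requests; i++) {
            final int key = i;
            inFlight.add(CompletableFuture.supplyAsync(() -> lookup(key, latencyMs), pool));
        }
        CompletableFuture.allOf(inFlight.toArray(new CompletableFuture[0])).join();
        long elapsed = (System.nanoTime() - start) / 1_000_000;
        pool.shutdown();
        return elapsed;
    }

    public static void main(String[] args) {
        System.out.println("sync:  ~" + syncMillis(5, 100) + " ms");
        System.out.println("async: ~" + asyncMillis(5, 100) + " ms");
    }
}
```

With 5 requests of 100 ms each, the sequential version needs roughly 500 ms, while the concurrent one finishes in roughly the time of a single request because the waits overlap.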
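3. Asynchronous stream transformations with Async IO
- Implement an AsyncFunction that dispatches the requests: it sends asynchronous requests to the database and registers a callback.
- A callback that receives the result of the operation and hands it to the ResultFuture.
- Apply the asynchronous I/O operation to a DataStream as a transformation.
Using Async IO mainly comes down to these three steps. Below is the official example from Flink. It does not connect to a real database; instead it uses a thread pool to simulate concurrent requests and then processes the results: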
```java
import org.apache.flink.api.common.functions.FlatMapFunction;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.api.java.utils.ParameterTool;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.state.filesystem.FsStateBackend;
import org.apache.flink.streaming.api.CheckpointingMode;
import org.apache.flink.streaming.api.TimeCharacteristic;
import org.apache.flink.streaming.api.checkpoint.ListCheckpointed;
import org.apache.flink.streaming.api.datastream.AsyncDataStream;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.async.AsyncFunction;
import org.apache.flink.streaming.api.functions.async.ResultFuture;
import org.apache.flink.streaming.api.functions.async.RichAsyncFunction;
import org.apache.flink.streaming.api.functions.source.SourceFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.ExecutorUtils;

import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.ThreadLocalRandom;
import java.util.concurrent.TimeUnit;

public class AsyncIOExample {

    private static final Logger LOG = LoggerFactory.getLogger(AsyncIOExample.class);

    private static final String EXACTLY_ONCE_MODE = "exactly_once";
    private static final String EVENT_TIME = "EventTime";
    private static final String INGESTION_TIME = "IngestionTime";
    private static final String ORDERED = "ordered";

    /**
     * A checkpointed source.
     */
    private static class SimpleSource implements SourceFunction<Integer>, ListCheckpointed<Integer> {
        private static final long serialVersionUID = 1L;

        private volatile boolean isRunning = true;
        private int counter = 0;
        private int start = 0;

        @Override
        public List<Integer> snapshotState(long checkpointId, long timestamp) throws Exception {
            return Collections.singletonList(start);
        }

        @Override
        public void restoreState(List<Integer> state) throws Exception {
            for (Integer i : state) {
                this.start = i;
            }
        }

        public SimpleSource(int maxNum) {
            this.counter = maxNum;
        }

        @Override
        public void run(SourceContext<Integer> ctx) throws Exception {
            while ((start < counter || counter == -1) && isRunning) {
                synchronized (ctx.getCheckpointLock()) {
                    ctx.collect(start);
                    ++start;

                    // loop back to 0
                    if (start == Integer.MAX_VALUE) {
                        start = 0;
                    }
                }
                Thread.sleep(10L);
            }
        }

        @Override
        public void cancel() {
            isRunning = false;
        }
    }

    /**
     * A sample of {@link AsyncFunction} using a thread pool and executing working threads
     * to simulate multiple async operations.
     *
     * <p>For the real use case in production environment, the thread pool may stay in the
     * async client.
     */
    private static class SampleAsyncFunction extends RichAsyncFunction<Integer, String> {
        private static final long serialVersionUID = 2098635244857937717L;

        private transient ExecutorService executorService;

        /**
         * The result of multiplying sleepFactor with a random float is used to pause
         * the working thread in the thread pool, simulating a time consuming async operation.
         */
        private final long sleepFactor;

        /**
         * The ratio to generate an exception to simulate an async error. For example, the error
         * may be a TimeoutException while visiting HBase.
         */
        private final float failRatio;

        private final long shutdownWaitTS;

        SampleAsyncFunction(long sleepFactor, float failRatio, long shutdownWaitTS) {
            this.sleepFactor = sleepFactor;
            this.failRatio = failRatio;
            this.shutdownWaitTS = shutdownWaitTS;
        }

        // Open the connection resources (here: a thread pool standing in for an async client)
        @Override
        public void open(Configuration parameters) throws Exception {
            super.open(parameters);
            executorService = Executors.newFixedThreadPool(30);
        }

        // Release the connection resources
        @Override
        public void close() throws Exception {
            super.close();
            ExecutorUtils.gracefulShutdown(shutdownWaitTS, TimeUnit.MILLISECONDS, executorService);
        }

        // Custom processing logic: the result is handed to ResultFuture, and Flink sends it downstream
        @Override
        public void asyncInvoke(final Integer input, final ResultFuture<String> resultFuture) {
            executorService.submit(() -> {
                // wait for while to simulate async operation here
                long sleep = (long) (ThreadLocalRandom.current().nextFloat() * sleepFactor);
                try {
                    Thread.sleep(sleep);

                    if (ThreadLocalRandom.current().nextFloat() < failRatio) {
                        resultFuture.completeExceptionally(new Exception("wahahahaha..."));
                    } else {
                        resultFuture.complete(
                            Collections.singletonList("key-" + (input % 10)));
                    }
                } catch (InterruptedException e) {
                    resultFuture.complete(new ArrayList<>(0));
                }
            });
        }
    }

    private static void printUsage() {
        System.out.println("To customize example, use: AsyncIOExample [--fsStatePath <path to fs state>] " +
            "[--checkpointMode <exactly_once or at_least_once>] " +
            "[--maxCount <max number of input from source, -1 for infinite input>] " +
            "[--sleepFactor <interval to sleep for each stream element>] [--failRatio <possibility to throw exception>] " +
            "[--waitMode <ordered or unordered>] [--waitOperatorParallelism <parallelism for async wait operator>] " +
            "[--eventType <EventTime or IngestionTime>] [--shutdownWaitTS <milli sec to wait for thread pool>]" +
            "[--timeout <Timeout for the asynchronous operations>]");
    }

    public static void main(String[] args) throws Exception {
        // obtain execution environment
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // parse parameters
        final ParameterTool params = ParameterTool.fromArgs(args);

        final String statePath;
        final String cpMode;
        final int maxCount;
        final long sleepFactor;
        final float failRatio;
        final String mode;
        final int taskNum;
        final String timeType;
        final long shutdownWaitTS;
        final long timeout;

        try {
            // check the configuration for the job
            statePath = params.get("fsStatePath", null);
            cpMode = params.get("checkpointMode", "exactly_once");
            maxCount = params.getInt("maxCount", 100000);
            sleepFactor = params.getLong("sleepFactor", 100);
            failRatio = params.getFloat("failRatio", 0.001f);
            mode = params.get("waitMode", "ordered");
            taskNum = params.getInt("waitOperatorParallelism", 1);
            timeType = params.get("eventType", "EventTime");
            shutdownWaitTS = params.getLong("shutdownWaitTS", 20000);
            timeout = params.getLong("timeout", 10000L);
        } catch (Exception e) {
            printUsage();
            throw e;
        }

        StringBuilder configStringBuilder = new StringBuilder();
        final String lineSeparator = System.getProperty("line.separator");

        configStringBuilder
            .append("Job configuration").append(lineSeparator)
            .append("FS state path=").append(statePath).append(lineSeparator)
            .append("Checkpoint mode=").append(cpMode).append(lineSeparator)
            .append("Max count of input from source=").append(maxCount).append(lineSeparator)
            .append("Sleep factor=").append(sleepFactor).append(lineSeparator)
            .append("Fail ratio=").append(failRatio).append(lineSeparator)
            .append("Waiting mode=").append(mode).append(lineSeparator)
            .append("Parallelism for async wait operator=").append(taskNum).append(lineSeparator)
            .append("Event type=").append(timeType).append(lineSeparator)
            .append("Shutdown wait timestamp=").append(shutdownWaitTS);

        LOG.info(configStringBuilder.toString());

        if (statePath != null) {
            // setup state and checkpoint mode
            env.setStateBackend(new FsStateBackend(statePath));
        }

        if (EXACTLY_ONCE_MODE.equals(cpMode)) {
            env.enableCheckpointing(1000L, CheckpointingMode.EXACTLY_ONCE);
        } else {
            env.enableCheckpointing(1000L, CheckpointingMode.AT_LEAST_ONCE);
        }

        // enable watermark or not
        if (EVENT_TIME.equals(timeType)) {
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
        } else if (INGESTION_TIME.equals(timeType)) {
            env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
        }

        // create input stream of an single integer
        DataStream<Integer> inputStream = env.addSource(new SimpleSource(maxCount));

        // create async function, which will *wait* for a while to simulate the process of async i/o
        AsyncFunction<Integer, String> function =
            new SampleAsyncFunction(sleepFactor, failRatio, shutdownWaitTS);

        // add async operator to streaming job
        DataStream<String> result;
        if (ORDERED.equals(mode)) {
            result = AsyncDataStream.orderedWait(
                inputStream,
                function,
                timeout,
                TimeUnit.MILLISECONDS,
                20).setParallelism(taskNum);
        } else {
            result = AsyncDataStream.unorderedWait(
                inputStream,
                function,
                timeout,
                TimeUnit.MILLISECONDS,
                20).setParallelism(taskNum);
        }

        // add a reduce to get the sum of each key.
        result.flatMap(new FlatMapFunction<String, Tuple2<String, Integer>>() {
            private static final long serialVersionUID = -938116068682344455L;

            @Override
            public void flatMap(String value, Collector<Tuple2<String, Integer>> out) throws Exception {
                out.collect(new Tuple2<>(value, 1));
            }
        }).keyBy(0).sum(1).print();

        // execute the program
        env.execute("Async IO Example");
    }
}
```
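4. Timeout handling in Async IO
When an asynchronous request times out, the default behavior is to fail with an exception, which restarts the job. If we want to handle timeouts ourselves, we can override the timeout method of AsyncFunction. In Flink 1.6, its default implementation simply completes the ResultFuture exceptionally with a TimeoutException.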
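The effect of overriding timeout is easiest to see in a stripped-down model. The sketch below is not Flink code: SimpleAsyncFunction is a hypothetical interface with the same two-method shape, a CompletableFuture stands in for ResultFuture, and a ScheduledExecutorService plays the role of the operator's timeout timer. In a real Flink 1.6+ job you would instead override timeout(IN input, ResultFuture<OUT> resultFuture) in your AsyncFunction.

```java
import java.util.Collections;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

public class TimeoutDemo {

    // Hypothetical, simplified stand-in for AsyncFunction.
    interface SimpleAsyncFunction<IN, OUT> {
        void asyncInvoke(IN input, CompletableFuture<List<OUT>> result);

        // Default: fail the request, mirroring the default behavior described above.
        default void timeout(IN input, CompletableFuture<List<OUT>> result) {
            result.completeExceptionally(new TimeoutException("Async function call has timed out."));
        }
    }

    // Starts one request and calls timeout() if it is still pending after timeoutMs.
    static <IN, OUT> CompletableFuture<List<OUT>> invoke(
            SimpleAsyncFunction<IN, OUT> fn, IN input, long timeoutMs, ScheduledExecutorService timer) {
        CompletableFuture<List<OUT>> result = new CompletableFuture<>();
        ScheduledFuture<?> timeoutTask =
                timer.schedule(() -> fn.timeout(input, result), timeoutMs, TimeUnit.MILLISECONDS);
        result.whenComplete((value, error) -> timeoutTask.cancel(false));
        fn.asyncInvoke(input, result);
        return result;
    }

    // A request that never answers (e.g. a hung external service), default timeout handling.
    static final SimpleAsyncFunction<Integer, String> FAILING_ON_TIMEOUT = (input, result) -> { };

    // The same hung request, but timeout() is overridden to emit a fallback value instead.
    static final SimpleAsyncFunction<Integer, String> FALLBACK_ON_TIMEOUT =
            new SimpleAsyncFunction<Integer, String>() {
                @Override
                public void asyncInvoke(Integer input, CompletableFuture<List<String>> result) { }

                @Override
                public void timeout(Integer input, CompletableFuture<List<String>> result) {
                    result.complete(Collections.singletonList("timeout-" + input));
                }
            };

    public static String run(SimpleAsyncFunction<Integer, String> fn, int input) {
        ScheduledExecutorService timer = Executors.newSingleThreadScheduledExecutor();
        try {
            return invoke(fn, input, 50, timer).join().get(0);
        } catch (CompletionException e) {
            return "failed: " + e.getCause().getClass().getSimpleName();
        } finally {
            timer.shutdownNow();
        }
    }

    public static void main(String[] args) {
        System.out.println(run(FAILING_ON_TIMEOUT, 7));    // failed: TimeoutException
        System.out.println(run(FALLBACK_ON_TIMEOUT, 7));   // timeout-7
    }
}
```

Emitting a fallback keeps the job running; failing (the default) is the safer choice when a missing result would make downstream output wrong.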
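5. Applying Async IO to a DataStream
Async IO itself is not hard to understand, but how is it applied to a DataStream? Flink provides the helper class AsyncDataStream for this. Because results would otherwise reach the rest of the pipeline in whatever order the requests happen to finish, Flink offers two modes to control that order, ordered and unordered, corresponding to the two static methods AsyncDataStream.orderedWait and AsyncDataStream.unorderedWait.
- orderedWait (ordered): results are emitted in the same order as the input arrived (watermarks included), i.e. first in, first out.
- unorderedWait (unordered):
  - With ProcessingTime, output is fully unordered: whichever request returns first is emitted first (lowest latency and lowest overhead).
  - With EventTime, watermarks act as boundaries: records between two watermarks may be emitted out of order, but records and watermarks never overtake each other. This brings some ordering back into unordered mode, so it incurs some of the same latency and overhead as ordered mode, depending on how often watermarks arrive. (The details are covered in the implementation section below.)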
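6. Fault tolerance and recovery
The Async I/O operator provides full exactly-once fault-tolerance guarantees: it stores the in-flight asynchronous requests in checkpoints and restores/re-triggers them when recovering from a failure.
All StreamRecords are kept in state. Instead of writing input records into state one by one as they are processed, AsyncWaitOperator puts all input records held in the AsyncCollectorBuffer into state when it snapshots the operator state, and any old records in state are cleared before these are persisted. (AsyncCollector and AsyncCollectorBuffer are names from the original design proposal; in later Flink versions, such as the example above, AsyncCollector was replaced by ResultFuture.)
After a failure, the restarted operator scans all elements in state, obtains an AsyncCollector for each one, calls AsyncFunction.asyncInvoke() and inserts them into the AsyncCollectorBuffer again.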
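To keep state consistent and preserve exactly-once semantics, the snapshot has to capture exactly the elements that may still need to be sent downstream: the elements whose callbacks have completed but which have not yet been emitted, plus the elements whose callbacks have not completed. Together, these are all the elements in the operator's queue.
The snapshot logic is therefore:
- Clear the existing state.
- Iterate over all Promises in uncompletedQueue, take out each StreamElement (a record or a watermark) and add it to the state.
- Perform the snapshot.
On recovery, all elements are read back from the snapshot and processed again, including those whose callbacks had already completed before the failure. So after recovery some elements send duplicate requests to the external service, but each result is emitted downstream only once, which is how exactly-once semantics are preserved. Note that the guarantee covers Flink's state and output; the external system itself may see the same request more than once.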
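A toy, single-threaded model of the snapshot and restore steps above. Everything here is hypothetical (class, method names, the r1/r2/r3 results): a list stands in for operator state, the queue uses ordered emission for simplicity, and the emitted list represents output that the checkpoint has already accounted for. It shows elements 2 and 3 being requested twice after a restore while every result is still emitted exactly once.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;
import java.util.concurrent.CompletableFuture;

public class SnapshotRestoreDemo {

    // One queued element: the input record plus the promise for its async result.
    static final class Entry {
        final int input;
        final CompletableFuture<String> result = new CompletableFuture<>();

        Entry(int input) {
            this.input = input;
        }
    }

    final Map<Integer, Integer> requestsSent = new TreeMap<>(); // calls to the external service
    final List<String> emitted = new ArrayList<>();             // what downstream has received
    private final Deque<Entry> queue = new ArrayDeque<>();      // uncompleted + completed-not-emitted
    private final List<Integer> state = new ArrayList<>();      // stands in for operator state

    // Wraps the record and "calls asyncInvoke", i.e. sends a request.
    void processElement(int input) {
        requestsSent.merge(input, 1, Integer::sum);
        queue.addLast(new Entry(input));
    }

    // The external service answers for this input.
    void complete(int input) {
        for (Entry e : queue) {
            if (e.input == input) {
                e.result.complete("r" + input);
            }
        }
    }

    // Ordered Emitter: only completed entries at the head leave the queue.
    void emitCompleted() {
        while (!queue.isEmpty() && queue.peekFirst().result.isDone()) {
            emitted.add(queue.pollFirst().result.join());
        }
    }

    // Snapshot: (1) clear the old state, (2) copy every queued element into it, (3) persist it.
    List<Integer> snapshot() {
        state.clear();
        for (Entry e : queue) {
            state.add(e.input);
        }
        return new ArrayList<>(state);
    }

    // Restore: re-issue a request for every element in the snapshot.
    void restore(List<Integer> checkpoint) {
        queue.clear();
        for (int input : checkpoint) {
            processElement(input);
        }
    }

    public static SnapshotRestoreDemo scenario() {
        SnapshotRestoreDemo op = new SnapshotRestoreDemo();
        op.processElement(1);
        op.processElement(2);
        op.processElement(3);
        op.complete(1);
        op.complete(3);                            // done, but stuck behind 2
        op.emitCompleted();                        // emits r1 only
        List<Integer> checkpoint = op.snapshot();  // [2, 3]
        op.restore(checkpoint);                    // failure, restart from the checkpoint
        op.complete(2);
        op.complete(3);
        op.emitCompleted();                        // emits r2 and r3
        return op;
    }

    public static void main(String[] args) {
        SnapshotRestoreDemo op = scenario();
        System.out.println("emitted:  " + op.emitted);       // [r1, r2, r3]
        System.out.println("requests: " + op.requestsSent);  // {1=1, 2=2, 3=2}
    }
}
```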
II. How Async IO Works Internally
1. Implementation of Async IO
As the example above shows, what ties a DataStream to an AsyncFunction are the two static methods of AsyncDataStream, orderedWait and unorderedWait. Let's look at their source code.
```java
/**
 * Add an AsyncWaitOperator. The order to process input records is guaranteed to be the same as
 * input ones.
 *
 * @param in Input {@link DataStream}
 * @param func {@link AsyncFunction}
 * @param timeout for the asynchronous operation to complete
 * @param timeUnit of the given timeout
 * @param capacity The max number of async i/o operation that can be triggered
 * @param <IN> Type of input record
 * @param <OUT> Type of output record
 * @return A new {@link SingleOutputStreamOperator}.
 */
public static <IN, OUT> SingleOutputStreamOperator<OUT> orderedWait(
        DataStream<IN> in,
        AsyncFunction<IN, OUT> func,
        long timeout,
        TimeUnit timeUnit,
        int capacity) {
    return addOperator(in, func, timeUnit.toMillis(timeout), capacity, OutputMode.ORDERED);
}
```
orderedWait in turn calls addOperator:
```java
/**
 * Add an AsyncWaitOperator.
 *
 * @param in The {@link DataStream} where the {@link AsyncWaitOperator} will be added.
 * @param func {@link AsyncFunction} wrapped inside {@link AsyncWaitOperator}.
 * @param timeout for the asynchronous operation to complete
 * @param bufSize The max number of inputs the {@link AsyncWaitOperator} can hold inside.
 * @param mode Processing mode for {@link AsyncWaitOperator}.
 * @param <IN> Input type.
 * @param <OUT> Output type.
 * @return A new {@link SingleOutputStreamOperator}
 */
private static <IN, OUT> SingleOutputStreamOperator<OUT> addOperator(
        DataStream<IN> in,
        AsyncFunction<IN, OUT> func,
        long timeout,
        int bufSize,
        OutputMode mode) {

    TypeInformation<OUT> outTypeInfo = TypeExtractor.getUnaryOperatorReturnType(
        func,
        AsyncFunction.class,
        0,
        1,
        new int[]{1, 0},
        in.getType(),
        Utils.getCallLocationName(),
        true);

    // create transform
    AsyncWaitOperator<IN, OUT> operator = new AsyncWaitOperator<>(
        in.getExecutionEnvironment().clean(func),
        timeout,
        bufSize,
        mode);

    return in.transform("async wait operator", outTypeInfo, operator);
}
```
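AsyncWaitOperator is the key component and the main implementation of Async IO. It consists mainly of a StreamElementQueue and an Emitter, and it processes all input records asynchronously. For each incoming record it adds an entry to the StreamElementQueue and calls asyncInvoke to register the callback. When the record has been processed, the callback (AsyncCollector.collect in the original design, ResultFuture.complete in later versions such as the example above) hands the result back to the framework, and the Emitter sends the completed results downstream.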
StreamElementQueue is in fact an interface with two implementations, OrderedStreamElementQueue and UnorderedStreamElementQueue.
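Looking inside the implementation, we can see that it is built mainly on Java's AQS (AbstractQueuedSynchronizer, used through a lock and its conditions) and an ArrayDeque double-ended queue.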
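The following is a simplified, hypothetical re-creation of that design, not the Flink source: a ReentrantLock (which is built on AQS) with two Conditions guards an ArrayDeque of CompletableFutures. The class and field names are this sketch's own. put blocks while the queue is at capacity, which is how a full queue back-pressures the input, and the consumer, playing the Emitter's role, waits until the head entry is complete.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;

// Simplified sketch of a bounded, ordered queue of promises (not the Flink source).
public class BoundedOrderedQueue<T> {
    private final int capacity;
    private final ArrayDeque<CompletableFuture<T>> queue;
    private final ReentrantLock lock = new ReentrantLock();   // built on AbstractQueuedSynchronizer
    private final Condition notFull = lock.newCondition();
    private final Condition headIsCompleted = lock.newCondition();

    public BoundedOrderedQueue(int capacity) {
        this.capacity = capacity;
        this.queue = new ArrayDeque<>(capacity);
    }

    // Blocks while the queue is full; this is what back-pressures the upstream input.
    public void put(CompletableFuture<T> entry) throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.size() >= capacity) {
                notFull.await();
            }
            queue.addLast(entry);
        } finally {
            lock.unlock();
        }
        // Wake the consumer when this entry completes (runs at once if it is already done).
        entry.whenComplete((value, error) -> signalIfHeadCompleted());
    }

    private void signalIfHeadCompleted() {
        lock.lock();
        try {
            if (!queue.isEmpty() && queue.peekFirst().isDone()) {
                headIsCompleted.signalAll();
            }
        } finally {
            lock.unlock();
        }
    }

    // Blocks until the head is complete, then removes it and returns its value.
    public T pollCompletedHead() throws InterruptedException {
        lock.lockInterruptibly();
        try {
            while (queue.isEmpty() || !queue.peekFirst().isDone()) {
                headIsCompleted.await();
            }
            T value = queue.pollFirst().join();
            notFull.signalAll();
            return value;
        } finally {
            lock.unlock();
        }
    }

    public static List<String> demo() {
        try {
            BoundedOrderedQueue<String> q = new BoundedOrderedQueue<>(2);
            CompletableFuture<String> p1 = new CompletableFuture<>();
            CompletableFuture<String> p2 = new CompletableFuture<>();
            q.put(p1);
            q.put(p2);
            // The queue is full, so this producer blocks until a slot is freed.
            Thread producer = new Thread(() -> {
                try {
                    q.put(CompletableFuture.completedFuture("P3"));
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
            producer.start();
            p2.complete("P2");                   // finishes first, but is not at the head
            p1.complete("P1");
            List<String> emitted = new ArrayList<>();
            emitted.add(q.pollCompletedHead());  // P1, which also frees a slot for P3
            producer.join();
            emitted.add(q.pollCompletedHead());  // P2
            emitted.add(q.pollCompletedHead());  // P3
            return emitted;
        } catch (InterruptedException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(demo());   // [P1, P2, P3]
    }
}
```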
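In this queue, each element is wrapped in a StreamElementQueueEntry, which is essentially an abstract implementation of a Promise (similar to Scala's Promise). The entry registers the callback and tracks whether it has completed; once it has, its result can be sent downstream.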
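A promise-style entry can be sketched with CompletableFuture. QueueEntry and its methods are hypothetical names chosen for this illustration, not the real StreamElementQueueEntry API; the point is the split between the side that fulfils the promise (the async callback) and the side that reacts to completion (the queue or Emitter).

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CompletableFuture;
import java.util.function.Consumer;

// Hypothetical, simplified queue entry: a stream element wrapped in a promise.
public class QueueEntry<T> {
    private final Object element;   // the record or watermark this entry stands for
    private final CompletableFuture<T> promise = new CompletableFuture<>();

    public QueueEntry(Object element) {
        this.element = element;
    }

    public Object getElement() {
        return element;
    }

    // Producer side (the async callback): fulfil the promise; later calls are ignored.
    public boolean complete(T result) {
        return promise.complete(result);
    }

    public boolean isDone() {
        return promise.isDone();
    }

    // Queue side: register what to do once the result is in,
    // e.g. move the entry to a completed queue or wake up the Emitter.
    public void onComplete(Consumer<QueueEntry<T>> callback) {
        promise.whenComplete((value, error) -> callback.accept(this));
    }

    public T getResult() {
        return promise.join();
    }

    public static List<String> demo() {
        List<String> log = new ArrayList<>();
        QueueEntry<String> entry = new QueueEntry<>(42);
        entry.onComplete(e -> log.add("completed " + e.getElement() + " -> " + e.getResult()));
        log.add("done? " + entry.isDone());
        entry.complete("key-2");
        entry.complete("ignored");   // a promise can be fulfilled only once
        log.add("done? " + entry.isDone());
        return log;
    }

    public static void main(String[] args) {
        demo().forEach(System.out::println);
    }
}
```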
We now have a rough picture of how it works. As mentioned above, Async IO has two modes, ordered and unordered; let's look at how each one is implemented.
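2. Message ordering in Async IO
Ordered
The ordered mode is simple to implement: each message is wrapped in a Promise and appended to the queue in arrival order. Suppose requests P1 to P4 arrive in that order. Even if P4 returns first, the Emitter does not emit it; it must wait until P1 has returned. Because the Emitter always takes from the head of the queue, ordering is preserved.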
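The ordered rule can be traced step by step with a plain ArrayDeque. This is a hypothetical model, not Flink code: the Promise class here is just a name plus a done flag, and emit plays the Emitter, releasing completed entries from the head and stopping at the first pending one.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.List;

public class OrderedModeDemo {

    // A queued promise: the request name and whether its result has arrived.
    static final class Promise {
        final String name;
        boolean done;

        Promise(String name) {
            this.name = name;
        }
    }

    // Ordered Emitter: emit completed promises from the head, stop at the first pending one.
    static void emit(Deque<Promise> queue, List<String> out) {
        while (!queue.isEmpty() && queue.peekFirst().done) {
            out.add(queue.pollFirst().name);
        }
    }

    // P1..P4 arrive in order and complete in completionOrder.
    // Returns one line per completion showing what has been emitted so far.
    public static List<String> trace(String... completionOrder) {
        Deque<Promise> queue = new ArrayDeque<>();
        for (int i = 1; i <= 4; i++) {
            queue.addLast(new Promise("P" + i));
        }
        List<String> out = new ArrayList<>();
        List<String> steps = new ArrayList<>();
        for (String name : completionOrder) {
            for (Promise p : queue) {
                if (p.name.equals(name)) {
                    p.done = true;
                }
            }
            emit(queue, out);
            steps.add(name + " returned -> emitted " + out);
        }
        return steps;
    }

    public static void main(String[] args) {
        trace("P4", "P2", "P1", "P3").forEach(System.out::println);
        // P4 returned -> emitted []
        // P2 returned -> emitted []
        // P1 returned -> emitted [P1, P2]
        // P3 returned -> emitted [P1, P2, P3, P4]
    }
}
```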
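ProcessingTime, unordered
The ProcessingTime unordered mode is also fairly simple. Since there are no watermarks, there is no need to coordinate the order of watermarks and messages, so two queues are enough: an uncompletedQueue and a completedQueue. Every element entering the operator is likewise wrapped in a Promise and put into uncompletedQueue. As soon as any Promise in uncompletedQueue returns data, that Promise is moved to completedQueue and the Emitter is notified to consume it.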
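Modeled with plain collections (a LinkedHashSet standing in for uncompletedQueue and an ArrayDeque for completedQueue; the class and method names are hypothetical), the rule reduces to "whatever completes is emitted next". Using the P1 to P4 example from the ordered mode, the output now follows completion order.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class UnorderedProcessingTimeDemo {

    // Puts promises P1..Pn into uncompletedQueue, completes them in completionOrder and
    // returns the order in which the Emitter sends them downstream.
    public static List<String> emitted(int inputs, String... completionOrder) {
        Set<String> uncompleted = new LinkedHashSet<>();    // uncompletedQueue
        Deque<String> completed = new ArrayDeque<>();       // completedQueue
        for (int i = 1; i <= inputs; i++) {
            uncompleted.add("P" + i);
        }
        List<String> out = new ArrayList<>();
        for (String name : completionOrder) {
            // Whichever promise completes moves straight to the completed queue...
            if (uncompleted.remove(name)) {
                completed.addLast(name);
            }
            // ...and the Emitter is notified and drains the completed queue.
            while (!completed.isEmpty()) {
                out.add(completed.pollFirst());
            }
        }
        return out;
    }

    public static void main(String[] args) {
        System.out.println(emitted(4, "P4", "P2", "P1", "P3"));   // [P4, P2, P1, P3]
    }
}
```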
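EventTime, unordered
The EventTime unordered mode is essentially a combination of the ordered mode and the ProcessingTime unordered mode. Because there are watermarks, the order between watermarks and messages must be coordinated, so each element of uncompletedQueue is no longer a single Promise but a set of Promises. A message entering the operator is wrapped in a Promise and added to the set at the tail of the queue. A watermark is also wrapped in a Promise, but it goes into a set of its own; that set is appended to the tail of uncompletedQueue, followed by a new empty set. In this way watermarks become boundaries for message order. A Promise can be moved to completedQueue, where the Emitter consumes it and sends it downstream, only once it has returned data and belongs to the set at the head of the queue; the second set is processed only after the head set is empty. This guarantees that a watermark is emitted only after every message that arrived before it has been emitted.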
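The segment mechanism can be modeled with a deque of sets. This is a hypothetical sketch, not Flink code: records are named P1, P2, ..., watermarks W1, ..., a watermark counts as complete as soon as it arrives, and only the head segment may release entries. In the scenario, P3 completes before W1 but still has to wait behind it.

```java
import java.util.ArrayDeque;
import java.util.ArrayList;
import java.util.Deque;
import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class UnorderedEventTimeDemo {

    // uncompletedQueue: a queue of segments; each watermark sits alone in its own segment.
    private final Deque<Set<String>> uncompleted = new ArrayDeque<>();
    // completedQueue: entries the Emitter may send downstream, in release order.
    private final Deque<String> completed = new ArrayDeque<>();
    // Elements whose result has arrived (watermarks count as complete immediately).
    private final Set<String> done = new HashSet<>();
    private final List<String> out = new ArrayList<>();

    public UnorderedEventTimeDemo() {
        uncompleted.addLast(new LinkedHashSet<>());
    }

    // Records (P...) join the tail segment; a watermark (W...) gets its own segment,
    // followed by a fresh empty segment for the records after it.
    public void add(String element) {
        if (element.startsWith("W")) {
            Set<String> watermarkSegment = new LinkedHashSet<>();
            watermarkSegment.add(element);
            uncompleted.addLast(watermarkSegment);
            uncompleted.addLast(new LinkedHashSet<>());
            done.add(element);
        } else {
            uncompleted.peekLast().add(element);
        }
        advance();
    }

    // The async result for a record has arrived.
    public void complete(String record) {
        done.add(record);
        advance();
    }

    // Only the head segment may release entries; once it is empty, move on to the next one.
    private void advance() {
        while (true) {
            Set<String> head = uncompleted.peekFirst();
            for (Iterator<String> it = head.iterator(); it.hasNext(); ) {
                String element = it.next();
                if (done.contains(element)) {
                    completed.addLast(element);
                    it.remove();
                }
            }
            if (!head.isEmpty() || uncompleted.size() == 1) {
                break;
            }
            uncompleted.pollFirst();
        }
        while (!completed.isEmpty()) {
            out.add(completed.pollFirst());   // the Emitter
        }
    }

    public static List<String> scenario() {
        UnorderedEventTimeDemo op = new UnorderedEventTimeDemo();
        for (String e : List.of("P1", "P2", "W1", "P3", "P4")) {
            op.add(e);
        }
        op.complete("P3");   // done, but its segment is behind W1: nothing is emitted yet
        op.complete("P2");   // head segment: emitted at once, ahead of P1
        op.complete("P1");   // head segment drains, so W1 and then P3 follow
        op.complete("P4");
        return op.out;
    }

    public static void main(String[] args) {
        System.out.println(scenario());   // [P2, P1, W1, P3, P4]
    }
}
```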
References:
https://ci.apache.org/projects/flink/flink-docs-release-1.6/dev/stream/operators/asyncio.html
https://cwiki.apache.org/confluence/pages/viewpage.action?pageId=65870673