【Title】: How to connect two unkeyed streams and share states with each other in Flink?
【Posted】: 2019-01-02 15:57:30
【Question】:

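Flink version 1.6.1

In the example below, I want to connect two unkeyed streams, but the two streams do not seem to share state correctly. I don't know what the right way to implement this is.

Code: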

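import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class TransactionJob {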
public static void main(String[] args) throws Exception {
    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    DataStream<String> stream1 = env.fromElements("1", "2");
    DataStream<Integer> stream2 = env.fromElements(3, 4, 5);
    ConnectedStreams<String, Integer> connectedStreams = stream1.connect(stream2);
    DataStream<String> resultStream = connectedStreams.process(new StringIntegerCoProcessFunction());
    resultStream.print().setParallelism(1);
    env.execute();
}

private static class StringIntegerCoProcessFunction extends CoProcessFunction<String, Integer, String> implements CheckpointedFunction {
    private transient ListState<String> state1;
    private transient ListState<Integer> state2;

    @Override
    public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
        state1.add(value);
        print(value);
    }

    @Override
    public void processElement2(Integer value, Context ctx, Collector<String> out) throws Exception {
        state2.add(value);
        print(value.toString());
    }

    private void print(String value) throws Exception {
        StringBuilder builder = new StringBuilder();
        builder.append("input value is " + value + ".");
        builder.append("state1 has ");
        for (String str : state1.get()) {
            builder.append(str + ",");
        }
        builder.append("state2 has ");
        for (Integer integer : state2.get()) {
            builder.append(integer.toString() + ",");
        }
        System.out.println(builder.toString());
    }

    @Override
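    public void snapshotState(FunctionSnapshotContext context) throws Exception {
        // Nothing to do: the ListState handles obtained from the operator
        // state store are included in checkpoints by Flink itself.
    }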

    @Override
    public void initializeState(FunctionInitializationContext context) throws Exception {
        ListStateDescriptor<String> descriptor1 =
                new ListStateDescriptor<>(
                        "state1",
                        TypeInformation.of(new TypeHint<String>() {
                        }));
        ListStateDescriptor<Integer> descriptor2 =
                new ListStateDescriptor<>(
                        "state2",
                        TypeInformation.of(new TypeHint<Integer>() {
                        }));
        state1 = context.getOperatorStateStore().getListState(descriptor1);
        state2 = context.getOperatorStateStore().getListState(descriptor2);
    }
}

}

Output:

input value is 4.state1 has state2 has 4,
input value is 2.state1 has 2,state2 has 4,
input value is 3.state1 has state2 has 3,
input value is 1.state1 has 1,state2 has 3,
input value is 5.state1 has state2 has 5,

I expected the last line of output to be

input value is XX .state1 has 1,2 state2 has 3,4,5

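But the actual output looks as if the input elements were partitioned: 4 and 2 ended up in one partition, and 3 and 1 in another. I want to access all the data stored in state1 and state2 from both processElement1 and processElement2.

【Comments】:

    Tags: apache-flink flink-streaming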


    【Answer 1】:

    You should modify the beginning of your job like this:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setParallelism(1);
    ...
    

    This will cause the entire job to run with a parallelism of 1. You do have

    resultStream.print().setParallelism(1);
    

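    which has the effect of setting the parallelism of the print sink to 1, but the rest of the job runs with the default parallelism, which is evidently greater than 1.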
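To illustrate why the output in the question looks partitioned, here is a plain-Java model of the situation. It is not Flink code: the class `OperatorStateModel`, the `Subtask` type, and the round-robin distribution are simplifying assumptions made for this sketch. It only mirrors the idea that each parallel instance of the operator holds its own private copy of state1 and state2.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical model, NOT Flink API: each parallel operator instance
// ("subtask") keeps its own private lists, like non-keyed operator state.
public class OperatorStateModel {

    /** One parallel instance of the operator with its own local state. */
    public static class Subtask {
        public final List<String> state1 = new ArrayList<>();
        public final List<Integer> state2 = new ArrayList<>();
    }

    /**
     * Distributes both inputs over the subtasks round-robin (an assumption
     * of this sketch) and records each element in the receiving subtask only.
     */
    public static List<Subtask> run(int parallelism, String[] in1, Integer[] in2) {
        List<Subtask> subtasks = new ArrayList<>();
        for (int i = 0; i < parallelism; i++) {
            subtasks.add(new Subtask());
        }
        for (int i = 0; i < in1.length; i++) {
            subtasks.get(i % parallelism).state1.add(in1[i]);
        }
        for (int i = 0; i < in2.length; i++) {
            subtasks.get(i % parallelism).state2.add(in2[i]);
        }
        return subtasks;
    }

    public static void main(String[] args) {
        String[] in1 = {"1", "2"};
        Integer[] in2 = {3, 4, 5};
        // Compare a parallelism of 4 with a parallelism of 1.
        for (int parallelism : new int[] {4, 1}) {
            System.out.println("parallelism = " + parallelism);
            List<Subtask> subtasks = run(parallelism, in1, in2);
            for (int i = 0; i < subtasks.size(); i++) {
                Subtask s = subtasks.get(i);
                System.out.println("  subtask " + i
                        + ": state1=" + s.state1 + " state2=" + s.state2);
            }
        }
    }
}
```

With a parallelism greater than 1, no single subtask in this model ever sees all five elements. With a parallelism of 1, the only subtask holds everything, which is what `env.setParallelism(1)` achieves for the real job.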

    Alternatively, you could key both streams by the same constant key and then use keyed state.
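The constant-key alternative could look roughly like the following. This is a minimal sketch rather than a tested solution: the class names `KeyedTransactionJob` and `SharedStateFunction` and the constant key `0` are made up for illustration, and the state is obtained in `open()` through the runtime context instead of through `CheckpointedFunction`.

```java
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

// Hypothetical job name; a sketch of the "same constant key" idea.
public class KeyedTransactionJob {
    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream1 = env.fromElements("1", "2");
        DataStream<Integer> stream2 = env.fromElements(3, 4, 5);

        stream1.connect(stream2)
                // Every element of both streams maps to the same key (0),
                // so all of them reach the same parallel instance.
                .keyBy(
                        new KeySelector<String, Integer>() {
                            @Override
                            public Integer getKey(String value) {
                                return 0;
                            }
                        },
                        new KeySelector<Integer, Integer>() {
                            @Override
                            public Integer getKey(Integer value) {
                                return 0;
                            }
                        })
                .process(new SharedStateFunction())
                .print();

        env.execute();
    }

    private static class SharedStateFunction extends CoProcessFunction<String, Integer, String> {
        private transient ListState<String> state1;
        private transient ListState<Integer> state2;

        @Override
        public void open(Configuration parameters) {
            // Keyed state, scoped to the current key (here always 0).
            state1 = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("state1", String.class));
            state2 = getRuntimeContext().getListState(
                    new ListStateDescriptor<>("state2", Integer.class));
        }

        @Override
        public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
            state1.add(value);
            out.collect(describe(value));
        }

        @Override
        public void processElement2(Integer value, Context ctx, Collector<String> out) throws Exception {
            state2.add(value);
            out.collect(describe(value.toString()));
        }

        private String describe(String value) throws Exception {
            StringBuilder builder = new StringBuilder("input value is " + value + ". state1 has ");
            append(builder, state1.get());
            builder.append(" state2 has ");
            append(builder, state2.get());
            return builder.toString();
        }

        // Guard against a null Iterable for a list state that is still empty.
        private static void append(StringBuilder builder, Iterable<?> items) {
            if (items == null) {
                return;
            }
            for (Object item : items) {
                builder.append(item).append(',');
            }
        }
    }
}
```

Because every record carries the same key, the keyed operator effectively does all its work in one parallel instance, so this trades parallelism for shared state much like `env.setParallelism(1)` does. The relative arrival order of elements from the two streams is still not guaranteed.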

    【Comments】:
