[Posted]: 2019-01-02 15:57:30
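[Question]:
Flink version 1.6.1
In the example below, I want to connect two non-keyed streams. However, the two streams do not seem to share state correctly, and I don't know the right way to implement this.
Code: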
import org.apache.flink.api.common.state.ListState;
import org.apache.flink.api.common.state.ListStateDescriptor;
import org.apache.flink.api.common.typeinfo.TypeHint;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.runtime.state.FunctionInitializationContext;
import org.apache.flink.runtime.state.FunctionSnapshotContext;
import org.apache.flink.streaming.api.checkpoint.CheckpointedFunction;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoProcessFunction;
import org.apache.flink.util.Collector;

public class TransactionJob {
    public static void main(String[] args) throws Exception {
        final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        DataStream<String> stream1 = env.fromElements("1", "2");
        DataStream<Integer> stream2 = env.fromElements(3, 4, 5);
        // Connect the two non-keyed streams and process them with one function.
        ConnectedStreams<String, Integer> connectedStreams = stream1.connect(stream2);
        DataStream<String> resultStream = connectedStreams.process(new StringIntegerCoProcessFunction());
        resultStream.print().setParallelism(1);
        env.execute();
    }

    private static class StringIntegerCoProcessFunction extends CoProcessFunction<String, Integer, String> implements CheckpointedFunction {
        private transient ListState<String> state1;
        private transient ListState<Integer> state2;

        @Override
        public void processElement1(String value, Context ctx, Collector<String> out) throws Exception {
            state1.add(value);
            print(value);
        }

        @Override
        public void processElement2(Integer value, Context ctx, Collector<String> out) throws Exception {
            state2.add(value);
            print(value.toString());
        }

        // Prints the current input together with the contents of both state lists.
        private void print(String value) throws Exception {
            StringBuilder builder = new StringBuilder();
            builder.append("input value is " + value + ".");
            builder.append("state1 has ");
            for (String str : state1.get()) {
                builder.append(str + ",");
            }
            builder.append("state2 has ");
            for (Integer integer : state2.get()) {
                builder.append(integer.toString() + ",");
            }
            System.out.println(builder.toString());
        }

        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
        }

        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            ListStateDescriptor<String> descriptor1 =
                new ListStateDescriptor<>(
                    "state1",
                    TypeInformation.of(new TypeHint<String>() {
                    }));
            ListStateDescriptor<Integer> descriptor2 =
                new ListStateDescriptor<>(
                    "state2",
                    TypeInformation.of(new TypeHint<Integer>() {
                    }));
            // Both lists are registered as operator (non-keyed) state.
            state1 = context.getOperatorStateStore().getListState(descriptor1);
            state2 = context.getOperatorStateStore().getListState(descriptor2);
        }
    }
}
Output:
input value is 4.state1 has state2 has 4,
input value is 2.state1 has 2,state2 has 4,
input value is 3.state1 has state2 has 3,
input value is 1.state1 has 1,state2 has 3,
input value is 5.state1 has state2 has 5,
I expected the last line of output to be
input value is XX .state1 has 1,2 state2 has 3,4,5
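Instead, the output looks as though the input elements were partitioned: 4 and 2 are in one partition, and 3 and 1 are in another. I want to be able to access all of the data stored in state1 and state2 from both processElement1 and processElement2.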
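The partitioned-looking output is consistent with how Flink scopes operator state: each parallel instance of a function keeps its own copy. The plain-Java sketch below (no Flink dependency) replays that situation. The `Subtask` class and the assignment of elements to three instances are assumptions made for illustration, copied from the output above; they are not Flink's actual scheduling logic.

```java
import java.util.ArrayList;
import java.util.List;

public class PerSubtaskStateSketch {

    /** Hypothetical stand-in for one parallel function instance with its own private state. */
    static class Subtask {
        final List<String> state1 = new ArrayList<>();
        final List<Integer> state2 = new ArrayList<>();

        String onString(String value) {
            state1.add(value);
            return describe(value);
        }

        String onInteger(Integer value) {
            state2.add(value);
            return describe(value.toString());
        }

        // Same formatting as print(...) in the question, but returns the line.
        private String describe(String value) {
            StringBuilder builder = new StringBuilder();
            builder.append("input value is ").append(value).append(".");
            builder.append("state1 has ");
            for (String s : state1) {
                builder.append(s).append(",");
            }
            builder.append("state2 has ");
            for (Integer i : state2) {
                builder.append(i).append(",");
            }
            return builder.toString();
        }
    }

    /** Replays the element-to-instance assignment seen in the output (an assumption). */
    static List<String> simulate() {
        Subtask a = new Subtask();
        Subtask b = new Subtask();
        Subtask c = new Subtask();
        List<String> lines = new ArrayList<>();
        lines.add(a.onInteger(4));
        lines.add(a.onString("2"));
        lines.add(b.onInteger(3));
        lines.add(b.onString("1"));
        lines.add(c.onInteger(5));
        return lines;
    }

    public static void main(String[] args) {
        simulate().forEach(System.out::println);
    }
}
```

In this sketch no single instance ever receives all five elements, so no instance can print a line listing 1,2 and 3,4,5 together.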
[Comments]:
Tags: apache-flink flink-streaming