【问题标题】:Stateful ParDo not working on Dataflow Runner有状态 ParDo 不适用于 Dataflow Runner
【发布时间】:2017-03-02 19:47:01
【问题描述】:

基于 Javadocs 和 https://beam.apache.org/blog/2017/02/13/stateful-processing.html 上的博客文章,我尝试使用一个简单的重复数据删除示例,该示例使用 2.0.0-beta-2 SDK 从 GCS 读取文件(包含每个带有 user_id 字段的 json 列表) 然后通过管道运行它,如下所述。

输入数据包含大约 146K 事件,其中只有 50 个事件是唯一的。整个输入大约是 50MB,应该可以在比 2 分钟固定窗口短得多的时间内处理。我只是在那里放置了一个窗口,以确保在不使用 GlobalWindow 的情况下保持 per-key-per-window 语义。我通过 3 个并行阶段运行窗口数据以比较结果,每个阶段都在下面解释。

  1. 只需将内容复制到 GCS 上的新文件中 - 这样可以确保所有事件都按预期处理,并且我验证了内容与输入完全相同
  2. 在 user_id 上组合.PerKey 并仅从 Iterable 中选择第一个元素 - 这基本上应该对数据进行重复数据删除,并且它可以按预期工作。生成的文件包含原始事件列表中唯一项目的确切数量 - 50 个元素
  3. 有状态 ParDo 检查密钥是否已被看到并仅在未看到时发出输出。理想情况下,由此产生的结果应该与 [2] 中的重复数据匹配,但我所看到的只是 3 个唯一事件。在我做的几次运行中,这 3 个唯一事件总是指向相同的 3 个 user_id。

有趣的是,当我从 DataflowRunner 切换到在本地运行整个过程的 DirectRunner 时,我看到 [3] 的输出与 [2] 匹配,只有 50 个唯一元素,如预期的那样。因此,我怀疑 Stateful ParDo 的 DataflowRunner 是否存在任何问题。

public class StatefulParDoSample {
    private static Logger logger = LoggerFactory.getLogger(StatefulParDoSample.class.getName());

    static class StatefulDoFn extends DoFn<KV<String, String>, String> {
        final Aggregator<Long, Long> processedElements = createAggregator("processed", Sum.ofLongs());
        final Aggregator<Long, Long> skippedElements = createAggregator("skipped", Sum.ofLongs());

        @StateId("keyTracker")
        private final StateSpec<Object, ValueState<Integer>> keyTrackerSpec =
                StateSpecs.value(VarIntCoder.of());

        @ProcessElement
        public void processElement(
                ProcessContext context,
                @StateId("keyTracker") ValueState<Integer> keyTracker) {
            processedElements.addValue(1l);
            final String userId = context.element().getKey();

            int wasSeen = firstNonNull(keyTracker.read(), 0);
            if (wasSeen == 0) {
                keyTracker.write( 1);
                context.output(context.element().getValue());
            } else {
                keyTracker.write(wasSeen + 1);
                skippedElements.addValue(1l);
            }
        }
    }

    public static void main(String[] args) {
        DataflowPipelineOptions pipelineOptions = PipelineOptionsFactory.create().as(DataflowPipelineOptions.class);
        pipelineOptions.setRunner(DataflowRunner.class);
        pipelineOptions.setProject("project-name");
        pipelineOptions.setStagingLocation(GCS_STAGING_LOCATION);
        pipelineOptions.setStreaming(false);
        pipelineOptions.setAppName("deduper");
        Pipeline p = Pipeline.create(pipelineOptions);

        final ObjectMapper mapper = new ObjectMapper();
        PCollection<KV<String, String>> keyedEvents =
        p
            .apply(TextIO.Read.from(GCS_SAMPLE_INPUT_FILE_PATH))
            .apply(WithKeys.of(new SerializableFunction<String, String>() {
                @Override
                public String apply(String input) {
                    try {
                        Map<String, Object> eventJson =
                                mapper.readValue(input, Map.class);
                        return (String) eventJson.get("user_id");
                    } catch (Exception e) {

                    }

                    return "";
                }
            }))
            .apply(
                Window.into(
                    FixedWindows.of(Duration.standardMinutes(2))
                )
            );

        keyedEvents
            .apply(ParDo.of(new StatefulDoFn()))
            .apply(TextIO.Write.to(GCS_SAMPLE_OUTPUT_FILE_PATH).withNumShards(1));

        keyedEvents
            .apply(Values.create())
            .apply(TextIO.Write.to(GCS_SAMPLE_COPY_FILE_PATH).withNumShards(1));

        keyedEvents
            .apply(Combine.perKey(new SerializableFunction<Iterable<String>, String>() {
                @Override
                public String apply(Iterable<String> input) {
                    return !input.iterator().hasNext() ? "empty" : input.iterator().next();
                }
            }))
            .apply(Values.create())
            .apply(TextIO.Write.to(GCS_SAMPLE_COMBINE_FILE_PATH).withNumShards(1));

        PipelineResult result = p.run();
        result.waitUntilFinish();
    }
}

【问题讨论】:

  • 我会看看这个。
  • 顺便说一句,我已经更新了我的答案 - 这在 HEAD 和 0.6.0 版本中以批处理模式工作。

标签: google-cloud-dataflow dataflow apache-beam


【解决方案1】:

这是批处理模式下 Dataflow 服务中的一个错误,已在即将发布的 0.6.0 Beam 版本(或 HEAD,如果您跟踪前沿)中修复。

感谢您引起我的注意!供参考,或者如果出现任何其他问题,BEAM-1611 对此进行了跟踪。

【讨论】:

  • 那么,您是说这在流媒体模式下可以正常工作吗?
  • 我问是因为我最终还是会在流模式下运行管道 - 所以如果是这样的话,现在这可能不是一个障碍
  • 是的,它应该在流模式下工作。我专门在流模式下运行了您的示例代码作为测试,并取得了成功。
  • 谢谢 - 我也能够验证它在流媒体模式下工作正常
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2017-07-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2015-01-02
  • 2021-08-10
相关资源
最近更新 更多