【问题标题】:Dataflow streaming job not scaleing past 1 worker数据流流作业未超过 1 个工作人员
【发布时间】:2017-09-08 12:20:53
【问题描述】:

我使用 Apache Beam SDK for Java 2.1.0 的流式数据流作业 (2017-09-08_03_55_43-9675407418829265662) 不会超过 1 个 Worker,即使 pubsub 队列不断增长(现在有 10 万条未传递的消息)——你知道为什么吗?

它目前与autoscalingAlgorithm=THROUGHPUT_BASEDmaxNumWorkers=10 一起运行。

【问题讨论】:

    标签: java google-cloud-dataflow apache-beam


    【解决方案1】:

    这里是数据流工程师。我在我们的后端查看了该作业,我可以看到它没有扩大,因为 CPU 利用率很低,这意味着其他东西正在限制管道的性能,例如外部节流。在这些情况下,升级几乎没有帮助。

    我发现有些捆绑包需要花费数小时才能处理。我建议调查您的管道逻辑,看看是否还有其他可以优化的部分。

    【讨论】:

    • 谢谢!管道中的一个步骤是向另一个执行繁重处理的服务发出 HTTP 请求。此服务是自动缩放的,可以处理比当前服务更高的负载,但是数据流接缝有上限。通过将数据流步骤转变为异步 http 调用,是否能够发送更多并行请求?
    • 您正在寻找的并行规模是多少?使用当前设置,您已经获得了大约 4 * max_num_workers 的并行度。 DoFn 可以发出异步调用,但更简单、更灵活的方法是添加“重新洗牌”[1] 步骤。这使您可以很容易地控制对您的服务的最大并行请求(例如,如果您使用 random.nextInt(100) 作为随机键,则管道将有多达 100 个并行请求)。 [1]:beam.apache.org/documentation/sdks/javadoc/2.0.0/index.html?org/…
    • 您链接到已弃用的内容 - 我应该使用自己的 GroupBy/FlatMap 实现重新实现它吗?
    • Reshuffle 转换被标记为已弃用,因为它是特定于数据流的提示而不是 Beam 模型概念。您可以自行重新实现相同的逻辑,但 Dataflow 在内部使用 ReshuffleTrigger 进行额外优化。 Reshuffle 转换将来可能会更改或被删除,但不会被类似的机制取代。
    • @Brodin,请继续使用 Reshuffle。尽管它被标记为已弃用,但它是在 Dataflow 中随机播放消息的推荐方式。如果并且当它被移除时,将会有一个简单的替换它。
    【解决方案2】:

    这就是我最终的结果:

    import org.apache.beam.sdk.transforms.*;
    import org.apache.beam.sdk.values.KV;
    import org.apache.beam.sdk.values.PCollection;
    
    import java.util.concurrent.ThreadLocalRandom;
    
    
    public class ReshuffleWithRandomKey<T>
            extends PTransform<PCollection<T>, PCollection<T>> {
    
        private final int size;
    
        public ReshuffleWithRandomKey(int size) {
            this.size = size;
        }
    
        @Override
        public PCollection<T> expand(PCollection<T> input) {
            return input
                    .apply("Random key", ParDo.of(new AssignRandomKeyFn<T>(size)))
                    .apply("Reshuffle", Reshuffle.<Integer, T>of())
                    .apply("Values", Values.<T>create());
        }
    
        private static class AssignRandomKeyFn<T> extends DoFn<T, KV<Integer, T>> {
    
            private final int size;
    
            AssignRandomKeyFn(int size) {
                this.size = size;
            }
    
            @ProcessElement
            public void process(ProcessContext c) {
                c.output(KV.of(ThreadLocalRandom.current().nextInt(0, size), c.element()));
            }
        }
    }
    

    @raghu-angadi 和 @scott-wegner 你怎么看?

    【讨论】:

    • 我觉得不错。小东西:您可以使用 MapElement.via() 来减少代码。将“大小”重命名为“碎片”可能会更好地传达意图。
    • 您是否在 ParDo 之前立即申请了 ReshuffleWithRandomKey 并阻止了网络调用?或者您是否在ReshuffleValues 之间嵌入了ParDo?我遇到了完全相同的问题,但不要遵循这如何增加并行度。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2020-03-20
    • 2019-02-08
    • 1970-01-01
    • 1970-01-01
    • 2017-04-20
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多