【问题标题】:How to throttle request in a graph using akka stream?如何使用 akka 流在图中限制请求?
【发布时间】:2019-01-09 09:23:06
【问题描述】:

背景

我有一个项目,我们在 Java 中使用 akka-streams。

在这个项目中,我有一个字符串流和一个对它们执行一些操作的图形。

目标

在我的图表中,我想将该流广播给 2 个工作人员。将所有字符 'a' 替换为 'A' 并在实时接收数据时发送数据。

另一个将接收数据,每 3 个字符串,它将连接这 3 个字符串并将它们映射到数字。

如下所示:

显然Sink 2 接收信息的速度不如Sink 1。但这是预期的行为。这里有趣的部分是worker 2。

问题

做工人 1 很容易,并不难。这里的问题是工人 2。我知道 akka 有可以保存最多 X 条消息的缓冲区,但是看起来我不得不选择现有的 Overflow strategies 之一,这通常会导致选择我想要删除的消息或如果我想保持直播或不直播。

我想要的只是,当我在 worke2 中的缓冲区达到缓冲区的最大大小时,对其拥有的所有消息执行 concat 和 map 操作,然后将它们一起发送(之后重置缓冲区)。

但即使在阅读了 akka 的 stream-rate 文档后,我也找不到这样做的方法,至少使用 Java。

研究

我还检查了一个类似的 SO 问题,Selective request-throttling using akka-http stream 但是已经一年多了,没有人回复。

问题

使用图形 DSL,我将如何创建路径:

Source -> bcast -> worker2 -> Sink 2

??

【问题讨论】:

    标签: java akka buffer akka-stream


    【解决方案1】:

    在您的 bcast 之后应用 groupedWithin 运算符,其持续时间不受限制,元素数量设置为 3。 https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/groupedWithin.html

    您也可以自己做,添加一个将元素存储在 List 中的阶段,并在每次达到 3 个元素时发出列表。

    import akka.stream.Attributes;
    import akka.stream.FlowShape;
    import akka.stream.Inlet;
    import akka.stream.Outlet;
    import akka.stream.stage.AbstractInHandler;
    import akka.stream.stage.GraphStage;
    import akka.stream.stage.GraphStageLogic;
    import com.google.common.collect.ImmutableList;
    import java.util.ArrayList;
    import java.util.List;
    
    public class RecordGrouper<T> extends GraphStage<FlowShape<T, List<T>>> {
    
      private final Inlet<T> inlet = Inlet.create("in");
      private final Outlet<List<T>> outlet = Outlet.create("out");
      private final FlowShape<T, List<T>> shape = new FlowShape<>(inlet, outlet);
    
      @Override
      public GraphStageLogic createLogic(Attributes inheritedAttributes) {
        return new GraphStageLogic(shape) {
          List<T> batch = new ArrayList<>(3);
    
          {
            setHandler(
                inlet,
                new AbstractInHandler() {
                  @Override
                  public void onPush() {
                    T record = grab(inlet);
                    batch.add(record);
                    if (batch.size() == 3) {
                      emit(outlet, ImmutableList.copyOf(batch));
                      batch.clear();
                    }
                    pull(inlet);
                  }
                });
          }
    
          @Override
          public void preStart() {
            pull(inlet);
          }
        };
      }
    
      @Override
      public FlowShape<T, List<T>> shape() {
        return shape;
      }
    }
    

    作为一个侧节点,我认为buffer 运算符不会起作用,因为它仅在有背压时才会启动。所以如果一切都很安静,元素仍然会一个一个地发射,而不是 3 个 3 个。https://doc.akka.io/docs/akka/2.5/stream/operators/Source-or-Flow/buffer.html

    【讨论】:

    • 你能举一个List方法的例子吗?我很好奇!
    • @Flame_Phoenix,我已经添加了一个示例,但我还没有测试过。
    猜你喜欢
    • 2014-09-30
    • 1970-01-01
    • 1970-01-01
    • 2015-12-13
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-11-28
    • 1970-01-01
    相关资源
    最近更新 更多