【问题标题】:How to parallelize HTTP requests within an Apache Beam step?如何在 Apache Beam 步骤中并行化 HTTP 请求?
【发布时间】:2018-10-18 12:40:09
【问题描述】:

我有一个在 Google Dataflow 上运行的 Apache Beam 管道,其工作相当简单:

  • 它从 Pub/Sub 读取单个 JSON 对象
  • 解析它们
  • 并通过 HTTP 将它们发送到某些 API

这个 API 要求我分批发送 75 个项目。所以我构建了一个 DoFn,它将事件累积在一个列表中,一旦我得到 75 个就通过这个 API 发布它们。结果太慢了,所以我而不是使用线程池在不同的线程中执行这些 HTTP 请求。

我现在的实现是这样的:

private class WriteFn : DoFn<TheEvent, Void>() {
  @Transient var api: TheApi

  @Transient var currentBatch: MutableList<TheEvent>

  @Transient var executor: ExecutorService

  @Setup
  fun setup() {
    api = buildApi()
    executor = Executors.newCachedThreadPool()
  }

  @StartBundle
  fun startBundle() {
    currentBatch = mutableListOf()
  }

  @ProcessElement
  fun processElement(processContext: ProcessContext) {
    val record = processContext.element()

    currentBatch.add(record)

    if (currentBatch.size >= 75) {
      flush()
    }
  }

  private fun flush() {
    val payloadTrack = currentBatch.toList()
    executor.submit {
      api.sendToApi(payloadTrack)
    }
    currentBatch.clear()
  }

  @FinishBundle
  fun finishBundle() {
    if (currentBatch.isNotEmpty()) {
      flush()
    }
  }

  @Teardown
  fun teardown() {
    executor.shutdown()
    executor.awaitTermination(30, TimeUnit.SECONDS)
  }
}

从数据进入 API 的意义上说,这似乎“很好”。但我不知道这是否是正确的方法,我感觉这很慢。

我认为它很慢的原因是在负载测试时(通过向 Pub/Sub 发送几百万个事件),管道将这些消息转发到 API(有响应时间不到 8 毫秒),而不是让我的笔记本电脑将它们输入 Pub/Sub。

我的实现有什么问题吗?这是我应该这样做的方式吗?

另外...我是否需要在我的 @FinishBundle 方法中等待所有请求完成(即通过获取执行程序返回的期货并等待它们)?

【问题讨论】:

    标签: google-cloud-dataflow apache-beam


    【解决方案1】:

    这里有两个相互关联的问题:

    1. 你做对了吗/你需要改变什么吗?
    2. 您需要在@FinishBundle 等待吗?

    第二个答案:是的。但实际上你需要更彻底地冲洗,这一点将变得清晰。

    一旦您的@FinishBundle 方法成功,Beam runner 将假定捆绑已成功完成。但是您的 @FinishBundle 仅发送请求 - 它并不能确保它们已成功。因此,如果请求随后失败,您可能会以这种方式丢失数据。您的@FinishBundle 方法实际上应该被阻塞并等待来自TheApi 的成功确认。顺便说一句,以上所有内容都应该是幂等的,因为在完成捆绑后,地震可能会发生并导致重试;-)

    所以回答第一个问题:你应该改变什么吗?就如上。只要您确定在提交捆绑包之前提交了结果,以这种方式批处理请求的做法就可以工作。

    您可能会发现这样做会导致您的管道变慢,因为@FinishBundle@Setup 更频繁地发生。要跨包批量处理请求,您需要使用状态和计时器的较低级别功能。我在https://beam.apache.org/blog/2017/08/28/timely-processing.html 写了一个人为的用例版本。我会对它对您的工作方式非常感兴趣。

    当您的管道中存在持久的随机播放时,您所期望的极低延迟(在低毫秒范围内)可能只是不可用。

    【讨论】:

    • 感谢您的回复。所以是的,我的管道现在阻塞在@FinishBundle 中(我保存了.submit{} 返回的所有期货,然后循环遍历它们并阻止它们.get())。到目前为止,一切都很好。我从 UI 中不明白的一件事是我的管道的样子:i.imgur.com/SpSrWQt.png“6 天 3 小时”的真正含义是什么?我看到数据正常传送到另一端。我真的不明白 Dataflow 对“Wall time”的定义。
    • 我会试试你帖子中的方法。但我不确定我将如何在那个中进行并行处理。我需要访问的外部 API 只能处理 75 个元素的批次,但我每个包有几千个,所以我想用少数并行请求(每个包含一批元素)来访问 API。问题是,在您的帖子中,您假设调用外部服务的结果将被输入到下一步(使用.output()),这并不适合我的情况,因为对我来说这是管道中的最后一步。
    • 所以我正在运行这个管道的两个版本。一个实现类似于我的问题中的那个,除了我实际上阻止了@FinishBundle。另一个使用您在帖子中描述的状态和计时器。带有状态/计时器的通常慢 10 倍,我认为这主要是因为所有请求都是按顺序完成的。
    猜你喜欢
    • 1970-01-01
    • 2015-10-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2020-01-15
    • 2021-04-27
    • 2018-12-05
    • 1970-01-01
    相关资源
    最近更新 更多