【发布时间】:2018-10-18 12:40:09
【问题描述】:
我有一个在 Google Dataflow 上运行的 Apache Beam 管道,其工作相当简单:
- 它从 Pub/Sub 读取单个 JSON 对象
- 解析它们
- 并通过 HTTP 将它们发送到某些 API
这个 API 要求我分批发送 75 个项目。所以我构建了一个 DoFn,它将事件累积在一个列表中,一旦我得到 75 个就通过这个 API 发布它们。结果太慢了,所以我而不是使用线程池在不同的线程中执行这些 HTTP 请求。
我现在的实现是这样的:
private class WriteFn : DoFn<TheEvent, Void>() {
@Transient var api: TheApi
@Transient var currentBatch: MutableList<TheEvent>
@Transient var executor: ExecutorService
@Setup
fun setup() {
api = buildApi()
executor = Executors.newCachedThreadPool()
}
@StartBundle
fun startBundle() {
currentBatch = mutableListOf()
}
@ProcessElement
fun processElement(processContext: ProcessContext) {
val record = processContext.element()
currentBatch.add(record)
if (currentBatch.size >= 75) {
flush()
}
}
private fun flush() {
val payloadTrack = currentBatch.toList()
executor.submit {
api.sendToApi(payloadTrack)
}
currentBatch.clear()
}
@FinishBundle
fun finishBundle() {
if (currentBatch.isNotEmpty()) {
flush()
}
}
@Teardown
fun teardown() {
executor.shutdown()
executor.awaitTermination(30, TimeUnit.SECONDS)
}
}
从数据进入 API 的意义上说,这似乎“很好”。但我不知道这是否是正确的方法,我感觉这很慢。
我认为它很慢的原因是在负载测试时(通过向 Pub/Sub 发送几百万个事件),管道将这些消息转发到 API(有响应时间不到 8 毫秒),而不是让我的笔记本电脑将它们输入 Pub/Sub。
我的实现有什么问题吗?这是我应该这样做的方式吗?
另外...我是否需要在我的 @FinishBundle 方法中等待所有请求完成(即通过获取执行程序返回的期货并等待它们)?
【问题讨论】:
标签: google-cloud-dataflow apache-beam