【问题标题】:How to do Async Http Call with Apache Beam (Java)?如何使用 Apache Beam (Java) 进行异步 Http 调用?
【发布时间】:2018-04-17 18:20:15
【问题描述】:

输入PCollection是http请求,是一个有界数据集。我想在 ParDo 中进行异步 http 调用(Java),解析响应并将结果放入输出 PCollection。我的代码如下。得到如下异常。

我不知道原因。需要一个指导......

java.util.concurrent.CompletionException: java.lang.IllegalStateException: Can't add element ValueInGlobalWindow{value=streaming.mapserver.backfill.EnrichedPoint@2c59e, pane=PaneInfo.NO_FIRING} to committed bundle in PCollection Call Map Server With Rate Throttle/ParMultiDo(ProcessRequests).output [PCollection]

代码

public class ProcessRequestsFn extends DoFn<PreparedRequest,EnrichedPoint> {
    private static AsyncHttpClient _HttpClientAsync;
    private static ExecutorService _ExecutorService;

static{

    AsyncHttpClientConfig cg = config()
            .setKeepAlive(true)
            .setDisableHttpsEndpointIdentificationAlgorithm(true)
            .setUseInsecureTrustManager(true)
            .addRequestFilter(new RateLimitedThrottleRequestFilter(100,1000))
            .build();

    _HttpClientAsync = asyncHttpClient(cg);

    _ExecutorService = Executors.newCachedThreadPool();

}


@DoFn.ProcessElement
public void processElement(ProcessContext c) {

    PreparedRequest request = c.element();

    if(request == null)
        return;

    _HttpClientAsync.prepareGet((request.getRequest()))
            .execute()
            .toCompletableFuture()
            .thenApply(response -> { if(response.getStatusCode() == HttpStatusCodes.STATUS_CODE_OK){
                                                return response.getResponseBody();
                                            } return null; } )
            .thenApply(responseBody->
                    {
                        List<EnrichedPoint> resList = new ArrayList<>();
                        /*some process logic here*/
                        System.out.printf("%d enriched points back\n", result.length());
                        }
                        return resList;

                    })
            .thenAccept(resList -> {
                for (EnrichedPoint enrichedPoint : resList) {
                    c.output(enrichedPoint);
                }
            })
            .exceptionally(ex->{
                System.out.println(ex);
                return null;
            });

  }
}

【问题讨论】:

  • 不知何故,我的直觉告诉我,在 Beam 中做异步很难,如果不是不可能的话。因为它不知道电话何时会回来。但是,特别是在有界数据的批处理中。有可能吗?
  • 最终目标是尽可能加快 http 调用速度。

标签: apache-beam asynchttpclient


【解决方案1】:

Scio 库实现了处理异步操作的DoFnBaseAsyncDoFn 可能会为您提供所需的处理。由于您正在处理CompletableFuture,因此请查看JavaAsyncDoFn

请注意,您不一定需要使用 Scio 库,但您可以采用 BaseAsyncDoFn 的主要思想,因为它独立于 Scio 库的其余部分。

【讨论】:

    【解决方案2】:

    您遇到的问题是您在processElementfinishBundle 调用的上下文之外输出。

    您需要将所有输出收集到内存中,并在未来的processElement 调用期间急切地输出它们,最后在finishBundle 内通过阻塞直到所有调用完成。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2012-01-28
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-11-13
      • 2018-05-21
      相关资源
      最近更新 更多