【问题标题】:Akka HTTP using a response unmarshallerAkka HTTP 使用响应解组器
【发布时间】:2017-05-10 14:58:14
【问题描述】:

我正在使用 Akka 流和 Akka HTTP 构建数据管道。用例非常简单,接收来自用户的 Web 请求,该请求将做两件事。首先通过调用 3rd 方 API 创建会话,然后将此会话提交到某个持久存储,当我们收到会话时,它将代理原始用户请求但添加会话数据。

我已经开始研究数据管道的第一个分支,即会话处理,但我想知道是否有更优雅的方式将来自第 3 方 API 的 HTTP 响应解组到我目前正在使用的 POJO Jackson.unmarshaller.unmarshal 返回一个 CompletionStage<T>,然后我必须将其解包到 T。它不是很优雅,我猜 Akka HTTP 有更聪明的方法来做到这一点。

这是我现在的代码

private final Source<Session, NotUsed> session =
        Source.fromCompletionStage(
                getHttp().singleRequest(getSessionRequest(), getMat())).
                map(r -> Jackson.unmarshaller(Session.class).unmarshal(r.entity(), getMat())).
                map(f -> f.toCompletableFuture().get()).
                alsoTo(storeSession);

【问题讨论】:

    标签: java akka-http akka-stream


    【解决方案1】:

    Akka Streams 为您提供mapAsync,这是一个以可配置、非阻塞的方式在管道中处理异步计算的阶段。

    你的代码应该是这样的

    Source.fromCompletionStage(
                    getHttp().singleRequest(getSessionRequest(), getMat())).
                    mapAsync(4, r -> Jackson.unmarshaller(Session.class).unmarshal(r.entity(), getMat())).
                    alsoTo(storeSession);
    

    注意:

    1. 在这种情况下不仅仅是优雅的问题,因为CompletableFuture.get 是一个阻塞调用。这可能会导致您的管道出现严重问题。
    2. mapAsync(并行)所需的 Int 参数允许微调可以同时运行多少并行异步操作。

    mapAsync 中的更多信息可以在docs 中找到。

    【讨论】:

    • 这非常简洁,我希望 Akka HTTP 使用更高级别的东西,比如Flow&lt;HttpResponse, T, ?>`,我可以将其插入我的管道中,这样会更好.
    • 在阻塞呼叫的问题上,这种情况真的会那么糟糕吗?我的印象是实际的管道无论如何都会在参与者线程上执行,因此它不会阻塞正在运行的线程。
    • 问题是,默认情况下,没有“演员线程”这样的东西。在 Actor 内部进行阻塞将使 Actor 系统共享的整个线程基础设施处于饥饿状态。只要这是不可避免的(例如,在处理阻塞 IO 时),您可以设置您的管道阶段以使用单独的线程池。在这种情况下,没有必要这样做。更多信息在这里doc.akka.io/docs/akka/current/general/…
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-10
    • 2015-12-29
    • 1970-01-01
    • 2016-01-21
    • 2016-06-03
    • 1970-01-01
    相关资源
    最近更新 更多