【问题标题】:Emitting the response from the elastic4s subscriber back to the akka-stream将来自 elastic4s 订阅者的响应发送回 akka-stream
【发布时间】:2016-11-28 15:47:25
【问题描述】:

我目前正在构建一个将数据从 mongoDb 流式传输到 elasticsearch 的解决方案。 我的目标是跟踪所有成功传输到 elasticsearch 的项目。 我正在使用 akka-streams 和 elastic4s。目前流进es的样子是这样的

val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
    batchSize = batchSize,
    completionFn = { () => elasticFinishPromise.success(()); ()},
    errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
    concurrentRequests = concurrentRequests
    )
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)

从我的来源来看是这样的:

val a: [NotUsed] = mongoSrc
  .via(some operations..)
  .to(esSink)
  .run()

现在一切正常,现在我正在使用第二个接收器记录例如项目计数。但我宁愿记录真正传输到elasticsearch的项目。 elastic4s 订阅者提供listener: ResponseListeneronAck(): UnitonFailure(): Unit,我希望像这样将这些信息返回到流中

val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item

mongoSrc ~> doStuff ~> esSink ~> logSink

我将如何实现它?我是否需要一个自定义阶段来缓冲onAckonFailure 的元素?还是有更简单的方法?

感谢您的帮助。

【问题讨论】:

  • Akka Streams 的响应式 kafka 驱动程序做了这样的事情,也许看看这些资源会很有启发性:github.com/akka/reactive-kafka(尤其是 ProducerStage)
  • 谢谢,看起来很有帮助!明天试试这个
  • 你能创建另一个通过 onAck 方法填充的流吗?
  • @monkjack 是的,这正是我现在正在做的事情,我用 source.queue 创建了另一个 Stream 并在 onAck 上推送到它。

标签: scala elasticsearch akka akka-stream elastic4s


【解决方案1】:

您可以通过利用 Flow.fromSinkAndSource 来“流动”您的 Subscriber[T] 接收器。查看来自the docs 的“复合流(来自 Sink 和 Source)”插图。

在这种情况下,您将附加您的自定义 actorPublisher 作为源并从 onAck() 向其发送消息。

既然您要求更简单的方法:

val doStuff = Flow[DocToIndex]
                .grouped(batchSize)
                .mapAsync(concurrentRequests)(bulkopFuture)

简而言之,除了所有有用的抽象,elastic4s 订阅者只是a bulk update request

【讨论】:

  • 请注意,现在您不需要esSink 连接点。你的图表看起来像mongoSrc ~> doStuff ~> logSink
  • 感谢您的回答。我认为您与 actorPublisher 的方法会很好。我想它必须是一个预先实现的actorPublisher,因为我遇到了source.queue和复合流的问题,当图形已经实现时,我首先得到source.queue,而将其注入已经太晚了elastic4s 订阅者。现在我只是创建另一个 Stream 并让 onAck 方法推送到 source.queue 并且一切正常,而且我仍然对解决方案不是 100% 满意,并且更喜欢复合流。
猜你喜欢
  • 2018-08-02
  • 1970-01-01
  • 2016-06-19
  • 1970-01-01
  • 1970-01-01
  • 2012-09-19
  • 2015-04-18
  • 1970-01-01
  • 2018-05-08
相关资源
最近更新 更多