【发布时间】:2016-11-28 15:47:25
【问题描述】:
我目前正在构建一个将数据从 mongoDb 流式传输到 elasticsearch 的解决方案。 我的目标是跟踪所有成功传输到 elasticsearch 的项目。 我正在使用 akka-streams 和 elastic4s。目前流进es的样子是这样的
val esSubscriber: BulkIndexingSubscriber[CustomT] = esClient.subscriber[CustomT](
batchSize = batchSize,
completionFn = { () => elasticFinishPromise.success(()); ()},
errorFn = { (t: Throwable) => elasticFinishPromise.failure(t); ()},
concurrentRequests = concurrentRequests
)
val esSink: Sink[CustomT, NotUsed] = Sink.fromSubscriber(esSubscriber)
从我的来源来看是这样的:
val a: [NotUsed] = mongoSrc
.via(some operations..)
.to(esSink)
.run()
现在一切正常,现在我正在使用第二个接收器记录例如项目计数。但我宁愿记录真正传输到elasticsearch的项目。
elastic4s 订阅者提供listener: ResponseListener 和onAck(): Unit 和onFailure(): Unit,我希望像这样将这些信息返回到流中
val mongoSrc: [Source..]
val doStuff: [Flow..]
val esSink: [Flow..] //now as flow instead of sink
val logSink: [Sink[Int...]] //now gets for example a 1 for each successful transported item
mongoSrc ~> doStuff ~> esSink ~> logSink
我将如何实现它?我是否需要一个自定义阶段来缓冲onAck 和onFailure 的元素?还是有更简单的方法?
感谢您的帮助。
【问题讨论】:
-
Akka Streams 的响应式 kafka 驱动程序做了这样的事情,也许看看这些资源会很有启发性:github.com/akka/reactive-kafka(尤其是 ProducerStage)
-
谢谢,看起来很有帮助!明天试试这个
-
你能创建另一个通过 onAck 方法填充的流吗?
-
@monkjack 是的,这正是我现在正在做的事情,我用 source.queue 创建了另一个 Stream 并在 onAck 上推送到它。
标签: scala elasticsearch akka akka-stream elastic4s