【发布时间】:2019-06-28 02:10:48
【问题描述】:
我正在尝试弄清楚如何使用 Lagom 来使用来自通过 Kafka 通信的外部系统的数据。
我遇到了这个section of Lagom documentation,它描述了 Lagom 服务如何通过订阅其主题与另一个 Lagom 服务进行通信。
helloService
.greetingsTopic()
.subscribe // <-- you get back a Subscriber instance
.atLeastOnce(
Flow.fromFunction(doSomethingWithTheMessage)
)
但是,当您想要订阅包含由某个随机外部系统产生的事件的 Kafka 主题时,什么是合适的配置?
此功能是否需要某种适配器? 为了澄清,我现在有这个:
object Aggregator {
val TOPIC_NAME = "my-aggregation"
}
trait Aggregator extends Service {
def aggregate(correlationId: String): ServiceCall[Data, Done]
def aggregationTopic(): Topic[DataRecorded]
override final def descriptor: Descriptor = {
import Service._
named("aggregator")
.withCalls(
pathCall("/api/aggregate/:correlationId", aggregate _)
)
.withTopics(
topic(Aggregator.TOPIC_NAME, aggregationTopic())
.addProperty(
KafkaProperties.partitionKeyStrategy,
PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
)
)
.withAutoAcl(true)
}
}
我可以通过简单的 POST 请求调用它。
但是,我希望通过使用来自某些(外部)Kafka 主题的 Data 消息来调用它。
我想知道是否有一种方法可以以类似于此模型的方式配置描述符:
override final def descriptor: Descriptor = {
...
kafkaTopic("my-input-topic")
.subscribe(serviceCall(aggregate _)
.withAtMostOnceDelivery
}
我遇到过这个discussion on Google Groups,但在 OPs 问题中,我没有看到他实际上对来自some-topic 的EventMessages 做任何事情,除了将它们路由到他的服务定义的主题。
编辑 #1:进度更新
查看文档,我决定尝试以下方法。
我又添加了 2 个模块,aggregator-kafka-proxy-api 和 aggregator-kafka-proxy-impl。
在新的 api 模块中,我定义了一个新服务,没有方法,但有一个主题代表我的 Kafka 主题:
object DataKafkaPublisher {
val TOPIC_NAME = "data-in"
}
trait DataKafkaPublisher extends Service {
def dataInTopic: Topic[DataPublished]
override final def descriptor: Descriptor = {
import Service._
import DataKafkaPublisher._
named("data-kafka-in")
.withTopics(
topic(TOPIC_NAME, dataInTopic)
.addProperty(
KafkaProperties.partitionKeyStrategy,
PartitionKeyStrategy[SessionDataPublished](_.data.correlationId)
)
)
.withAutoAcl(true)
}
}
在impl模块中,我只是简单的做了标准实现
class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry) extends DataKafkaPublisher {
override def dataInTopic: Topic[api.DataPublished] =
TopicProducer.singleStreamWithOffset {
fromOffset =>
persistentEntityRegistry.eventStream(KafkaDataEvent.Tag, fromOffset)
.map(ev => (convertEvent(ev), ev.offset))
}
private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished = {
evt.event match {
case DataPublished(data) => api.DataPublished(data)
}
}
}
现在,为了实际使用这些事件,在我的 aggregator-impl 模块中,我添加了一个“订阅者”服务,它接收这些事件,并在实体上调用适当的命令。
class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {
kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
Flow[DataPublished].mapAsync(1) { sd =>
sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
}
)
private def sessionRef(correlationId: String) =
persistentEntityRegistry.refFor[Entity](correlationId)
}
这实际上允许我在 Kafka 主题“data-in”上发布消息,然后将其代理并转换为 RecordData 命令,然后再发送给实体进行消费。
但是,这对我来说似乎有些 hacky。我通过 Lagom internals 与 Kafka 耦合。我无法轻松交换数据源。例如,如果我愿意,我将如何使用来自 RabbitMQ 的外部消息? 如果我尝试从另一个 Kafka(与 Lagom 使用的不同)消费怎么办?
编辑 #2:更多文档
我发现了一些关于 Lagom 文档的文章,特别是:
Consuming Topics from 3rd parties
您可能希望 Lagom 服务使用服务产生的数据 未在 Lagom 中实施。在这种情况下,如服务中所述 客户端部分,您可以在 你的 Lagom 项目。该模块将包含一个服务描述符 声明您将使用的主题。一旦你有了你的 第三方服务接口及相关类实现,你 应该添加 third-party-service-api 作为对您的依赖项 花式服务实施。最后,您可以从所描述的主题中消费 在第三方服务中,如订阅主题中所述 部分。
【问题讨论】:
标签: java scala apache-kafka akka-stream lagom