【问题标题】:Lagom service consuming input from KafkaLagom 服务使用来自 Kafka 的输入
【发布时间】:2019-06-28 02:10:48
【问题描述】:

我正在尝试弄清楚如何使用 Lagom 来使用来自通过 Kafka 通信的外部系统的数据。

我遇到了这个section of Lagom documentation,它描述了 Lagom 服务如何通过订阅其主题与另一个 Lagom 服务进行通信。

helloService
  .greetingsTopic()
  .subscribe // <-- you get back a Subscriber instance
  .atLeastOnce(
  Flow.fromFunction(doSomethingWithTheMessage)
)

但是,当您想要订阅包含由某个随机外部系统产生的事件的 Kafka 主题时,什么是合适的配置?

此功能是否需要某种适配器? 为了澄清,我现在有这个:

object Aggregator {
  val TOPIC_NAME = "my-aggregation"
}

trait Aggregator extends Service {
  def aggregate(correlationId: String): ServiceCall[Data, Done]

  def aggregationTopic(): Topic[DataRecorded]

  override final def descriptor: Descriptor = {
    import Service._

    named("aggregator")
      .withCalls(
        pathCall("/api/aggregate/:correlationId", aggregate _)
      )
      .withTopics(
        topic(Aggregator.TOPIC_NAME, aggregationTopic())
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[DataRecorded](_.sessionData.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}

我可以通过简单的 POST 请求调用它。 但是,我希望通过使用来自某些(外部)Kafka 主题的 Data 消息来调用它。

我想知道是否有一种方法可以以类似于此模型的方式配置描述符:

override final def descriptor: Descriptor = {
  ...
  kafkaTopic("my-input-topic")
    .subscribe(serviceCall(aggregate _)
    .withAtMostOnceDelivery
}

我遇到过这个discussion on Google Groups,但在 OPs 问题中,我没有看到他实际上对来自some-topicEventMessages 做任何事情,除了将它们路由到他的服务定义的主题。

编辑 #1:进度更新

查看文档,我决定尝试以下方法。 我又添加了 2 个模块,aggregator-kafka-proxy-apiaggregator-kafka-proxy-impl

在新的 api 模块中,我定义了一个新服务,没有方法,但有一个主题代表我的 Kafka 主题:

object DataKafkaPublisher {
  val TOPIC_NAME = "data-in"
}

trait DataKafkaPublisher extends Service {
  def dataInTopic: Topic[DataPublished]

  override final def descriptor: Descriptor = {
    import Service._
    import DataKafkaPublisher._

    named("data-kafka-in")
      .withTopics(
        topic(TOPIC_NAME, dataInTopic)
          .addProperty(
            KafkaProperties.partitionKeyStrategy,
            PartitionKeyStrategy[SessionDataPublished](_.data.correlationId)
          )
      )
      .withAutoAcl(true)
  }
}

在impl模块中,我只是简单的做了标准实现

class DataKafkaPublisherImpl(persistentEntityRegistry: PersistentEntityRegistry) extends DataKafkaPublisher {
  override def dataInTopic: Topic[api.DataPublished] =
    TopicProducer.singleStreamWithOffset {
      fromOffset =>
        persistentEntityRegistry.eventStream(KafkaDataEvent.Tag, fromOffset)
          .map(ev => (convertEvent(ev), ev.offset))
    }

  private def convertEvent(evt: EventStreamElement[KafkaDataEvent]): api.DataPublished = {
    evt.event match {
      case DataPublished(data) => api.DataPublished(data)
    }
  }
}

现在,为了实际使用这些事件,在我的 aggregator-impl 模块中,我添加了一个“订阅者”服务,它接收这些事件,并在实体上调用适当的命令。

class DataKafkaSubscriber(persistentEntityRegistry: PersistentEntityRegistry, kafkaPublisher: DataKafkaPublisher) {

  kafkaPublisher.dataInTopic.subscribe.atLeastOnce(
    Flow[DataPublished].mapAsync(1) { sd =>
      sessionRef(sd.data.correlationId).ask(RecordData(sd.data))
    }
  )

  private def sessionRef(correlationId: String) =
    persistentEntityRegistry.refFor[Entity](correlationId)
}

这实际上允许我在 Kafka 主题“data-in”上发布消息,然后将其代理并转换为 RecordData 命令,然后再发送给实体进行消费。

但是,这对我来说似乎有些 hacky。我通过 Lagom internals 与 Kafka 耦合。我无法轻松交换数据源。例如,如果我愿意,我将如何使用来自 RabbitMQ 的外部消息? 如果我尝试从另一个 Kafka(与 Lagom 使用的不同)消费怎么办?

编辑 #2:更多文档

我发现了一些关于 Lagom 文档的文章,特别是:

Consuming Topics from 3rd parties

您可能希望 Lagom 服务使用服务产生的数据 未在 Lagom 中实施。在这种情况下,如服务中所述 客户端部分,您可以在 你的 Lagom 项目。该模块将包含一个服务描述符 声明您将使用的主题。一旦你有了你的 第三方服务接口及相关类实现,你 应该添加 third-party-service-api 作为对您的依赖项 花式服务实施。最后,您可以从所描述的主题中消费 在第三方服务中,如订阅主题中所述 部分。

【问题讨论】:

    标签: java scala apache-kafka akka-stream lagom


    【解决方案1】:

    我不使用lagom,所以这可能只是一个想法。但是由于akka-streamslagom 的一部分(至少我认为)——从这个解决方案中得到你需要的东西应该很容易。

    我使用了 akka-stream-kafka,效果非常好(我只做了一个原型)

    当你消费消息时,你会做一些事情:

         Consumer
          .committableSource(
              consumerSettings(..), // config of Kafka
              Subscriptions.topics("kafkaWsPathMsgTopic")) // Topic to subscribe
          .mapAsync(10) { msg =>
            business(msg.record) // do something
          }
    

    检查写得好documentation

    你在这里找到我的整个例子: PathMsgConsumer

    【讨论】:

    • 我知道 akka-streams 带有 Lagom,我想到我可能会在输入/输出边界上使用它,但我想知道是否有推荐的惯用方式.不过,我会试试你的方法!
    【解决方案2】:

    Alan Klikic 在 Lightbend 论坛 here 上提供了答案。

    第 1 部分:

    如果您只在业务服务中使用外部 Kafka 集群 那么你可以只使用 Lagom Broker API 来实现它。所以你需要 到:

    1. 使用仅包含主题定义的服务描述符创建 API(此 API 未实现)
    2. 在您的业务服务中根据您的部署配置 kafka_native(正如我在上一篇文章中提到的)
    3. 在您的业务服务中,从 #1 中创建的 API 注入服务并使用 Lagom Broker API 订阅者订阅它

    偏移提交,在 Lagom Broker API 中处理订阅者 开箱即用。

    第 2 部分:

    Kafka 和 AMQP 消费者实现需要持久化 akka 溪流。所以你需要处理断开连接。这些可以分两次完成 方式:

    1. 通过将持久性akka 流包装在一个actor 中来控制它。您在演员 preStart 和管道流完成时初始化流流 给阻止它的演员。如果流完成或失败演员 会停止。然后用重启策略将actor包裹在actor backoff中, 这将在完成或失败的情况下重新启动actor,并且 重新初始化流程
    2. akka 流延迟重启与退避阶段

    我个人使用#1,但尚未尝试#2。

    初始化#1 的退避演员或#2 的流程可以在您的 Lagom 组件特征(基本上在你做你的同一个地方 现在使用 Lagom Broker API 订阅)。

    配置consumer的时候一定要设置consumer group,确保 避免重复消费。您可以像 Lagom 一样使用服务 描述符中的名称作为消费者组名称。

    【讨论】:

      猜你喜欢
      • 2018-05-17
      • 1970-01-01
      • 1970-01-01
      • 2021-11-14
      • 2021-01-03
      • 1970-01-01
      • 1970-01-01
      • 2020-06-19
      • 1970-01-01
      相关资源
      最近更新 更多