【问题标题】:Unable to subscribe/unsubscribe on kafka topic by command received by REST request无法通过 REST 请求收到的命令订阅/取消订阅 kafka 主题
【发布时间】:2021-09-17 17:53:02
【问题描述】:

我想编写一个完全基于 ZIO 堆栈设计的应用程序。 我是这个框架的新手,所以也许解决方案是微不足道的,我误解了一些重要的东西。 并面临以下问题。 我需要使用 REST 收到的命令取消订阅 kafka 主题。 我还需要通过 REST 订阅主题。 我使用 zio-kafka 编写了如下代码来描述订阅主题并将事件打印到控制台的效果:

private val consumerSettings = ConsumerSettings(List("localhost:9092")).withGroupId("MyConsumerGroup")
    .withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))

private val managedConsumer = Consumer.make(consumerSettings)

val consumer: ZLayer[Clock with Blocking, Throwable, Has[Consumer]] = ZLayer.fromManaged(managedConsumer)

def startStream: ZIO[Console with Any with Has[Consumer] with Clock, Throwable, Unit] =
    Consumer.subscribeAnd(Subscription.topics("myTopic"))
      .plainStream(Serde.string, Serde.string)
      .tap(cr => zio.console.putStrLn(cr.value))
      .map(_.offset)
      .aggregateAsync(Consumer.offsetBatches)
      .run(ZSink.foreach(_.commit))

然后我使用 zhttp 描述了 REST 端点:

  private val app = HttpApp.fromEffectFunction{
    case Method.POST -> Root / "stop" => for {
      _ <- ZIO.serviceWith[Consumer](_.unsubscribe)
      _ <- zio.console.putStrLn("stopped")
    } yield Response.ok
    case Method.POST -> Root / "start" => for {
      _ <- startStream.fork
      _ <- zio.console.putStrLn("started")
    } yield Response.ok
  }

private val server = Server.port(8080) ++ Server.app(app)

最后我用 main 方法运行我的简单程序:

override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = (for {
    _ <- startStream.provideSomeLayer(consumer ++ Console.live).fork
    _ <- server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
      .provideCustomLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
} yield ()).exitCode

它运行良好,但问题是当我运行程序时,它会对/stop 请求作出反应,但Consumer 仍处于订阅状态,并且仍然从主题读取消息。 如果我只使用服务器效果运行我的程序,如下所示:

override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = 
  server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
    .provideCustomLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
  .exitCode

在我调用/start 端点之后,在控制台中我可以看到消费者还活着,我可以看到一些关于 kafka 集群的信息,但没有从主题中读取任何消息。 请告诉我我哪里错了,我的误解在哪里。
谢谢。

【问题讨论】:

    标签: scala apache-kafka zio


    【解决方案1】:

    您传递了两次consumer 层,这是有效的。这意味着取消订阅的消费者是不一样的。

    怎么样

    override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = (for {
        _ <- startStream.fork
        _ <- server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
    } yield ()).provideSomeLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
    .exitCode
    
    

    注意:可能无法编译,未测试

    【讨论】:

    • 谢谢您的回答似乎是我在寻找... 相当愚蠢的错误 这很奇怪,但重新订阅后我开始两次收到相同的消息,但可能这是另一个问题
    • 猜想,但这可能是消费者在阅读最后一条消息后但在提交之前关闭。编辑:可能不是因为您使用的是取消订阅
    • 纤维似乎有问题。如果一切都按照您的建议执行,则只有/stop - /start 的一个循环有效,之后我们可以停止消费但无法重新开始。我尝试了.joinstartStream 效果,它变得稳定了。但是由于消费者阻塞线程,永远不会收到对/start端点的请求
    • 好的,我今晚会尝试实现这个,会让你保持最新状态。
    • 聊天选项一般都是在你给cmets发了一点垃圾之后才出现的,老实说不知道有什么其他的打开方式?
    猜你喜欢
    • 2018-01-19
    • 1970-01-01
    • 1970-01-01
    • 2015-08-11
    • 1970-01-01
    • 2019-09-09
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多