【发布时间】:2021-09-17 17:53:02
【问题描述】:
我想编写一个完全基于 ZIO 堆栈设计的应用程序。 我是这个框架的新手,所以也许解决方案是微不足道的,我误解了一些重要的东西。 并面临以下问题。 我需要使用 REST 收到的命令取消订阅 kafka 主题。 我还需要通过 REST 订阅主题。 我使用 zio-kafka 编写了如下代码来描述订阅主题并将事件打印到控制台的效果:
private val consumerSettings = ConsumerSettings(List("localhost:9092")).withGroupId("MyConsumerGroup")
.withOffsetRetrieval(OffsetRetrieval.Auto(AutoOffsetStrategy.Earliest))
private val managedConsumer = Consumer.make(consumerSettings)
val consumer: ZLayer[Clock with Blocking, Throwable, Has[Consumer]] = ZLayer.fromManaged(managedConsumer)
def startStream: ZIO[Console with Any with Has[Consumer] with Clock, Throwable, Unit] =
Consumer.subscribeAnd(Subscription.topics("myTopic"))
.plainStream(Serde.string, Serde.string)
.tap(cr => zio.console.putStrLn(cr.value))
.map(_.offset)
.aggregateAsync(Consumer.offsetBatches)
.run(ZSink.foreach(_.commit))
然后我使用 zhttp 描述了 REST 端点:
private val app = HttpApp.fromEffectFunction{
case Method.POST -> Root / "stop" => for {
_ <- ZIO.serviceWith[Consumer](_.unsubscribe)
_ <- zio.console.putStrLn("stopped")
} yield Response.ok
case Method.POST -> Root / "start" => for {
_ <- startStream.fork
_ <- zio.console.putStrLn("started")
} yield Response.ok
}
private val server = Server.port(8080) ++ Server.app(app)
最后我用 main 方法运行我的简单程序:
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = (for {
_ <- startStream.provideSomeLayer(consumer ++ Console.live).fork
_ <- server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
.provideCustomLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
} yield ()).exitCode
它运行良好,但问题是当我运行程序时,它会对/stop 请求作出反应,但Consumer 仍处于订阅状态,并且仍然从主题读取消息。
如果我只使用服务器效果运行我的程序,如下所示:
override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] =
server.make.use(_ => console.putStrLn("server started") *> ZIO.never)
.provideCustomLayer(ServerChannelFactory.auto ++ EventLoopGroup.auto() ++ consumer)
.exitCode
在我调用/start 端点之后,在控制台中我可以看到消费者还活着,我可以看到一些关于 kafka 集群的信息,但没有从主题中读取任何消息。
请告诉我我哪里错了,我的误解在哪里。
谢谢。
【问题讨论】:
标签: scala apache-kafka zio