【问题标题】:Zio-Kafka: Producing messages to topicZio-Kafka:为主题生成消息
【发布时间】:2021-10-11 23:36:53
【问题描述】:

我正在尝试使用库 zio-kafka,版本 0.15.0 向 Kafka 主题生成一些消息。

显然,我对 ZIO 生态系统的理解并不理想,因为我无法提供简单的信息。我的程序如下:

object KafkaProducerExample extends zio.App {

  val producerSettings: ProducerSettings = ProducerSettings(List("localhost:9092"))

  val producer: ZLayer[Blocking, Throwable, Producer[Nothing, String, String]] =
    ZLayer.fromManaged(Producer.make(producerSettings, Serde.string, Serde.string))

  val effect: RIO[Nothing with Producer[Nothing, String, String], RecordMetadata] =
    Producer.produce("topic", "key", "value")

  override def run(args: List[String]): URIO[zio.ZEnv, ExitCode] = {
    effect.provideSomeLayer(producer).exitCode
  }
}

编译器给我以下错误:

[error] KafkaProducerExample.scala:19:28: Cannot prove that zio.blocking.Blocking with zio.Has[zio.kafka.producer.Producer.Service[Nothing,String,String]] <:< Nothing with zio.kafka.producer.Producer[Nothing,String,String].
[error]     effect.provideSomeLayer(producer).exitCode
[error]                            ^
[error] one error found

谁能帮助我了解发生了什么?

【问题讨论】:

    标签: scala apache-kafka zio


    【解决方案1】:

    好的,ZIO 在创建producer 层期间需要一些关于类型的提示:

    val producer: ZLayer[Blocking, Throwable, Producer[Any, String, String]] =
        ZLayer.fromManaged(Producer.make[Any, String, String](producerSettings, Serde.string, Serde.string))
    

    当调用make 智能构造函数时,我们必须给他我们想要使用的类型。第一个表示构建key和value序列化器所需的环境,后两个是消息的key和value的类型。

    在这种情况下,我们根本不需要环境来构建两个序列化程序,所以我们传递Any

    最后,Producer.produce 函数也需要一些类型提示:

    val effect: RIO[Producer[Any, String, String], RecordMetadata] =
        Producer.produce[Any, String, String]("topic", "key", "value")
    

    做完以上修改后,类型完美对齐,编译器又开心了。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2020-03-22
      • 2016-12-29
      • 2014-01-04
      • 2021-10-06
      • 2021-01-09
      • 1970-01-01
      • 1970-01-01
      • 2019-10-09
      相关资源
      最近更新 更多