【问题标题】:Found java.util.concurrent.Future Required scala.concurrent.Future发现 java.util.concurrent.Future 需要 scala.concurrent.Future
【发布时间】:2017-09-18 09:28:00
【问题描述】:

相关:scala.concurrent.Future wrapper for java.util.concurrent.Future

这来自我的另一个问题:

How to integrate akka streams kafka (reactive-kafka) into akka http application?

我有一个 AKKA HTTP 应用程序,我想在我的路由中的 onComplete 函数中向 Kafka 发送消息/ProducerRecord,如下所示:

val producer : KafkaProducer = new KafkaProducer(producerSettings)

val routes : Route = 
  post {
    entity(as[User]) { user =>
      val createUser = userService.create(user)
      onSuccess(createUser) {
        case Invalid(y: NonEmptyList[Err]) =>  
          complete(BadRequest -> "invalid user")
        case Valid(u: User) => { 
          val producerRecord = 
            new ProducerRecord[Array[Byte], String]("topic1","some message")

          onComplete(producer.send(producerRecord)) { _ =>
            complete(ToResponseMarshallable((StatusCodes.Created, u)))
          }
        }
      }
    }
  }

但是,onComplete(producer send producerRecord) 会产生以下类型不匹配错误:

[错误] 发现:Future[org.apache.kafka.clients.producer.RecordMetadata](在 java.util.concurrent 中) [错误] 必需:Future[org.apache.kafka.clients.producer.RecordMetadata](在 scala.concurrent 中) [错误] onCompleteRecordMetadata { _ =>

有什么办法可以解决这个问题,也许通过使用 Producer 作为接收器 (http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink) 而不是 java producer.send 函数?

【问题讨论】:

    标签: scala apache-kafka akka akka-http reactive-kafka


    【解决方案1】:

    您可以利用Cake's Scala based Kafka client,它将完成运行 Java 期货的工作并为您提供 Scala 期货。一旦您确定创建的是 cakesolutions.kafka.KafkaProducer 而不是 org.apache.kafka.clients.producer.KafkaProducer,您的其余代码实际上应该保持不变。

    或者,您可以利用Reactive Kafka 解决这个问题,同时继续使用高级 Akka HTTP DSL。您可以通过将您的生产者记录运行到 Kafka Sink 来做到这一点:

    val producerSink = Producer.plainSink(producerSettings)
    
    ...
            // inside the route
            val producerRecord =
              new ProducerRecord[Array[Byte], String]("topic1", "some message")
    
            onComplete(Source.single(producerRecord).runWith(producerSink)) { _ =>
              complete(ToResponseMarshallable((StatusCodes.Created, u)))
            }
    

    【讨论】:

    • 永远不要在生产代码中这样做。与普通的 producer.send 相比,使用 Source.single 发送每条 Kafka 消息效率低 10 倍。它每次都会物化整个流,包括 actor 和所有内容。
    【解决方案2】:

    为了回答您的具体问题,scala-java8-compat 库提供了 java8 和 Scala Futures 之间的转换器。

    具体可以使用FutureConverters.toScala(producer.send(producerRecord))java.util.concurrent.Future转换为scala.concurrent.Future

    但是,使用本身具有对 Scala 友好的 API 的客户端库(如上面的 Stefano 所建议)可能会为您带来最佳结果。

    【讨论】:

      猜你喜欢
      • 2013-06-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-09-20
      • 1970-01-01
      相关资源
      最近更新 更多