【发布时间】:2017-09-18 09:28:00
【问题描述】:
相关:scala.concurrent.Future wrapper for java.util.concurrent.Future
这来自我的另一个问题:
How to integrate akka streams kafka (reactive-kafka) into akka http application?
我有一个 AKKA HTTP 应用程序,我想在我的路由中的 onComplete 函数中向 Kafka 发送消息/ProducerRecord,如下所示:
val producer : KafkaProducer = new KafkaProducer(producerSettings)
val routes : Route =
post {
entity(as[User]) { user =>
val createUser = userService.create(user)
onSuccess(createUser) {
case Invalid(y: NonEmptyList[Err]) =>
complete(BadRequest -> "invalid user")
case Valid(u: User) => {
val producerRecord =
new ProducerRecord[Array[Byte], String]("topic1","some message")
onComplete(producer.send(producerRecord)) { _ =>
complete(ToResponseMarshallable((StatusCodes.Created, u)))
}
}
}
}
}
但是,onComplete(producer send producerRecord) 会产生以下类型不匹配错误:
[错误] 发现:Future[org.apache.kafka.clients.producer.RecordMetadata](在 java.util.concurrent 中) [错误] 必需:Future[org.apache.kafka.clients.producer.RecordMetadata](在 scala.concurrent 中) [错误] onCompleteRecordMetadata { _ =>
有什么办法可以解决这个问题,也许通过使用 Producer 作为接收器 (http://doc.akka.io/docs/akka-stream-kafka/current/producer.html#producer-as-a-sink) 而不是 java producer.send 函数?
【问题讨论】:
标签: scala apache-kafka akka akka-http reactive-kafka