【发布时间】:2020-09-19 06:12:57
【问题描述】:
[INFO] [06/01/2020 05:05:53.947] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] 开始。 StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322] SLF4J:未找到 SLF4J 提供程序。 SLF4J:默认为无操作(NOP)记录器实现 SLF4J:有关详细信息,请参阅http://www.slf4j.org/codes.html#noProviders。 [信息] [06/01/2020 05:06:05.058] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] 完成 [信息] [akkaDeadLetter] [06/01/2020 05:06:35.165] [default-akka.actor.default-dispatcher-24] [akka://default/system/kafka-consumer-1] 消息 [akka. kafka.internal.KafkaConsumerActor$Internal$StopFromStage] 从 Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322] 到 Actor[akka://default/system/kafka-consumer-1# 76265671] 未交付。 [1] 遇到死信。如果这不是预期行为,则 Actor[akka://default/system/kafka-consumer-1#76265671] 可能已意外终止。可以使用配置设置“akka.log-dead-letters”和“akka.log-dead-letters-during-shutdown”关闭或调整此日志记录。
这是我面临的问题,但仅当我使用带有 Kafka 源的流时。
这是我的代码
val bookData: Source[ConsumerRecord[Array[Byte], String], Consumer.Control] = KafkaSource.createSource(TOPIC)
val parseBook: Flow[ConsumerRecord[Array[Byte], String], Book, NotUsed] = Flow[ConsumerRecord[Array[Byte], String]].map { message =>
Json.parse(message.value).as[Book]
}
val flow: Flow[Book, Book, NotUsed] = Flow[Book].mapAsync(4)(book => toInstant(book))
def toInstant(book: Book): Future[Book] = Future{
val array = book.publicationDate.split("-")
val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)
val timeTillNow = Time.timeSince(instant)
val royalty = if (timeTillNow.toDays > 1000) {
.10 * book.price * book.copiesSold
} else {
.15 * book.price * book.copiesSold
}
book.copy(royalty = Some(royalty))
}
val print: Sink[Any, Future[Done]] = Sink.foreach(element => println(element))
在运行后我收到了死信
bookData.via(parseBook).via(flow).runWith(print)
但运行时没有死信:bookData.via(parseBook).runWith(print)。
即使在运行后一个时,我也得到了预期的输出。
有人可以帮忙吗?
【问题讨论】:
标签: scala apache-kafka akka