【问题标题】:I'm getting dead letters while consuming from alpakka kafka我从 alpakka kafka 消费时收到了死信
【发布时间】:2020-09-19 06:12:57
【问题描述】:

[INFO] [06/01/2020 05:05:53.947] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] 开始。 StageActor Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322] SLF4J:未找到 SLF4J 提供程序。 SLF4J:默认为无操作(NOP)记录器实现 SLF4J:有关详细信息,请参阅http://www.slf4j.org/codes.html#noProviders。 [信息] [06/01/2020 05:06:05.058] [default-akka.actor.default-dispatcher-6] [SingleSourceLogic(akka://default)] [7347a] 完成 [信息] [akkaDeadLetter] [06/01/2020 05:06:35.165] [default-akka.actor.default-dispatcher-24] [akka://default/system/kafka-consumer-1] 消息 [akka. kafka.internal.KafkaConsumerActor$Internal$StopFromStage] 从 Actor[akka://default/system/Materializers/StreamSupervisor-0/$$a#-993709322] 到 Actor[akka://default/system/kafka-consumer-1# 76265671] 未交付。 [1] 遇到死信。如果这不是预期行为,则 Actor[akka://default/system/kafka-consumer-1#76265671] 可能已意外终止。可以使用配置设置“akka.log-dead-letters”和“akka.log-dead-letters-during-shutdown”关闭或调整此日志记录。

这是我面临的问题,但仅当我使用带有 Kafka 源的流时。

这是我的代码

val bookData: Source[ConsumerRecord[Array[Byte], String], Consumer.Control] = KafkaSource.createSource(TOPIC)
  val parseBook: Flow[ConsumerRecord[Array[Byte], String], Book, NotUsed] = Flow[ConsumerRecord[Array[Byte], String]].map { message =>
    Json.parse(message.value).as[Book]
  }
  val flow: Flow[Book, Book, NotUsed] = Flow[Book].mapAsync(4)(book => toInstant(book))

  def toInstant(book: Book): Future[Book] = Future{
    val array = book.publicationDate.split("-")
    val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)
    val timeTillNow = Time.timeSince(instant)
    val royalty = if (timeTillNow.toDays > 1000) {
      .10 * book.price * book.copiesSold
    } else {
      .15 * book.price * book.copiesSold
    }
    book.copy(royalty = Some(royalty))
  }
 val print: Sink[Any, Future[Done]] = Sink.foreach(element => println(element))

在运行后我收到了死信 bookData.via(parseBook).via(flow).runWith(print) 但运行时没有死信:bookData.via(parseBook).runWith(print)。 即使在运行后一个时,我也得到了预期的输出。

有人可以帮忙吗?

【问题讨论】:

    标签: scala apache-kafka akka


    【解决方案1】:

    这很可能发生,因为流程正在引发异常。通常,我希望看到更多详细信息的日志记录,但这些可能会丢失,因为没有配置 SLF4J 实现。有关如何配置日志记录的详细信息,请参阅 Akka 文档中的 "SLF4J backend"

    查看您的代码,我认为错误最有可能在这一行:

    val instant = Time.instantOfDate(array(2).toInt, Month.of(array(2).toInt), array(0).toInt)
    

    请注意,它使用array(2).toInt 表示日和月。我的猜测是月份应该是Month.of(array(1).toInt)。如果 array(2) 确实是一个月中的某一天,那么任何大于 12 的值都会导致 Month.of 抛出 DateTimeException,这将终止流。

    【讨论】:

      猜你喜欢
      • 2020-07-15
      • 2020-02-22
      • 2020-11-27
      • 2021-06-06
      • 2016-05-15
      • 1970-01-01
      • 2020-10-16
      • 2018-09-12
      • 1970-01-01
      相关资源
      最近更新 更多