【问题标题】:Strange issue regarding for-comprehension关于理解的奇怪问题
【发布时间】:2014-06-27 19:00:21
【问题描述】:

我是整个 Scala 场景的新手,但到目前为止我一直很喜欢这个旅程!但是,我遇到了一个问题,还无法理解原因...... 我目前正在使用 Kafka,并试图从一个主题中读取数据并将其传递到其他地方。

问题是:内部 for-comprehension 中的 println 按预期输出底部的行,但内部 for 之外的所有其他 prinln 都被跳过,函数最终什么也不返回(甚至不能发出测试用例中的 getClass !)...可能是什么原因造成的?我真的没办法了……

相关代码:

def tryBatchRead(maxMessages: Int = 100, skipMessageOnError: Boolean = true): List[String] = {
  var numMessages = 0L

  var list = List[String]()

  val iter = if (maxMessages >= 0) stream.slice(0, maxMessages) else stream

  for (messageAndTopic <- iter) {
    for (m <- messageAndTopic) {
      println(m.offset.toString + " --- " + new String(m.message))
      list = list ++ List(new String(m.message))
      println("DEBUG " + list)
      numMessages += 1
    }
    println("test1")
  }

  println("test2")
  println("FINISH" + list)
  connector.shutdown()
  println("test3")
  list
}

输出:

6 --- {"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}})
7 --- test 2
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2)
8 --- {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}}
DEBUG List({"user":{"id":"4d9e3582-2d35-4600-b070-e4d92e42c534","age":25,"sex":"M","location":"PT"}}, test 2, {"StartSurvey":{"user":{"id":"6a736fdd-79a0-466a-9030-61b5ac3a3a0e","age":25,"sex":"M","location":"PT"}}})

感谢您的帮助!

【问题讨论】:

  • 你的代码里有一个简单的 for 循环,for-comprehension 是一个围绕单子编程风格的语法糖,即for {...} yield ... 构造
  • 感谢您的评论。 ;-)

标签: java scala apache-kafka for-comprehension


【解决方案1】:

我不完全确定,但很有可能您在阅读了等待下一条消息的最后一条消息后阻塞(kafka 流基本上是无限的)。为kafka消费者配置超时时间,如果一段时间没有消息就会放弃。有consumer.timeout.ms 属性(例如将其设置为3000 ms),一旦达到等待限制,将导致ConsumerTimeoutException。

顺便说一句,我会将你的代码重写为:

def tryBatchRead(maxMessages: Int = 100): List[String] = {
  // `.take` works fine if collection has less elements than max
  val batchStream = stream.take(maxMessages)  

  // TODO: add try/catch section, according to the above comments
  // in scala we usually write a single joined for, instead of multiple nested ones
  val batch = for {
    messageAndTopic <- batchStream.take(maxMessages)
    msg <- messageAndTopic // are you sure you can iterate message and topic? 0_o
  } yield {
    println(m.offset.toString + " --- " + new String(m.message))
    msg
  }

  println("Number of messages: " + batch.length)

  // shutdown has to be done outside, it's bad idea to implicitly tear down streams in reading function
  batch 
}

【讨论】:

  • 您好!谢谢您的帮助!我将您的示例代码与@chekkal 的建议和一些更正结合起来,但仍然无法使其工作。 =S 我试图将其保持在非常低的水平,以便不需要混合 Akka 功能,但似乎无法以这种方式达到这个目标。它一直在阻塞...
  • 另外,我写的代码很大程度上受到了 Kafka 自己的 ConsoleConsumer 代码 link 的启发——包括那个 consumer.shutdown 的东西......
  • @PedroAlmeida 您将提到的参数设置为合理的正值,但它仍然阻塞?
  • sort of... 使用 set 参数,它不会输出任何内容然后抛出异常。参数未设置:在 for 内部执行 println 并无限期阻塞。与此同时,我将 Kafka 升级到 v0.8.1.1(从 v0.8.0),但一直停滞不前。 =S
  • @PedroAlmeida 让我猜猜,你还没有调整auto.offset.reset?默认情况下,消费者将等待全新的消息,忽略已经进入主题的消息。将auto.offset.reset 设置为smallest 以从头开始。
【解决方案2】:

我认为这是一种正常行为,因为您正在对理论上无限大小的流执行 for . 恕我直言,我宁愿写 for (m 而不是 for (m

【讨论】:

    猜你喜欢
    • 2019-08-28
    • 2021-08-27
    • 2011-07-17
    • 2011-02-18
    • 1970-01-01
    • 1970-01-01
    • 2021-10-28
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多