【问题标题】:Sending to Kafka from a spark job taking too much time从 Spark 作业发送到 Kafka 花费了太多时间
【发布时间】:2017-04-24 12:32:31
【问题描述】:

我有一个火花流作业,它消耗来自 kafka 的数据并在对每个数据进行一些处理后发送回 kafka。 为此,我正在对数据进行一些地图操作,

val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicNameMap, StorageLevel.MEMORY_AND_DISK)
var ad = ""
val abc = lines.map(_._2).map { x =>
  val jsonObj = new JSONObject(x)
  val data = someMethod(schema, jsonObj)
  data
}

然后我正在对其进行 foreach 操作,我没有将所有数据收集到这里的驱动程序,因为我想将这些记录发送到执行程序本身。

abc.foreachRDD(rdd => {
  rdd.foreach { toSend =>
    val producer = KafkaProducerUtils.getKafkaProducer(kafkaBrokers)
    println("toSend---->" + toSend)
    producer.send(new ProducerRecord[String, String](topicToSend, toSend))
  }

我为 1405 数据尝试了此代码 10 秒,但完成这项工作大约需要 2.5 分钟。我知道创建KafkaProducer 的成本很高,有没有其他方法可以减少处理时间。为了我的测试目的,我使用了 2 个执行器,每个执行器有 2 个核心和 1GM。

【问题讨论】:

    标签: scala apache-spark apache-kafka apache-spark-sql spark-streaming


    【解决方案1】:

    经过大量搜索,我发现了这篇关于 KafkaSink 的文章。这将为您提供有关在火花流中有效地向 kafka 生成数据的想法。

    【讨论】:

      【解决方案2】:

      处理这么多消息的巨大延迟一定有几个原因:

      1. 问题可能存在于您的消费阶段。如果您使用“createStream”,至少,Spark 的次要版本使用高级消费者实现,这需要 Zookeeper 存储属于特定组的消费者的偏移量。所以我会检查这个通信,因为它在 commit 阶段可能需要太多时间。如果出于任何原因逐一提交,您的消费率可能会受到影响。所以首先,检查一下。

      2. 还有另一个原因是文件系统的预写日志。尽管您的配置表明内存是磁盘,但您可以在 Spark 文档中看到:

      效率:在第一种方法中实现零数据丢失需要将数据存储在预写日志中,从而进一步复制数据。这实际上是低效的,因为数据有效地被复制了两次——一次由 Kafka 复制,第二次由 Write Ahead Log 复制。第二种方法消除了这个问题,因为没有接收器,因此不需要预写日志。只要你有足够的 Kafka 保留,就可以从 Kafka 中恢复消息

      为了获得更好的消费率,我会改用 createDirectStream。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 2014-03-22
        • 2015-04-14
        • 2017-02-15
        • 2020-07-23
        • 2013-07-11
        • 2014-01-13
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多