【发布时间】:2017-04-24 12:32:31
【问题描述】:
我有一个火花流作业,它消耗来自 kafka 的数据并在对每个数据进行一些处理后发送回 kafka。 为此,我正在对数据进行一些地图操作,
val lines = KafkaUtils.createStream[String, String, StringDecoder, StringDecoder](ssc, kafkaParams, topicNameMap, StorageLevel.MEMORY_AND_DISK)
var ad = ""
val abc = lines.map(_._2).map { x =>
val jsonObj = new JSONObject(x)
val data = someMethod(schema, jsonObj)
data
}
然后我正在对其进行 foreach 操作,我没有将所有数据收集到这里的驱动程序,因为我想将这些记录发送到执行程序本身。
abc.foreachRDD(rdd => {
rdd.foreach { toSend =>
val producer = KafkaProducerUtils.getKafkaProducer(kafkaBrokers)
println("toSend---->" + toSend)
producer.send(new ProducerRecord[String, String](topicToSend, toSend))
}
我为 1405 数据尝试了此代码 10 秒,但完成这项工作大约需要 2.5 分钟。我知道创建KafkaProducer 的成本很高,有没有其他方法可以减少处理时间。为了我的测试目的,我使用了 2 个执行器,每个执行器有 2 个核心和 1GM。
【问题讨论】:
标签: scala apache-spark apache-kafka apache-spark-sql spark-streaming