【问题标题】:No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig)找不到记录器的附加程序(org.apache.kafka.clients.producer.ProducerConfig)
【发布时间】:2017-05-30 01:37:23
【问题描述】:

我正在编写一个代码,我试图在其中使用 kafka 和 spark 来消费消息。 但是我的代码不起作用。这是我的代码:

import org.apache.kafka.clients.producer.{ KafkaProducer, ProducerConfig, ProducerRecord }
import org.apache.spark.streaming._
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession
import java.util._

object Smack_Kafka_Spark extends App {
def main(args: Array[String]) {
val kafkaBrokers = "localhost:2181"

val kafkaOpTopic = "test"
/*val props = new HashMap[String, Object]()
props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, kafkaBrokers)
props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")
props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
  "org.apache.kafka.common.serialization.StringSerializer")*/

val props = new Properties()
props.put("bootstrap.servers", "localhost:2181")

props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

val producer = new KafkaProducer[String, String](props)

var spark: SparkSession = null
val textFile: RDD[String] = spark.sparkContext.textFile("dataset.txt")
textFile.foreach(record => {
  val data = record.toString
  val message = new ProducerRecord[String, String](kafkaOpTopic, null, data)
  producer.send(message)
})
 producer.close()
 }
}

这是我得到的错误:

log4j:WARN No appenders could be found for logger (org.apache.kafka.clients.producer.ProducerConfig).
log4j:WARN Please initialize the log4j system properly.
log4j:WARN See http://logging.apache.org/log4j/1.2/faq.html#noconfig for more info.
Exception in thread "main" java.lang.NullPointerException
at Smack_Kafka_Spark$.main(Smack_Kafka_Spark.scala:25)
at Smack_Kafka_Spark.main(Smack_Kafka_Spark.scala)

如果有任何帮助,我将不胜感激!

【问题讨论】:

    标签: scala apache-spark apache-kafka kafka-consumer-api kafka-producer-api


    【解决方案1】:

    你得到NullPointerException 因为SparkSession 为空。像下面这样创建它。

    val spark : SparkSession = SparkSession.builder()
      .appName("Smack_Kafka_Spark")
      .master("local[*]")
      .getOrCreate()
    

    现在阅读您的文本文件,如下所示。

    val textFile: Dataset[String] = spark.read.textFile("dataset.txt")
    

    您在运行程序时可能遇到的另一个问题是

    Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    Caused by: java.io.NotSerializableException: org.apache.kafka.clients.producer.KafkaProducer
    

    KafkaProducer 不可串行化。您需要将您的 KafkaProducer 实例创建移动到 foreachPartition 中。请查看SO帖子spark kafka producer serializable

    【讨论】:

    • 谢谢!正如你所说,序列化的问题出现了。然后我点击了你提到的链接。
    • 这是更正的代码:这是更正的代码 val textFile: RDD[String] = spark.sparkContext.textFile("Dataset.txt") textFile.foreachPartition((partisions: Iterator[String] ) => { val producer: KafkaProducer[String, String] = new KafkaProducer[String, String](props) partitions.foreach((line: String) => { try { producer.send(new ProducerRecord[String, String]( "test", line)) } catch { case ex: Exception => { } } })})
    • 你能告诉我如何在这段代码中添加一个消费者来检索生产者发送的内容吗?
    • 您的 Kafka 消费者应该是不同的 Spark 应用程序。您需要创建 StreamingContext 并使用 KafkaUtils 类中的方法。请在github.com/apache/spark/blob/master/examples/src/main/scala/org/…查看样品。您也可以 google 并找到一些工作示例。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2012-09-13
    • 2014-11-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多