package test05

import java.util.Properties
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object WriteDataToKafka {

  /** Entry point: reads log files from a local directory into an RDD and
    * streams each line to a Kafka topic. Paths/addresses are hard-coded for
    * local testing (local[*] master, local log dir, fixed broker/topic).
    */
  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setAppName("ReadS3LogToKafka").setMaster("local[*]")
    val sc = new SparkContext(conf)
    try {
      val logData: RDD[String] = sc.textFile("/Users/huiliyang/vwlog/")

      //logData.collect().foreach(println(_))

      writeToKafkaTopic(logData, "192.168.1.112:9092", "huiliyang")
    } finally {
      // Release Spark resources even if the Kafka write fails.
      sc.stop()
    }
  }

  /** Writes every line of the RDD to the given Kafka topic.
    *
    * Uses foreachPartition so ONE producer is created per partition: the
    * original version constructed and closed a KafkaProducer for every single
    * record inside the loop, which is extremely expensive (TCP connect +
    * metadata fetch per record) and defeats Kafka's client-side batching.
    *
    * @param lines       lines to publish (one Kafka record per line)
    * @param kafkaServer bootstrap.servers value, e.g. "host:9092"
    * @param kafkaTopic  destination topic name
    */
  def writeToKafkaTopic(lines: RDD[String], kafkaServer: String, kafkaTopic: String): Unit = {
    lines.foreachPartition { partition =>
      // Built inside the closure: KafkaProducer is not serializable, so it
      // must be created on the executor, not shipped from the driver.
      val props = new Properties()
      props.put("bootstrap.servers", kafkaServer)
      props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
      props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer")

      val producer = new KafkaProducer[String, String](props)
      try {
        partition.foreach { line =>
          // Same fixed key "key" as the original — preserves partitioning behavior.
          producer.send(new ProducerRecord[String, String](kafkaTopic, "key", line))
        }
      } finally {
        // close() flushes buffered records and releases the connection even on error.
        producer.close()
      }
    }
  }
}

// ---------------------------------------------------------------------------
// NOTE(review): the text below is blog-scrape residue ("related articles" /
// "you may also like" link lists with bare dates) that was pasted after the
// closing brace and made the file fail to compile. Kept verbatim as a comment.
// ---------------------------------------------------------------------------
// 相关文章 (Related articles):
//   • 2022-12-23
//   • 2021-04-02
//   • 2022-12-23
//   • 2022-12-23
//   • 2022-01-29
//   • 2021-08-20
//   • 2022-12-23
//   • 2021-12-18
// 猜你喜欢 (You may also like):
//   • 2021-09-28
//   • 2022-12-23
//   • 2021-07-30
//   • 2021-12-14
//   • 2021-04-08
//   • 2021-05-20
//   • 2021-12-26
// 相关资源 (Related resources)
// 相似解决方案 (Similar solutions)