【发布时间】:2018-02-16 08:47:03
【问题描述】:
我正在使用 local[8] 配置运行 Spark。输入是一个带有 8 个 broker 的 Kafka 流。但是从系统监视器中可以看出,它的并行度不够,似乎只有一个节点在运行。Kafka 流的输入大约为 1.6GB,因此处理速度本应更快。
Kafka 生产者:
import java.io.{BufferedReader, FileReader}
import java.util
import java.util.{Collections, Properties}
import logparser.LogEvent
import org.apache.hadoop.conf.Configuration
import org.apache.kafka.clients.producer.{KafkaProducer, Producer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
/**
 * Kafka producer that replays a local text file into a topic, one record per line.
 *
 * Every line read from `fileName` is wrapped in a `LogEvent` together with a running
 * counter and the current wall-clock time, and sent without a key to `topicName`.
 */
object sparkStreaming {
  // Upper bound for the line counter. The counter starts at 1 and the loop stops once it
  // reaches this value, so at most NUMBER_OF_LINES - 1 lines are sent.
  private val NUMBER_OF_LINES = 100000000
  val brokers = "localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096,localhost:9097,localhost:9098,localhost:9099"
  val topicName = "log-1"
  val fileName = "data/HDFS.log"
  val producer = getProducer()

  /**
   * Reads `fileName` line by line and sends each line to `topicName`.
   *
   * The file reader and the producer are always closed, even when reading or sending
   * fails. Any exception is reported on stdout and not rethrown.
   */
  def produce(): Unit = {
    try {
      try {
        // No HDFS involved: the input is a plain local text file.
        val reader = new BufferedReader(new FileReader(fileName))
        try {
          var line = reader.readLine
          var count = 1
          while (line != null && count < NUMBER_OF_LINES) {
            System.out.println("Sending batch " + count + " " + line)
            producer.send(new ProducerRecord[String, LogEvent](topicName, new LogEvent(count, line, System.currentTimeMillis())))
            line = reader.readLine
            count = count + 1
          }
        } finally {
          reader.close()
        }
      } finally {
        // Closing on the failure path as well, so the producer is not left open.
        producer.close()
      }
      System.out.println("Producer exited successfully for " + fileName)
    } catch {
      case e: Exception =>
        System.out.println("Exception while producing for " + fileName)
        System.out.println(e)
    }
  }

  /** Builds the `KafkaProducer` used by this object, configured against `brokers`. */
  private def getProducer(): KafkaProducer[String, LogEvent] = {
    val props = new Properties
    // Broker list used for the initial connection.
    props.put("bootstrap.servers", brokers)
    // NOTE(review): this looks like a broker-side setting rather than a producer one; confirm it has any effect here.
    props.put("auto.create.topics.enable", "true")
    // Acknowledgement setting for producer requests.
    props.put("acks", "all")
    // Number of automatic retries when a request fails.
    props.put("retries", "100")
    // Batch size setting.
    props.put("batch.size", "16384")
    // Linger setting.
    props.put("linger.ms", "1")
    // Total amount of memory available to the producer for buffering.
    props.put("buffer.memory", "33554432")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "logparser.LogEventSerializer")
    // NOTE(review): verify that the Kafka client version in use still recognises this key.
    props.put("topic.metadata.refresh.interval.ms", "1")
    new KafkaProducer[String, LogEvent](props)
  }

  /** Sends a single event to the "times" topic. */
  def sendBackToKafka(logEvent: LogEvent): Unit = {
    producer.send(new ProducerRecord[String, LogEvent]("times", logEvent))
  }

  /** Entry point: replays the whole file once and returns. */
  def main(args: Array[String]): Unit = {
    println("Starting to produce")
    produce()
  }
}
消费者:
package logparser
import java.io._
import java.util.Properties
import kafka.serializer.StringDecoder
import org.apache.kafka.clients.producer.{KafkaProducer, ProducerRecord}
import org.apache.kafka.common.serialization.StringDeserializer
import org.apache.spark.SparkConf
import org.apache.spark.rdd.RDD
import org.apache.spark.streaming._
import org.apache.spark.streaming.kafka010._
/**
 * Spark Streaming job that consumes `LogEvent`s from the Kafka topic "log-1", stamps each
 * event with the current time at several stages, and sends the collected timestamps to
 * the "times" topic as a `TimeList`.
 */
object consumer {
  var tFromKafkaToSpark: Long = 0
  var tParsing: Long = 0
  val startTime = System.currentTimeMillis()
  // Number of local worker threads requested from Spark (see setMaster in main).
  val CPUNumber = 8
  // Output file, opened with append = false; closed at the end of main.
  val pw = new PrintWriter(new FileOutputStream("data/Streaming" + CPUNumber + "config2x.txt", false))
  pw.write("Writing Started")

  /** Writes the current wall-clock time to the output file as the start marker. */
  def printstarttime(): Unit = {
    pw.print("StartTime : " + System.currentTimeMillis())
  }

  /** Writes the current wall-clock time to the output file as the end marker. */
  def printendtime(): Unit = {
    pw.print("EndTime : " + System.currentTimeMillis())
  }

  val producer = getProducer()

  /** Builds the `KafkaProducer` used to send `TimeList`s. */
  private def getProducer(): KafkaProducer[String, TimeList] = {
    val props = new Properties
    // NOTE(review): the consumer side in main uses ports 9092-9099; confirm that port 9090
    // and the trailing comma are intended here.
    val brokers = "localhost:9090,"
    props.put("bootstrap.servers", brokers)
    // NOTE(review): this looks like a broker-side setting rather than a producer one; confirm it has any effect here.
    props.put("auto.create.topics.enable", "true")
    // Acknowledgement setting for producer requests.
    props.put("acks", "all")
    // Number of automatic retries when a request fails.
    props.put("retries", "100")
    // Batch size setting.
    props.put("batch.size", "16384")
    // Linger setting.
    props.put("linger.ms", "1")
    // Total amount of memory available to the producer for buffering.
    props.put("buffer.memory", "33554432")
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer")
    props.put("value.serializer", "logparser.TimeListSerializer")
    // NOTE(review): verify that the Kafka client version in use still recognises this key.
    props.put("topic.metadata.refresh.interval.ms", "1")
    new KafkaProducer[String, TimeList](props)
  }

  /** Sends one `TimeList` to the "times" topic. */
  def sendBackToKafka(timeList: TimeList): Unit = {
    producer.send(new ProducerRecord[String, TimeList]("times", timeList))
  }

  /**
   * Entry point: builds the streaming pipeline, starts it and blocks until termination.
   * The output file is closed on the way out, whether or not the stream fails.
   */
  def main(args: Array[String]): Unit = {
    val topics = "log-1"
    val brokers = "localhost:9092"
    // Create context with 1 second batch interval
    val sparkConf = new SparkConf().setAppName("DirectKafkaWordCount").setMaster("local[" + CPUNumber + "]")
    val ssc = new StreamingContext(sparkConf, Seconds(1))

    // Create direct kafka stream with brokers and topics
    val topicsSet = topics.split(",").toSet
    val kafkaParams = Map[String, AnyRef](
      "metadata.broker.list" -> brokers,
      "bootstrap.servers" -> "localhost:9092,localhost:9093,localhost:9094,localhost:9095,localhost:9096,localhost:9097,localhost:9098,localhost:9099",
      "auto.offset.reset" -> "latest",
      "group.id" -> "test-consumer-group",
      "key.deserializer" -> classOf[StringDeserializer],
      "value.deserializer" -> "logparser.LogEventDeserializer",
      "enable.auto.commit" -> "true",
      "auto.commit.interval.ms" -> "1000",
      "session.timeout.ms" -> "20000",
      "metadata.max.age.ms" -> "1000"
    )

    // NOTE(review): per the Spark + Kafka 0.10 integration guide, the direct stream maps Kafka
    // partitions 1:1 to Spark partitions, so the number of concurrent tasks is bounded by the
    // partition count of the topic, not by local[N]. Check how many partitions "log-1" has.
    val messages = KafkaUtils.createDirectStream[String, LogEvent](
      ssc,
      LocationStrategies.PreferConsistent,
      ConsumerStrategies.Subscribe[String, LogEvent](topicsSet, kafkaParams))

    val lines = messages.map(_.value)

    // First timestamp: the event has entered the pipeline.
    val lineswTime = lines.map { event =>
      event.addNextEventTime(System.currentTimeMillis())
      event
    }
    lineswTime.foreachRDD(rdd => rdd.foreach(e => println(e.getTimeList)))

    // Second timestamp: taken after the log line has been set from the event content.
    val logLines = lineswTime.map { event =>
      event.setLogLine(event.getContent)
      println("Got event with id = " + event.getId)
      event.addNextEventTime(System.currentTimeMillis())
      println(event.getLogline.stringMessages.toString)
      event
    }

    // Third timestamp, then send the collected timestamps to Kafka.
    val x = logLines.map { le =>
      le.addNextEventTime(System.currentTimeMillis())
      sendBackToKafka(new TimeList(le.getTimeList))
      le
    }
    x.foreachRDD(rdd => rdd.foreach(e => println(e.getTimeList)))

    println("--------------***///*****-------------------")

    try {
      // Start the computation
      ssc.start()
      ssc.awaitTermination()
      ssc.stop(false)
    } finally {
      pw.close()
    }
  }
}
【问题讨论】:
-
我猜 Kafka 正在与 Spark 共享这台 8 核处理器的资源
标签: scala apache-spark apache-kafka