【Posted】: 2017-01-31 05:34:37
【Question】:
To broadcast a KafkaProducer to the Spark executors, I created a wrapper like this:
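import java.io.Serializable;
import java.util.Properties;
import org.apache.kafka.clients.producer.KafkaProducer;

public class KafkaSink implements Serializable {
    // Static fields are not serialized, so each executor JVM lazily creates its own producer.
    private static KafkaProducer<String, String> producer = null;

    // Synchronized on the class because tasks in one executor run in parallel threads.
    public KafkaProducer<String, String> getInstance(final Properties properties) {
        synchronized (KafkaSink.class) {
            if (producer == null) {
                producer = new KafkaProducer<>(properties);
            }
            return producer;
        }
    }

    public void close() {
        synchronized (KafkaSink.class) {
            if (producer != null) producer.close();
        }
    }
}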
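How such a wrapper behaves across executors can be sketched without Kafka or Spark. Java serialization does not write `static` fields, so an executor JVM that deserializes the broadcast `KafkaSink` starts with `producer == null` and builds its own producer on first use. The sketch below swaps in a variant of the same idea: a `transient` instance field plus a serializable factory, with double-checked locking because tasks on one executor run in parallel threads. `LazySink`, `SerializableSupplier`, `newSink` and `CREATED` are hypothetical names, a `StringBuilder` stands in for the `KafkaProducer`, and the serialization round trip only approximates how Spark ships a broadcast value (assuming the default Java serializer).

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.function.Supplier;

final class LazySinkDemo {

    // Counts how often the factory runs, to show the resource is created lazily and once.
    public static final AtomicInteger CREATED = new AtomicInteger();

    // A Supplier that is also Serializable, so it can travel with the wrapper (hypothetical type).
    public interface SerializableSupplier<T> extends Supplier<T>, Serializable {}

    // Hypothetical wrapper: only the factory is serialized; the resource itself is
    // transient, so every deserialized copy starts empty and creates it on first get().
    public static final class LazySink<T> implements Serializable {
        private static final long serialVersionUID = 1L;
        private final SerializableSupplier<T> factory;
        private transient volatile T instance;

        public LazySink(SerializableSupplier<T> factory) {
            this.factory = factory;
        }

        public T get() {
            T result = instance;
            if (result == null) {
                // Double-checked locking: parallel tasks in one JVM must not create two resources.
                synchronized (this) {
                    result = instance;
                    if (result == null) {
                        result = factory.get();
                        instance = result;
                    }
                }
            }
            return result;
        }
    }

    // Stand-in for a KafkaProducer factory; the lambda captures nothing, so it serializes cleanly.
    public static LazySink<StringBuilder> newSink() {
        return new LazySink<>(() -> {
            CREATED.incrementAndGet();
            return new StringBuilder("resource");
        });
    }

    // Serializes and deserializes an object, roughly what happens when a broadcast
    // value is shipped to an executor with Java serialization.
    @SuppressWarnings("unchecked")
    public static <T> T roundTrip(T obj) {
        try {
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
                out.writeObject(obj);
            }
            try (ObjectInputStream in =
                    new ObjectInputStream(new ByteArrayInputStream(bytes.toByteArray()))) {
                return (T) in.readObject();
            }
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        LazySink<StringBuilder> executorCopy = roundTrip(newSink());
        System.out.println("created before first use: " + CREATED.get());
        StringBuilder first = executorCopy.get();
        StringBuilder second = executorCopy.get();
        System.out.println("same instance: " + (first == second) + ", created: " + CREATED.get());
    }
}
```

Keeping the resource in a per-instance `transient` field rather than a `static` one ties its lifetime to the broadcast value instead of the whole JVM, so two sinks built with different configurations would not silently share one producer.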
I then use it as follows:
JavaSparkContext jsc = new JavaSparkContext(sc);
Broadcast<KafkaSink> kafkaSinkBroadcast = jsc.broadcast(new KafkaSink());
// topic and kafkaProducerProps() come from the surrounding application code
dataset.toJavaRDD().foreach(row -> kafkaSinkBroadcast.getValue()
        .getInstance(kafkaProducerProps())
        .send(new ProducerRecord<String, String>(topic, row.mkString(", "))));
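`kafkaProducerProps()` is not shown in the question. A minimal sketch of what it might return, assuming plain string keys and values: `bootstrap.servers`, `key.serializer` and `value.serializer` are standard Kafka producer settings and `StringSerializer` matches `KafkaProducer<String, String>`, while the broker address `localhost:9092` and the class name `ProducerConfigSketch` are hypothetical.

```java
import java.util.Properties;

final class ProducerConfigSketch {
    // Hypothetical broker address; replace with the real cluster's bootstrap servers.
    static final String BOOTSTRAP_SERVERS = "localhost:9092";

    // Builds the Properties passed to KafkaSink.getInstance(...).
    // static, so a Spark lambda calling it does not capture an enclosing instance.
    static Properties kafkaProducerProps() {
        Properties props = new Properties();
        props.put("bootstrap.servers", BOOTSTRAP_SERVERS);
        // String keys and values, matching KafkaProducer<String, String>.
        props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
        return props;
    }
}
```

Declaring the method `static` keeps the lambda passed to `foreach` from capturing an enclosing instance, which would otherwise also have to be serializable. Because the lambda calls it for every row, a new `Properties` object is built per record even though only the first call on each executor actually creates a producer.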
I'd just like to know whether this is the right way to do it, or what the best approach is.
【Discussion】:
- Do you have working sample Java code for this?
Tags: java apache-spark apache-kafka spark-streaming