【发布时间】:2019-03-29 16:15:27
【问题描述】:
我有一个练习,它实现了一项服务,该服务从 Kafka 获取数据,对其进行处理并使用 Spark Streaming 将结果存储到 elasticsearch 中。
我可以将数据从 Kafka 提取到我的服务并在 Spark 集群中对其进行处理,但我不知道如何在操作中将结果持久保存到 elasticsearch 中。截至目前,我的代码如下所示:
SparkConf sparkConf = new SparkConf()...
JavaStreamingContext streamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1));
...
JavaPairInputDStream<String, Event> eventStream = KafkaUtils.createDirectStream(...);
eventStream.foreachRDD( rdd -> {
rdd.foreach(Application::processEvent);
});
方法 processEvent 看起来像:
public static void processEvent(Tuple2<String, Event> t) {
//Process event t here
...
// here I want to persist the result into elasticsearch
}
我是 Apache Spark 的新手。请告诉我如何在处理事件后将结果保存到 elasticsearch 中。 请注意,我知道如何在独立的 java 应用程序中将文档索引到 elasticsearch,我只是不知道如何将它与 Spark Streaming 一起使用。
谢谢。
【问题讨论】:
标签: java apache-spark elasticsearch streaming