【Posted】: 2017-05-14 02:36:42
【Question】:
I am trying to convert a Kafka stream into RDDs and insert those RDDs into an Elasticsearch database. Here is my code:
import sys

from pyspark import SparkConf, SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils

conf = SparkConf().setAppName("ola")
sc = SparkContext(conf=conf)

# Connection settings for the elasticsearch-hadoop output format
es_write_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "pipe/word"
}

ssc = StreamingContext(sc, 2)  # 2-second batch interval
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])  # keep the message value, drop the key
value_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)
# f is a mapping function that is not shown in this post
value_counts.transform(lambda rdd: rdd.map(f))
value_counts.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)
ssc.start()
ssc.awaitTermination()
The saveAsNewAPIHadoopFile function is supposed to write these RDDs to ES. However, I get this error:
value_counts.saveAsNewAPIHadoopFile(
AttributeError: 'TransformedDStream' object has no attribute 'saveAsNewAPIHadoopFile'
The transform function should be able to convert the stream into Spark DataFrames. How can I write these RDDs to Elasticsearch? Thanks!
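For context on the error above: saveAsNewAPIHadoopFile is a method of RDD, not of DStream, and transform returns a new DStream rather than modifying value_counts in place. One possible approach, sketched here under assumptions, is to hand each micro-batch RDD to a save function through foreachRDD. The helper names to_es_doc and save_rdd_to_es, the document fields "word" and "count", and the placeholder key are all hypothetical and not from the original post; the sketch also assumes the elasticsearch-hadoop jar is available on the Spark classpath.

```python
# Sketch only: assumes the elasticsearch-hadoop jar is on the Spark classpath.
# Same settings as es_write_conf in the question.
ES_WRITE_CONF = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "pipe/word",
}


def to_es_doc(pair):
    """Hypothetical helper: turn a (word, count) pair into a (key, document) pair.

    The key is only a placeholder here (the key class is NullWritable);
    the dict is what would be written as the document body.
    The field names "word" and "count" are assumptions.
    """
    word, count = pair
    return ("key", {"word": word, "count": count})


def save_rdd_to_es(rdd):
    """Write one micro-batch RDD to Elasticsearch, skipping empty batches."""
    if rdd.isEmpty():
        return
    # This method exists on RDD, which is why it is called per batch
    # instead of on the DStream itself.
    rdd.saveAsNewAPIHadoopFile(
        path="-",
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable",
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
        conf=ES_WRITE_CONF,
    )


# Wiring, using the value_counts DStream from the question
# (must be registered before ssc.start()):
# value_counts.map(to_es_doc).foreachRDD(save_rdd_to_es)
```

The map call returns a new DStream, so it is chained directly into foreachRDD instead of being called as a standalone statement whose result is dropped.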
【Discussion】:
Tags: elasticsearch apache-spark pyspark apache-kafka spark-streaming