【问题标题】:Pyspark Write DStream data to Elasticsearch using saveAsNewAPIHadoopFilePyspark 使用 saveAsNewAPIHadoopFile 将 DStream 数据写入 Elasticsearch
【发布时间】:2017-05-14 02:36:42
【问题描述】:

我正在尝试将 Kafka 流转换为 RDD,并将这些 RDD 插入到 Elasticsearch 数据库中。这是我的代码:

conf = SparkConf().setAppName("ola")
sc = SparkContext(conf=conf) 
es_write_conf = {
    "es.nodes": "localhost",
    "es.port": "9200",
    "es.resource": "pipe/word"
}

ssc = StreamingContext(sc, 2)
brokers, topic = sys.argv[1:]
kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list": brokers}) 
lines = kvs.map(lambda x: x[1])  
value_counts = lines.flatMap(lambda line: line.split(" ")).map(lambda word: (word, 1)).reduceByKey(lambda a, b: a+b)

value_counts.transform(lambda rdd: rdd.map(f))
value_counts.saveAsNewAPIHadoopFile(
    path='-',
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
    keyClass="org.apache.hadoop.io.NullWritable",
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
    conf=es_write_conf)

ssc.start()  
ssc.awaitTermination() 

saveAsNewAPIHadoopFile 函数应该将这些 RDD 写入 ES。但是我收到此错误:

   value_counts.saveAsNewAPIHadoopFile(
   AttributeError: 'TransformedDStream' object has no attribute 'saveAsNewAPIHadoopFile'

转换函数应该能够将流转换为 Spark 数据帧。如何将这些 RDD 写入 Elasticsearch?谢谢!

【问题讨论】:

    标签: elasticsearch apache-spark pyspark apache-kafka spark-streaming


    【解决方案1】:

    你可以使用foreachRDD:

    value_counts.foreachRDD(lambda rdd: rdd.saveAsNewAPIHadoopFile(...))
    

    【讨论】:

    • 感谢您的建议!但是,现在我遇到了一个巨大的错误:6/12/29 19:23:06 WARN EsOutputFormat: Speculative execution enabled for reducer - 考虑禁用它以防止数据损坏 16/12/29 19:23:06 WARN EsOutputFormat: 无法确定任务 ID 16/12/29 19:23:07 错误执行程序:阶段 83.0 (TID 55) 中任务 0.0 中的异常 org.elasticsearch.hadoop.rest.EsHadoopInvalidRequest:发现不可恢复的错误 [127.0.0.1:9200] 返回了错误请求( 400) - 解析失败;压缩器检测只能在某些 xcontent 字节或压缩的 xcontent 字节上调用;救出..
    • 我不熟悉 ES,所以在这里我不会提供任何帮助。手动保存单个 RDD 是否有效?
    【解决方案2】:
    new = rawUser.rdd.map(lambda item: ('key', {'id': item['entityId'],'targetEntityId': item['targetEntityId']}))
    

    rawUser 是 DATAFRAME 并且 new 是 PipelinedRDD

    new.saveAsNewAPIHadoopFile(
        path='/home/aakash/test111/', 
        outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
        keyClass="org.apache.hadoop.io.NullWritable", 
        valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
        conf={ "es.resource" : "index/test" ,"es.mapping.id":"id","es.nodes" : "localhost","es.port" : "9200","es.nodes.wan.only":"false"})
    

    这里最重要的是下载合适的兼容 JAR https://mvnrepository.com/artifact/org.elasticsearch/elasticsearch-hadoop 检查弹性版本并下载正确的jar。

    使 pyspark 使用 jar 的命令。 pyspark --jars elasticsearch-hadoop-6.2.4.jar

    【讨论】:

      猜你喜欢
      • 2019-08-07
      • 1970-01-01
      • 2017-10-04
      • 1970-01-01
      • 2016-04-24
      • 2020-10-09
      • 2015-01-20
      • 1970-01-01
      • 2019-10-31
      相关资源
      最近更新 更多