【发布时间】:2017-01-26 07:32:05
【问题描述】:
我不知道如何使用来自 spark 的 python 将数据帧写入 elasticsearch。我按照here的步骤操作。
这是我的代码:
# Read file
df = sqlContext.read \
.format('com.databricks.spark.csv') \
.options(header='true') \
.load('/vagrant/data/input/input.csv', schema = customSchema)
df.registerTempTable("data")
# KPIs
kpi1 = sqlContext.sql("SELECT * FROM data")
es_conf = {"es.nodes" : "10.10.10.10","es.port" : "9200","es.resource" : "kpi"}
kpi1.rdd.saveAsNewAPIHadoopFile(
path='-',
outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat",
keyClass="org.apache.hadoop.io.NullWritable",
valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable",
conf=es_conf)
上面的代码给出了
原因:net.razorvine.pickle.PickleException:预期为零 构造 ClassDict 的论据(对于 pyspark.sql.types._create_row)
我还从以下位置启动脚本:
spark-submit --master spark://aggregator:7077 --jars ../jars/elasticsearch-hadoop-2.4.0/dist/elasticsearch-hadoop-2.4.0.jar /vagrant/scripts/aggregation.py 确保已加载 elasticsearch-hadoop
【问题讨论】:
-
您使用的是哪个版本的 elasticseach?
-
@eliasah
2.4.0,也尝试将elasticsearch-hadoop-5.0.0-alpha5.jar用于 es 的 2.x 版本
标签: elasticsearch apache-spark pyspark elasticsearch-hadoop