【问题标题】:Spark replace rdd field value by another valueSpark用另一个值替换rdd字段值
【发布时间】:2017-05-14 08:25:17
【问题描述】:

我是 Spark 新手。

我可以使用以下命令查看我的 elasticsearch 数据库中第一个 RDD 的内容:

print(es_rdd.first())
>>>(u'1', {u'name': u'john'})

我还可以使用以下方法为我的 Dstream 获取所需的值:

kvs = KafkaUtils.createDirectStream(ssc, [topic], {"metadata.broker.list":brokers})
name=kvs.map(lambda x: x[1])
name.pprint()
>>>>robert

我打算将 rdd "name": "john" 替换为 "robert",然后使用 saveAsNewAPIHadoopFile() 在 elasticsearch 中插入新的 rdd

我该怎么做? 有没有办法将“罗伯特”映射到一个新的rdd?有点像..

new_rdd=es_rdd.map(lambda item: {item[0]:name})

谢谢

【问题讨论】:

    标签: apache-spark pyspark apache-kafka spark-streaming rdd


    【解决方案1】:

    我们可以根据索引列表将RDD的一部分替换为另一个RDD。例如,将 (RDD) 中的元素从 1,2,3,4 替换为 2,3,4,4。

    a = sc.parallelize([1,2,3,4])
    repVals = sc.parallelize([2,3,4])
    idx = sc.parallelize([0,1,2]) . # idx has the same number of values with repVals
    
    a = a.zipWithIndex()
    ref = idx.zip(repVals).collectAsMap() # create a dictionary of format {idex:repValue}
    
    anew = a.map(lambda x:ref[x[1]] if x[1] in ref else x[0])
    anew.collect()
    

    结果显示[2,3,4,4]

    【讨论】:

      猜你喜欢
      • 2016-08-23
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2023-01-19
      • 1970-01-01
      • 1970-01-01
      • 2017-06-27
      • 1970-01-01
      相关资源
      最近更新 更多