【问题标题】:apply a map function on pyspark RDD在 pyspark RDD 上应用地图功能
【发布时间】:2016-10-20 11:05:13
【问题描述】:

我通过读取 mongodb 集合获得了 rdd,现在我想更改一些值并将该数据更新/加载回相同或其他集合。

mr1 = sc.mongoRDD('mongodb://localhost:27017/test_database.test2')
type(mr1)   #<class 'pyspark.rdd.PipelinedRDD'>
mr1.collect()
#[{u'_id': ObjectId('58089490d7531cd8b071f48c'), u'name': u'ravi', u'sal': u'2000'}, {u'_id': ObjectId('58089491d7531cd8b071f48d'), u'name': u'ravi', u'sal': u'3000'}]
#I want to change the name 'ravi' to 'Satya'
mr2 = mr1.map( lambda x: x['name'].replace('ravi','SATYA'))
#o/p: [u'SATYA', u'SATYA']  ##not all values
#Expected: [{u'_id': ObjectId('58089490d7531cd8b071f48c'), u'name': u'SATYA', u'sal': u'2000'}, {u'_id': ObjectId('58089491d7531cd8b071f48d'), u'name': u'SATYA', u'sal': u'3000'}]

请帮忙,如何在这里应用地图功能来取回相同的rdd mr1并替换名称。

谢谢。

【问题讨论】:

    标签: mongodb python-2.7 pyspark pyspark-sql


    【解决方案1】:

    试试:

    def replace(x, key, fr, to):
        d = x.copy()
        if key in d:
            d[key] = d[key].replace('ravi','SATYA')
        return d
    
    mr1.map(lambda x: replace(x, 'name', 'ravi','SATYA'))
    

    【讨论】:

      【解决方案2】:

      成功了-

      def rep(x):
          if x['name'] == 'ravi':
            x['name']='SATYA'
          return x
      mr2 = mr1.map(lambda x: rep(x))
      

      【讨论】:

        猜你喜欢
        • 2016-06-05
        • 2017-04-29
        • 2015-05-07
        • 2018-09-27
        • 2020-10-21
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多