【问题标题】:How to filter out values from pyspark.rdd.PipelinedRDD?如何从 pyspark.rdd.PipelinedRDD 中过滤掉值?
【发布时间】:2017-10-26 08:59:20
【问题描述】:

我有一个名为myRDDpyspark.rdd.PipelinedRDD。这是它的示例内容:

[((111, u'BB', u'A'), (444, u'BB', u'A')),
 ((222, u'BB', u'A'), (888, u'BB', u'A')),
 ((333, u'BB', u'B'), (999, u'BB', u'A')),...]

我需要删除所有第三列值不一致的条目。预期的结果是这样的:

[((111, u'BB', u'A'), (444, u'BB', u'A')),
 ((222, u'BB', u'A'), (888, u'BB', u'A')),...]

我该怎么做?

【问题讨论】:

标签: python apache-spark pyspark rdd


【解决方案1】:

您可以使用带有 lambda 表达式的过滤器来检查每个元组对的第三个元素是否相同,例如:

l = [((111, u'BB', u'A'), (444, u'BB', u'A')),
     ((222, u'BB', u'A'), (888, u'BB', u'A')),
     ((333, u'BB', u'B'), (999, u'BB', u'A'))]

rdd = sc.parallelize(l)
rdd = rdd.filter(lambda x: x[0][2] == x[1][2])
result = rdd.collect()
print result

>>> [((111, u'BB', u'A'), (444, u'BB', u'A')), ((222, u'BB', u'A'), (888, u'BB', u'A'))]

要回答您的后续评论,请记住,lambda 只是一个函数,如果您有更复杂的逻辑,您可以将它写成一个函数。你可以这样做:

def do_stuff(x):
    if (x[0][2] == 'C') or (x[1][2] == 'C'):
        return x     
    else:
        if x[0][2] == x[1][2]: return x
    return None

rdd = rdd.map(do_stuff).filter(lambda x: x is not None)

res = rdd.collect()

【讨论】:

  • 效果很好。顺便说一句,是否可以添加异常,例如排除值C?如果对的第三列中的任何一个值为C,则不应进行比较。
猜你喜欢
  • 1970-01-01
  • 2019-08-08
  • 2023-04-08
  • 2015-10-19
  • 2012-04-23
  • 2017-08-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多