【问题标题】:Filtering data in an RDD过滤 RDD 中的数据
【发布时间】:2016-11-14 19:35:39
【问题描述】:

我已经设法在 pyspark 中预处理我的数据以获得类似的东西

[(u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key3', u'2'), (u'key4', u'1'), (u'key1', u'4'), (u'key5', u'1'), (u'key6', u'2'), (u'key7', u'4'), (u'key8', u'5'), (u'key9', u'6'), (u'key10', u'7')]

现在我需要根据这些条件进行过滤:

1) 过滤与至少 2 个键关联的值。

输出 - 只有那些具有 '1','2','4' 作为值的 (k,v) 对应该存在,因为它们与超过 2 个键相关联

 [(u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key3', u'2'), (u'key4', u'1'), (u'key1', u'4'), (u'key5', u'1'), (u'key6', u'2'), (u'key2', u'4')]

2) 与至少 2 个值关联的过滤键

输出 - 只有那些以 key1, key2 作为键的 (k,v) 对应该存在,因为它们与至少 2 个值相关联

[(u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key1', u'4'), (u'key2', u'4')]

任何建议都会有很大帮助。

更新:我使用 groupBy 和过滤器对具有多个值的键进行分组

 [(u'key1', [u'1', u'2', u'4']), (u'key2',[u'1', u'4'])]

现在我如何将此 (key, list(values)) 拆分为单个 (k,v) 对以应用进一步的转换?

【问题讨论】:

  • 您可以一次性完成所有操作 - reduceByKey,过滤具有超过 2 个值的项目,然后收集或处理那里的任何内容。您具体在哪个部分遇到问题?
  • @khachik 按键减少会根据按键进行聚合,对吗?因此,如果我将reduceByKey中的值加入以','分隔,它会给出类似(u'key1',u'1,2,3')的东西。我不需要我的数据来汇总。如果我错了,请纠正我。
  • 如果我使用reduceByKey,然后一旦聚合,我可以使用过滤器仅过滤具有2个以上值的那些。现在如何执行第二轮过滤来过滤与超过 2 个值关联的键?
  • 一旦您拥有像key1 -> 1, 2, 3 这样的组,您就可以根据值的大小(len >= 2)进行过滤并收集键和值。

标签: python apache-spark pyspark rdd


【解决方案1】:
my_rdd = sc.parallelize([(u'key1', u'1'), (u'key2', u'1'), (u'key1', u'2'), (u'key2', u'3'), (u'key4', u'1'), (u'key1', u'4'), (u'key4', u'1'), (u'key6', u'2'), (u'key7', u'4'), (u'key8', u'5'), (u'key9', u'6'), (u'key10', u'7')])

#filter keys which are associated to atleast 2 values

filter2_rdd = my_rdd.groupByKey() \
                    .mapValues(lambda x: list(x)) \
                    .filter(lambda x: len(x[1])>=2) \
                    .flatMap(lambda x: [(x[0],item) for item in x[1]])

#filter values associated to atleast 2 keys.
filte1_rdd = filter2_rdd.map(lambda x: (x[1],x[0])) \
                        .groupByKey().mapValues(lambda x: list(x))\
                        .filter(lambda x: len(x[1])>=2)\
                        .flatMap(lambda x: [(item,x[0]) for item in x[1]])

这行得通!

【讨论】:

  • 已经完成了几乎所有的事情,除了你将列表拆分为单个 (k,v) 对的 flatMap 部分。 FlatMap 部分有所帮助,谢谢!
【解决方案2】:

按键归约、过滤和连接:

>>> rdd.mapValues(lambda _: 1) \  # Add key of value 1
...     .reduceByKey(lambda x, y: x + y) \ # Count keys
...     .filter(lambda x: x[1] >= 2) \ # Keep only if number is >= 2
...     .join(rdd) # join with original (serves as filter)
...     .mapValues(lambda x: x[0]) # reshape

【讨论】:

  • 您能解释一下您要做什么吗?
猜你喜欢
  • 1970-01-01
  • 2017-11-06
  • 2015-01-17
  • 1970-01-01
  • 2017-07-03
  • 1970-01-01
  • 1970-01-01
  • 2021-04-10
  • 1970-01-01
相关资源
最近更新 更多