【问题标题】:How to group key/values by partition in Spark?如何在 Spark 中按分区对键/值进行分组?
【发布时间】:2016-06-20 00:45:38
【问题描述】:

我有一个 Spark Streaming 应用程序,它每秒接收几条 JSON 消息,每条消息都有一个标识其来源的 ID。

使用此 ID 作为键,我可以执行 MapPartitionsToPair,从而创建一个 JavaPairDStream,其中包含键/值对的 RDD,每个分区一个键值对(例如,如果我收到 5 条 JSON 消息,我得到一个包含 5 个分区的 RDD,每个分区都以消息的 ID 作为键,以 JSON 消息本身作为值)。

我现在想要做的是,我想将所有具有相同键的值分组到同一个分区中。因此,例如,如果我有 3 个带有键“a”的分区和 2 个带有键“b”的分区,我想创建一个带有 2 个分区而不是 5 个分区的新 RDD,每个分区包含一个键具有的所有值,一个用于'a' 和一个 'b'。

我怎样才能做到这一点? 到目前为止,这是我的代码:

JavaReceiverInputDStream<String> streamData2 = ssc.socketTextStream(args[0], Integer.parseInt(args[1]),
            StorageLevels.MEMORY_AND_DISK_SER);

JavaPairDStream<String,String> streamGiveKey= streamData2.mapPartitionsToPair(new PairFlatMapFunction<Iterator<String>, String, String>() {
        @Override
        public Iterable<Tuple2<String, String>> call(Iterator<String> stringIterator) throws Exception {

            ArrayList<Tuple2<String,String>>a= new ArrayList<Tuple2<String, String>>();

            while (stringIterator.hasNext()){
                String c=stringIterator.next();
                if(c==null){
                    return null;

                }

                JsonMessage retMap = new Gson().fromJson(c,JsonMessage.class);
                String key= retMap.getSid();
                Tuple2<String,String> b= new Tuple2<String,String>(key,c);
                a.add(b);

                System.out.print(b._1+"_"+b._2);
                // }
                //break;
            }


            return a;
        }
    });

//我创建一个JavaPairDStream,其中每个分区包含一个键/值对。

我尝试使用grouByKey(),但是无论消息数量是多少,我总是得到分区号2。

我该怎么做? 非常感谢。

【问题讨论】:

  • 为什么要每个分区 1 个元素?你想解决什么问题?

标签: apache-spark parallel-processing spark-streaming


【解决方案1】:

你可以使用

groupByKey(Integer numPartitions)

并将numPartitions 设置为等于您拥有的不同键的数量。

但是 .. 您需要预先知道 您有多少个不同的键。你有那个信息吗?可能不是。那么......你需要做一些额外的(/冗余)工作。例如。使用

countByKey

作为第一步。这比 groupByKey 快 - 所以至少您没有将总处理时间加倍

更新 OP 询问他们为什么默认获得 2 个分区。

默认的groupByKey 使用defaultPartitioner() 方法

groupByKey(defaultPartitioner(self))
  • 从具有最大基数的父分区中选择Partitioner

-- 否则会使用spark.default.parallelism

【讨论】:

  • 谢谢,这绝对解决了我的问题。但是,只有一个问题:你知道为什么groupByKey() 默认返回 2 个分区吗?无论我在每个批处理间隔发送多少输入,或者我有多少输出,似乎 groupByKey 都独立于所有这些。当我做 getNumPartitions 时它只返回 2
猜你喜欢
  • 1970-01-01
  • 2021-02-05
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2022-01-08
相关资源
最近更新 更多