背景:两个rdd进行join关联时,一方rdd存在大量数据倾斜的key。如果通过reduceByKey并设置分区数为10,由于同一个key占用的数据比例很大,其余9个task可能处于空闲状态,而另外一个task却要处理大量数据,导致任务分配不均匀

解决办法:双重聚合(本文代码演示的是join场景下的做法:倾斜key一方加随机前缀,另一方扩容后再join)

思路:

1、通过抽样、按key计数、降序排序后take前几个,找到导致数据倾斜的key

2、对存在数据倾斜的rdd,给倾斜key加上随机前缀,比如前缀取0~4共5种

3、对另外一方rdd中倾斜key对应的每一条数据膨胀5倍,依次加上0~4这5个前缀(与第2步的前缀范围保持一致)

4、倾斜key部分join之后去掉前缀,再与其余正常key的join结果使用union合并

逻辑图如下

[大数据]连载No14之数据倾斜解决办法之双重聚合

代码如下(示例代码中随机前缀的个数取2)

   /**
    * Demonstrates a join of two pair RDDs in which the most frequent key of the
    * left RDD is handled separately from all other keys.
    *
    * <p>Steps: count the keys in a sample of {@code rdd1} and take the most
    * frequent one as the skewed key; join the records of that key on their own,
    * with a random numeric prefix on the {@code rdd1} side and every prefix
    * replicated on the {@code rdd2} side; join the remaining keys directly;
    * union both results and print them.
    *
    * @param args command-line arguments (not read)
    */
   public static void main(String[] args) {
        SparkConf conf = new SparkConf().setAppName("DoubleJoin").setMaster("local");
        JavaSparkContext sc = new JavaSparkContext(conf);
        try {
            // Number of distinct prefixes ("0_", "1_", ...) used for the skewed key.
            final int prefixCount = 2;
            // Fraction of rdd1 that is sampled when looking for the most frequent key.
            final double sampleFraction = 1.0;

            List<Tuple2<String, Integer>> dataList1 = Arrays.asList(
                    new Tuple2<String, Integer>("hello", 1),
                    new Tuple2<String, Integer>("hello", 2),
                    new Tuple2<String, Integer>("bjsxt", 3),
                    new Tuple2<String, Integer>("shsxt", 1),
                    new Tuple2<String, Integer>("gzsxt", 1)
            );
            List<Tuple2<String, Integer>> dataList2 = Arrays.asList(
                    new Tuple2<String, Integer>("hello", 100),
                    new Tuple2<String, Integer>("bjsxt", 99),
                    new Tuple2<String, Integer>("shsxt", 88),
                    new Tuple2<String, Integer>("gzsxt", 66)
            );

            JavaPairRDD<String, Integer> rdd1 = sc.parallelizePairs(dataList1);
            JavaPairRDD<String, Integer> rdd2 = sc.parallelizePairs(dataList2);

            // (count, key) pairs of the sampled records, highest count first.
            // Cached because two actions below (foreach and take) read it.
            JavaPairRDD<Integer, String> sortRdd = rdd1.sample(false, sampleFraction)
                    .mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                        @Override
                        public Tuple2<String, Integer> call(Tuple2<String, Integer> t) throws Exception {
                            return new Tuple2<String, Integer>(t._1, 1);
                        }
                    }).reduceByKey(new Function2<Integer, Integer, Integer>() {
                        @Override
                        public Integer call(Integer n1, Integer n2) throws Exception {
                            return n1 + n2;
                        }
                    }).mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                        @Override
                        public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
                            return new Tuple2<Integer, String>(s._2, s._1);
                        }
                    }).sortByKey(false)
                    .cache();

            sortRdd.foreach(new VoidFunction<Tuple2<Integer, String>>() {
                @Override
                public void call(Tuple2<Integer, String> t1) throws Exception {
                    System.out.println(t1._1 + "\t" + t1._2);
                }
            });

            // The first element after the descending sort is the most frequent key.
            List<Tuple2<Integer, String>> hottest = sortRdd.take(1);

            JavaPairRDD<String, Tuple2<Integer, Integer>> joinResult;
            if (hottest.isEmpty()) {
                // The sample held no records, so no key is singled out: join directly.
                joinResult = rdd1.join(rdd2);
            } else {
                Broadcast<String> broadcastSkewedKey = sc.broadcast(hottest.get(0)._2);
                joinResult = joinSplittingSkewedKey(rdd1, rdd2, broadcastSkewedKey, prefixCount);
            }

            // Print every joined record.
            joinResult.foreach(new VoidFunction<Tuple2<String, Tuple2<Integer, Integer>>>() {
                @Override
                public void call(Tuple2<String, Tuple2<Integer, Integer>> t) throws Exception {
                    System.out.println(t);
                }
            });
        } finally {
            // Stop the context even when one of the jobs above fails.
            sc.stop();
        }
    }

    /**
     * Joins {@code left} with {@code right}, processing the records of one key
     * separately from the rest.
     *
     * @param left        left side of the join; its records with the skewed key get a random prefix
     * @param right       right side of the join; its records with the skewed key are copied once per prefix
     * @param skewedKey   broadcast holding the key that is joined separately
     * @param prefixCount number of distinct prefixes, {@code 0} to {@code prefixCount - 1}
     * @return the union of the join over the non-skewed keys and the join over the skewed key,
     *         with the prefix removed from the keys again
     */
    private static JavaPairRDD<String, Tuple2<Integer, Integer>> joinSplittingSkewedKey(
            JavaPairRDD<String, Integer> left,
            JavaPairRDD<String, Integer> right,
            final Broadcast<String> skewedKey,
            final int prefixCount) {

        // Keeps only the records whose key is the skewed key.
        final Function<Tuple2<String, Integer>, Boolean> isSkewed =
                new Function<Tuple2<String, Integer>, Boolean>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, Integer> record) throws Exception {
                        return record._1.equals(skewedKey.value());
                    }
                };

        // Keeps only the records whose key is NOT the skewed key.
        final Function<Tuple2<String, Integer>, Boolean> isNotSkewed =
                new Function<Tuple2<String, Integer>, Boolean>() {
                    private static final long serialVersionUID = 1L;

                    @Override
                    public Boolean call(Tuple2<String, Integer> record) throws Exception {
                        return !record._1.equals(skewedKey.value());
                    }
                };

        // Left side: each skewed record gets one random prefix "<n>_".
        JavaPairRDD<String, Integer> prefixedLeft = left
                .filter(isSkewed)
                .mapToPair(new PairFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Tuple2<String, Integer> call(Tuple2<String, Integer> record) throws Exception {
                        // The generator is obtained on the thread that runs the task,
                        // so no random-number state is serialized with the function.
                        int prefix = java.util.concurrent.ThreadLocalRandom.current().nextInt(prefixCount);
                        return new Tuple2<String, Integer>(prefix + "_" + record._1, record._2);
                    }
                });

        // Right side: each skewed record is emitted once for every possible prefix,
        // so whichever prefix a left record received has a matching right record.
        JavaPairRDD<String, Integer> expandedRight = right
                .filter(isSkewed)
                .flatMapToPair(new PairFlatMapFunction<Tuple2<String, Integer>, String, Integer>() {
                    @Override
                    public Iterable<Tuple2<String, Integer>> call(Tuple2<String, Integer> record) throws Exception {
                        List<Tuple2<String, Integer>> copies = new ArrayList<>(prefixCount);
                        for (int prefix = 0; prefix < prefixCount; prefix++) {
                            copies.add(new Tuple2<String, Integer>(prefix + "_" + record._1, record._2));
                        }
                        return copies;
                    }
                });

        // Join on the prefixed keys, then restore the original key.
        JavaPairRDD<String, Tuple2<Integer, Integer>> skewedJoin = prefixedLeft
                .join(expandedRight, prefixCount)
                .mapToPair(new PairFunction<Tuple2<String, Tuple2<Integer, Integer>>, String, Tuple2<Integer, Integer>>() {
                    @Override
                    public Tuple2<String, Tuple2<Integer, Integer>> call(Tuple2<String, Tuple2<Integer, Integer>> joined) throws Exception {
                        // Cut at the FIRST underscore only: the prefix is purely numeric,
                        // while the original key may itself contain underscores.
                        String prefixedKey = joined._1;
                        String originalKey = prefixedKey.substring(prefixedKey.indexOf('_') + 1);
                        return new Tuple2<String, Tuple2<Integer, Integer>>(originalKey, joined._2);
                    }
                });

        // All remaining keys are joined directly.
        JavaPairRDD<String, Tuple2<Integer, Integer>> normalJoin =
                left.filter(isNotSkewed).join(right.filter(isNotSkewed));

        // The two partial results cover disjoint keys; union combines them into the full join.
        return normalJoin.union(skewedJoin);
    }


相关文章:

  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-10-31
  • 2021-08-09
  • 2021-07-31
  • 2021-09-15
  • 2021-12-26
猜你喜欢
  • 2021-08-02
  • 2021-04-23
  • 2022-12-23
  • 2022-12-23
  • 2022-12-23
  • 2021-08-17
相关资源
相似解决方案