【问题标题】:Split string twice and reduceByKey in Scala在Scala中拆分字符串两次并reduceByKey
【发布时间】:2019-09-20 23:48:01
【问题描述】:

我有一个 .csv 文件,我正在尝试使用 spark 进行分析。 .csv 文件包含主题列表及其计数等。主题及其计数用“,”分隔,所有这些主题+计数都在同一个字符串中,用“;”分隔像这样

"topic_1,10;topic_2,12;topic_1,3"

如您所见,某些主题多次出现在字符串中。

我有一个 rdd,其中包含一些日期的键值对和主题字符串 [日期,主题字符串]

我想做的是在';'处拆分字符串获取所有单独的主题,然后在','处拆分这些主题,并创建主题名称和计数的键值对,可以通过键减少。对于上面的示例,这将是

[date, ((topic_1, 13), (topic_2, 12))]

所以我一直在玩 spark 很多,因为我是 scala 的新手。我试图做的是

val separateTopicsByDate = topicsByDate
  .mapValues(_.split(";").map({case(str) => (str)}))
  .mapValues({case(arr) => arr
    .filter(str => str.split(",").length > 1)
    .map({case(str) => (str.split(",")(0), str.split(",")(1).toInt)})
  })

问题是这会返回一个元组数组,我不能 reduceByKey。当我在';'处拆分字符串时这将返回一个数组。我尝试将它映射到一个元组(从映射操作中可以看到),但这不起作用。

我使用的完整代码是

val rdd = sc.textFile("./data/segment/*.csv")

val topicsByDate = rdd
  .filter(line => line.split("\t").length > 23)
  .map({case(str) => (str.split("\t")(1), str.split("\t")(23))})
  .reduceByKey(_ + _)

val separateTopicsByDate = topicsByDate
  .mapValues(_.split(";").map({case(str) => (str)}))
  .mapValues({case(arr) => arr
    .filter(str => str.split(",").length > 1)
    .map({case(str) => (str.split(",")(0), str.split(",")(1).toInt)})
  })

separateTopicsByDate.take(2)

这会返回

res42: Array[(String, Array[(String, Int)])] = Array((20150219001500,Array((Cecilia Pedraza,91), (Mexico City,110), (Soviet Union,1019), (Dutch Warmbloods,1236), (Jose Luis Vaquero,1413), (National Equestrian Club,1636), (Lenin Park,1776), (Royal Dutch Sport Horse,2075), (North American,2104), (Western Hemisphere,2246), (Maydet Vega,2800), (Mirach Capital Group,110), (Subrata Roy,403), (New York,820), (Saransh Sharma,945), (Federal Bureau,1440), (San Francisco,1482), (Gregory Wuthrich,1635), (San Francisco,1652), (Dan Levine,2309), (Emily Flitter,2327), (K...

如您所见,这是一个元组数组,我无法在其上使用 .reduceByKey(_ + _)。

有没有办法将字符串拆分为可以通过键减少的方式?

【问题讨论】:

    标签: scala apache-spark


    【解决方案1】:

    如果你的 RDD 有如下行:

    [date, "topic1,10;topic2,12;topic1,3"]  
    

    您可以使用flatMap 拆分值并将行分解为:

    [date, ["topic1,10", "topic2,12", "topic1,3"]] ->
    
    [date, "topic1,10"]  
    [date, "topic2,12"]  
    [date, "topic1,3"]
    

    然后将每一行转换为 [String,Integer] 元组(代码中的rdd1):

    ["date_topic1",10]  
    ["date_topic2",12]  
    ["date_topic1",3]
    

    并使用加法键减少(代码中的rdd2):

    ["date_topic1",13]  
    ["date_topic2",12]  
    

    然后你将日期与主题分开,并将主题与值结合起来,得到 [String,String] 元组,如:

    ["date", "topic1,13"]  
    ["date", "topic2,12"]  
    

    最后将值拆分为 [topic,count] 元组,准备 ["date", [(topic,count)]] 对(代码中为 rdd3)并通过 Key 减少(代码中为 rdd4),得到:

    ["date", [(topic1, 13), (topic2, 12)]]
    

    ===
    下面是四个中间 RDD 序列的 Java 实现,您可以参考它进行 Scala 开发:

        JavaPairRDD<String, String> rdd;     //original data. contains [date, "topic1,10;topic2,12;topic1,3"] 
    
        JavaPairRDD<String, Integer> rdd1 =  //contains
                                             //["date_topic1",10]  
                                             //["date_topic2",12]  
                                             //["date_topic1",3]
    
    
                rdd.flatMapToPair(
    
                    pair -> //pair=[date, "topic1,10;topic2,12;topic1,3"]
                    {
    
                        List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();
    
                        String k = pair._1; //date
                        String v = pair._2; //"topic,count;topic,count;topic,count"
    
                        String[] v_splits = v.split(";");
    
                        for(int i=0; i<v_splits.length; i++)
                        {
                            String[] v_split_topic_count = v_splits[i].split(",");  //"topic,count"
    
                            list.add(new Tuple2<String,Integer>(k + "_" + v_split_topic_count[0], Integer.parseInt(v_split_topic_count[1]))); //"date_topic,count"
                        }
    
                        return list.iterator();
                    }//end call
    
                );
    
    
        JavaPairRDD<String,Integer> rdd2 = //contains
                                           //["date_topic1",13]  
                                           //["date_topic2",12]  
    
    
               rdd1.reduceByKey((Integer i1, Integer i2) -> i1+i2);     
    
    
        JavaPairRDD<String,Iterator<Tuple2<String,Integer>>> rdd3 = //contains
                                                                    //["date", [(topic1,13)]]  
                                                                    //["date", [(topic2,12)]]  
    
               rdd2.mapToPair(
    
                    pair -> //["date_topic1",13]
                    {
                        String k  = pair._1; //date_topic1
                        Integer v = pair._2; //13
    
    
                        String[] dateTopicSplits = k.split("_");
    
                        String new_k = dateTopicSplits[0]; //date                    
    
                        List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();
                        list.add(new Tuple2<String,Integer>(dateTopicSplits[1], v)); //[(topic1,13)]
    
                        return new Tuple2<String,Iterator<Tuple2<String,Integer>>>(new_k, list.iterator());
                    }
    
               );
    
        JavaPairRDD<String,Iterator<Tuple2<String,Integer>>> rdd4 = //contains
                                                                    //["date", [(topic1, 13), (topic2, 12)]]
    
                rdd3.reduceByKey(
    
                (Iterator<Tuple2<String,Integer>> itr1, Iterator<Tuple2<String,Integer>> itr2) ->
                {
                   List<Tuple2<String,Integer>> list = new ArrayList<Tuple2<String,Integer>>();
    
                   while(itr1.hasNext())
                         list.add(itr1.next());
    
                   while(itr2.hasNext())
                         list.add(itr2.next());
    
                   return list.iterator();
                }
    
                );
    

    UPD。这个问题实际上可以通过仅使用单个 map 来解决 - 您将行值(即主题字符串)拆分为 ;,因此它为您提供 [key,value] 对作为 [topic,count] 并且您填充哈希图这些对加起来计数。最后,您输出 date 键,其中包含在哈希图中累积的所有不同键及其值。
    这种方式似乎也更有效,因为 hashmap 的大小不会大于原始行的大小,因此 mapper 消耗的内存空间将受到最大行大小的限制,而在 flatmap 解决方案中,内存应该足够大以容纳所有这些扩展行

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2022-01-25
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2016-04-14
      • 1970-01-01
      相关资源
      最近更新 更多