【问题标题】:Java Spark count column's element frequenciesJava Spark 计数列的元素频率
【发布时间】:2018-03-23 01:30:43
【问题描述】:

我正在尝试为输入数据集 (csv) 获取 Map<String, Map<String, Long>>,该输入数据集 (csv) 对 Map 的每个元素(数据集的列)都有一个 Map,其中元素存在于各自的列中,并带有它们的出现次数。 所以有一个这样的示例输入:

col1,col2,col3
a,1,c6
ab,23,c6
cd,23,c8
a,1,x

我的输出应该是这样的:

{col1:{a:2, ab:1, cd:1}},
{col2:{1:2, 23:2}},
{col3:{c6:2, c8:1, x:1}}

我有一种方法可以单独获取每一列并使用“countByValue”将元素计数为 Map,然后将每个 Map 作为值存储在列的 Map 中。 现在我正在考虑一种通过读取文件一次来加快计算速度的方法,并且我尝试在我的文件上使用“flatMapToPair”函数:

JavaRDD<String> fileRdd

像这样:

JavaPairRDD<String, String> res = fileRdd.flatMapToPair(
    new PairFlatMapFunction<String, String, String>() { 
        public Iterator<Tuple2<String, String>> call(String x) {
            List<Tuple2<String, String>> res = new ArrayList<>();
            List<String> d =  Arrays.asList(x.split(",(?=(?:[^\"]*\"[^\"]*\")*[^\"]*$)", -1));
            for (int i = 0; i < columns.size(); i++) {
                res.add(new Tuple2<String, String>(columns.get(i), d.get(i)));
            }
            return res.iterator();
        }
});

然后groupingByKey:

JavaPairRDD<String,Iterable<String>> groupMap = res.groupByKey();

现在我有这样的结果:

col1:[a,ab,cd,a]

而且我认为我需要另一个 map reduce 步骤来计算出现次数,所以这可能不是实现目标的最佳方式...

另外我注意到,仅对 200MB 的文件进行第一次 flatMapToPair 计算后,我在处理同一文件的时间比之前的计算时间长后内存不足,所以我可能对 flatMapToPair 做错了。

【问题讨论】:

    标签: java csv apache-spark dataset


    【解决方案1】:

    如果您使用 DataFrame 而不是 RDD,则有一个简单的解决方案。

    //import com.fasterxml.jackson.core.JsonGenerator;
    //import com.fasterxml.jackson.core.JsonParseException;
    //import com.fasterxml.jackson.core.JsonProcessingException;
    //import com.fasterxml.jackson.core.type.TypeReference;
    //import com.fasterxml.jackson.databind.JsonMappingException;
    //import com.fasterxml.jackson.databind.ObjectMapper;
    
    // Read CSV
    Dataset<Row> df = spark.read().csv(fileName);
    // Initialize ObjectMapper
    ObjectMapper mapper = new ObjectMapper();
    mapper.configure(JsonGenerator.Feature.QUOTE_FIELD_NAMES, false);
    
    // Map for collecting column information
    Map<String, Map<String,Long>> columnCountMap = new HashMap<String, Map<String,Long>>();
    for (String columnName : df.columns())
        {
            // Group and count using groupBy function 
            // and then convert to JSON and collect as List
            List<String> jsons = df.groupBy(columnName).count().toJSON().collectAsList();
            try
                {
                    Map<String,Long> countMap = new HashMap<String, Long>();
                    // Iterate through the strings/rows; 
                    // map it to Map then collect values;
                    // put them into the countMap
                    for (String json : jsons)
                        {
                            Map<String, Object> map = mapper.readValue(json, new TypeReference<Map<String, String>>(){});
                            String[] keyValues = map.values().toArray(new String[map.values().size()]);
                            countMap.put(keyValues[0], Long.parseLong(keyValues[1]));
    
                        }
                    columnCountMap.put(columnName, countMap);
                }
            catch (JsonParseException e)
                {
                    e.printStackTrace();
                }
            catch (JsonMappingException e)
                {
                    e.printStackTrace();
                }
            catch (IOException e)
                {
                    e.printStackTrace();
                }
        }
    String output = "":
    try
    {
            // If you need to output as {col1:{a:2, ab:1, cd:1}},
            // {col2:{1:2, 23:2}},
            // {col3:{c6:2, c8:1, x:1}}
            output = mapper.writeValueAsString(columnCountMap);
    }
    catch (JsonProcessingException e)
    {
           e.printStackTrace();
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2021-08-19
      • 2021-07-28
      • 1970-01-01
      • 2017-03-01
      • 1970-01-01
      • 1970-01-01
      • 2021-12-04
      • 2011-01-10
      相关资源
      最近更新 更多