【问题标题】:How to process Map<Key,value > in spark?如何在 Spark 中处理 Map<Key,value>?
【发布时间】:2017-09-16 14:32:17
【问题描述】:

我是 Spark 编程的新手,我试图找出一个字符串在文件中针对某个键出现的次数。 这是我的输入:

-------------
2017-04-13 15:56:57.147::ProductSelectPanel::1291::PRODUCT_SALE_ENTRY::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::5::0::
2017-04-13 15:57:01.008::ProductSelectPanel::1599::PRODUCT_SALE_WITH_BARCODE::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::4::1::1013065197
2017-04-13 15:57:09.182::ProductSelectPanel::1118::ALTERNATIVE_PRODUCT_ENTRY::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0
2017-04-13 15:57:15.153::ProductSelectPanel::1121::NO_STOCK_PRODUCT::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0::0::
2017-04-13 15:57:19.696::ProductSelectPanel::1118::ALTERNATIVE_PRODUCT_ENTRY::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0
2017-04-13 15:57:23.190::ProductSelectPanel::1291::PRODUCT_SALE_ENTRY::INAPHYD00124::1::CALP0005::CALPOL 500MG TAB::110::0::
2017-04-13 15:56:57.147::ProductSelectPanel::1291::PRODUCT_SALE_ENTRY::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::5::0::
2017-04-13 15:57:01.008::ProductSelectPanel::1599::PRODUCT_SALE_WITH_BARCODE::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::4::1::1013065197
2017-04-13 15:57:09.182::ProductSelectPanel::1118::ALTERNATIVE_PRODUCT_ENTRY::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0
2017-04-13 15:57:15.153::ProductSelectPanel::1121::NO_STOCK_PRODUCT::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0::0::
2017-04-13 15:57:19.696::ProductSelectPanel::1118::ALTERNATIVE_PRODUCT_ENTRY::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0
2017-04-13 15:57:23.190::ProductSelectPanel::1291::PRODUCT_SALE_ENTRY::INAPHYD00124::1::CALP0005::CALPOL 500MG TAB::110::0::
2017-04-13 15:56:57.147::ProductSelectPanel::1291::PRODUCT_SALE_ENTRY::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::5::0::
2017-04-13 15:57:01.008::ProductSelectPanel::1599::PRODUCT_SALE_WITH_BARCODE::INAPHYD00124::1::CROC0008::CROCIN 120MG 60ML SYP::4::1::1013065197
2017-04-13 15:57:09.182::ProductSelectPanel::1118::ALTERNATIVE_PRODUCT_ENTRY::INAPHYD00124::1::CROC0005::CROCIN 500MG TAB::0
.......

我的 Spark 程序是这样的。

final Function<String, List<String>> LINE_MAPPER=new Function<String, List<String>>() {

            @Override
            public List<String> call(String line) throws Exception {
                String[] lineArary=line.split("::");
                return Arrays.asList(lineArary[3],lineArary[6]);
            }
        };
        final PairFunction<String, String, Integer> word_paper=new PairFunction<String, String, Integer>() {

            @Override
            public Tuple2<String, Integer> call(String word) throws Exception {

                return new Tuple2<String, Integer>(word, Integer.valueOf(1));
            }
        };
        JavaRDD<List<String>> javaRDD =lineRDD.map(LINE_MAPPER);


After doing map transformation  i am getting like this:

[[PRODUCT_SALE_ENTRY,CROC0008],[NO_STOCK_PRODUCT,CROC0005],[NO_STOCK_PRODUCT,CROC0005],[PRODUCT_SALE_WITH_BARCODE,CROC0008],[PRODUCT_SALE_WITH_BARCODE,CROC0005],[PRODUCT_SALE_WITH_BARCODE,CROC003],....]

but i want the result like..
[[NO_STOCK_PRODUCT,[CROC0005,4]],[PRODUCT_SALE_WITH_BARCODE,[CROC0008,2]],[PRODUCT_SALE_WITH_BARCODE,[CROC0005,1]],....]

请帮助我。 提前致谢。

【问题讨论】:

    标签: hadoop dictionary apache-spark bigdata


    【解决方案1】:

    看起来您需要将每个键+字符串对视为一个组合键,并计算该组合键的出现次数。

    您可以使用countByValue() 执行类似的操作(请参阅JavaDoc)。但是,正如文档所述:

    请注意,只有在生成的地图是 预计很小,因为整个东西都加载到驱动程序的 记忆。要处理非常大的结果,请考虑使用 rdd.map(x => (x, 1L)).reduceByKey(_ + _)...

    因此,只需将map 您的每个值(例如[PRODUCT_SALE_ENTRY,CROC0008] 转换为一对形式((PRODUCT_SALE_ENTRY,CROC0008), 1L),然后reduceByKey()(例如here)。

    我只在 Scala 中做过这个,而不是 Java - 我认为你可能需要使用 mapToPair() 例如如图here。这将给出以下形式的 RDD:

    ((NO_STOCK_PRODUCT,CROC0005), 4),
    ((PRODUCT_SALE_WITH_BARCODE,CROC0008), 2),
    ((PRODUCT_SALE_WITH_BARCODE,CROC0005), 1),
    ...
    

    这与您的要求接近。

    【讨论】:

    • maptoPair 之后我得到的输出为:
    • maptoPair 后我得到的输出为:[(ALTERNATIVE_PRODUCT_ENTRY,[CROC0005]), (NO_STOCK_PRODUCT,[CROC0005]), (PRODUCT_SALE_ENTRY,[CROC0008]), (PRODUCT_SALE_WITH_BARCODE,[CROC0008])] 我的代码:JavaPairRDD> javapRDD=lineRDD.mapToPair(obj -> { String[] split = obj.split("::"); return new Tuple2(split[3], Arrays.asList(new字符串[]{split[6]})); });
    • 怎么弄到这个
    • 我怎样才能得到这个:((NO_STOCK_PRODUCT,CROC0005), 4), ((PRODUCT_SALE_WITH_BARCODE,CROC0008), 2), ((PRODUCT_SALE_WITH_BARCODE,CROC0005), 1), ...
    【解决方案2】:
    Thank you DNA, Its works great.
    finally my code like that:
    
    JavaPairRDD<String, String> keyValuePairs = lineRDD.mapToPair(obj -> {
                String[] split = obj.split("::");
                return new Tuple2<String, String>(split[3],split[6]);
            });
    
             JavaPairRDD<Tuple2<String, String>, Integer> newRFDD=keyValuePairs.mapToPair(obj -> {
                return new Tuple2<Tuple2<String, String>, Integer>(new Tuple2<>(obj._1, obj._2),1);
            });
             JavaPairRDD<Tuple2<String, String>, Integer> result = newRFDD.reduceByKey((v1, v2) -> {
                    return v1+v2;
                });
             result.map(f->{ return f._1._2()+"\t"+f._2()+"\t"+f._1._1(); }).saveAsTextFile("file:///home/charan/offlinefiles/result");
             System.out.println("result :"+result.take(10));
    
    and output would be:
    
    CROC0005    620 NO_STOCK_PRODUCT
    CROC2107    15  PRODUCT_SALE_ENTRY
    CROC2120    7   NO_STOCK_PRODUCT
    CROC0229    2   NO_STOCK_PRODUCT
    CROC0009    1   NO_STOCK_PRODUCT
    CROC0005    1250    ALTERNATIVE_PRODUCT_ENTRY
    CROC2302    2   ALTERNATIVE_PRODUCT_ENTRY
    CROC2807    5   PRODUCT_SALE_ENTRY
    CROC0213    2   ALTERNATIVE_PRODUCT_ENTRY
    CROC20221 18    ALTERNATIVE_PRODUCT_ENTRY.
    

    【讨论】:

      猜你喜欢
      • 2016-07-14
      • 1970-01-01
      • 1970-01-01
      • 2015-03-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2021-03-17
      • 2011-12-14
      相关资源
      最近更新 更多