【问题标题】:How to replace groupBy with more efficient method如何用更有效的方法替换 groupBy
【发布时间】:2019-10-27 18:17:03
【问题描述】:

我的任务是使用 Apache Spark 分析肯尼迪航天中心的日志。该代码正在运行,但我想摆脱 groupBy 操作,因为它成本高。

下面的代码收集带有 5xx 错误代码的请求列表并计算失败的请求。

我的代码

SparkSession session = SparkSession.builder().master("local").appName(application_name).getOrCreate();
JavaSparkContext jsc = new JavaSparkContext(session.sparkContext());
JavaRDD<LogEntry> input = jsc.textFile(hdfs_connect + args[0])
                .map(App::log_entry_extractor)
                .filter(Objects::nonNull);

Dataset<Row> dataSet = session.createDataFrame(input, LogEntry.class);

// task 1
dataSet.filter(col("returnCode").between(500, 599))
                .groupBy("request")
                .count()
                .select("request", "count")
//                .sort(desc("count"))
                .coalesce(1)
                .toJavaRDD()
                .saveAsTextFile(hdfs_connect + output_folder_task_1);

数据示例

199.72.81.55 - - [01/Jul/1995:00:00:01 -0400] "GET /history/apollo/ HTTP/1.0" 200 6245
unicomp6.unicomp.net - - [01/Jul/1995:00:00:06 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
199.120.110.21 - - [01/Jul/1995:00:00:09 -0400] "GET /shuttle/missions/sts-73/mission-sts-73.html HTTP/1.0" 200 4085
burger.letters.com - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/countdown/liftoff.html HTTP/1.0" 304 0
199.120.110.21 - - [01/Jul/1995:00:00:11 -0400] "GET /shuttle/missions/sts-73/sts-73-patch-small.gif HTTP/1.0" 200 4179
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /images/NASA-logosmall.gif HTTP/1.0" 304 0
burger.letters.com - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/video/livevideo.gif HTTP/1.0" 200 0
205.212.115.106 - - [01/Jul/1995:00:00:12 -0400] "GET /shuttle/countdown/countdown.html HTTP/1.0" 200 3985
d104.aa.net - - [01/Jul/1995:00:00:13 -0400] "GET /shuttle/countdown/ HTTP/1.0" 200 3985
129.94.144.152 - - [01/Jul/1995:00:00:13 -0400] "GET / HTTP/1.0" 200 7074

【问题讨论】:

  • 您能否详细说明您想要的输出?你期望得到什么?
  • 代码读取RDD,然后转换为DataFrame,再转回RDD;这种转换可能需要一些时间。例如,所有操作都可以使用 RDD:提取键,然后使用“reduceByKey”。
  • 哦,我可怜的碎椒盐卷饼......我们怎么会在这里得到如此可怕的一堆框架和复杂的代码......你真的是在浪费时间和精力。在java中,我会读取行并计算一个变量(如果需要,同时写入另一个文件)。在linux中你会grep。 awk,wc(通过 ssh 发送的一些整洁的 oneliner。)。这种火花疯狂对于你需要的东西来说太复杂了 10000 倍。

标签: java apache-spark hadoop mapreduce


【解决方案1】:

groupBy 在这种情况下没有任何问题 - DataFrame / Dataset groupBy behaviour/optimization - 也没有真正可行的替代方案。

另一方面,coalesce(1) 在大多数情况下是一种反模式,在最坏的情况下可以turn your process into a sequential one

但是,如果您要进行剧烈的合并,例如对于 numPartitions = 1,这可能会导致您的计算发生在比您喜欢的更少的节点上(例如,在 numPartitions = 1 的情况下为一个节点)。为避免这种情况,您可以调用 repartition。这将添加一个 shuffle 步骤,但意味着当前上游分区将并行执行(无论当前分区是什么)。

考虑将其替换为 repartition(1) 或删除任何内容

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-01-23
    • 1970-01-01
    • 2021-04-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-06-14
    • 2021-08-20
    相关资源
    最近更新 更多