【问题标题】:Spark: Load multiple files, analyze individually, merge results, and saveSpark:加载多个文件,单独分析,合并结果,保存
【发布时间】:2019-04-14 16:25:02
【问题描述】:

我是 Spark 的新手,不知道如何问这个问题(使用哪些术语等),所以这是我在概念上试图完成的图片:

我有很多小的、单独的 .txt“分类帐”文件(例如,当时带有时间戳和属性值的行分隔文件)。

我想:

  1. 将每个“分类帐”文件读入单独的数据帧(读取:不合并为一个,大数据帧);

  2. 对每个单独的数据框执行一些基本计算,从而产生一行新的数据值;然后

  3. 将所有单独的结果行合并到最终对象中,并以行分隔文件的形式将其保存到磁盘。

似乎我找到的几乎每个答案(在谷歌搜索相关术语时)都是关于将多个文件加载到单个 RDD 或 DataFrame 中,但我确实找到了这个 Scala 代码:

val data = sc.wholeTextFiles("HDFS_PATH")
val files = data.map { case (filename, content) => filename}
def doSomething(file: String) = { 
println (file);

 // your logic of processing a single file comes here

 val logData = sc.textFile(file);
 val numAs = logData.filter(line => line.contains("a")).count();
 println("Lines with a: %s".format(numAs));

 // save rdd of single file processed data to hdfs comes here
}

files.collect.foreach( filename => {
    doSomething(filename)
})

...但是:

A.我不知道这是否会并行读取/分析操作,并且

B.我不认为它可以将结果合并到一个对象中。

非常感谢任何方向或建议!

更新

似乎我正在尝试做的事情(在多个文件上并行运行脚本,然后合并结果)可能需要thread pools (?) 之类的东西。

为了清楚起见,下面是我想对通过读取“分类帐”文件创建的 DataFrame 执行的计算示例:

from dateutil.relativedelta import relativedelta
from datetime import datetime
from pyspark.sql.functions import to_timestamp

# Read "ledger file"
df = spark.read.json("/path/to/ledger-filename.txt")

# Convert string ==> timestamp & sort
df = (df.withColumn("timestamp", to_timestamp(df.timestamp, 'yyyy-MM-dd HH:mm:ss'))).sort('timestamp')

columns_with_age = ("location", "status")
columns_without_age = ("wh_id")

# Get the most-recent values (from the last row of the df)
row_count = df.count()
last_row = df.collect()[row_count-1]

# Create an empty "final row" dictionary
final_row = {}

# For each column for which we want to calculate an age value ...
for c in columns_with_age:

    # Initialize loop values
    target_value = last_row.__getitem__(c)
    final_row[c] = target_value
    timestamp_at_lookback = last_row.__getitem__("timestamp")
    look_back = 1
    different = False

    while not different:
        previous_row = df.collect()[row_count - 1 - look_back]
        if previous_row.__getitem__(c) == target_value:
            timestamp_at_lookback = previous_row.__getitem__("timestamp")
            look_back += 1

        else:
            different = True

    # At this point, a difference has been found, so calculate the age
    final_row["days_in_{}".format(c)] = relativedelta(datetime.now(), timestamp_at_lookback).days

因此,这样的账本:

+---------+------+-------------------+-----+
| location|status|          timestamp|wh_id|
+---------+------+-------------------+-----+
|  PUTAWAY|     I|2019-04-01 03:14:00|   20|
|PICKABLE1|     X|2019-04-01 04:24:00|   20|
|PICKABLE2|     X|2019-04-01 05:33:00|   20|
|PICKABLE2|     A|2019-04-01 06:42:00|   20|
|  HOTPICK|     A|2019-04-10 05:51:00|   20|
| ICEXCEPT|     A|2019-04-10 07:04:00|   20|
| ICEXCEPT|     X|2019-04-11 09:28:00|   20|
+---------+------+-------------------+-----+

将减少到(假设计算在 2019-04-14 运行):

{ '_id': 'ledger-filename', 'location': 'ICEXCEPT', 'days_in_location': 4, 'status': 'X', 'days_in_status': 3, 'wh_id': 20 }

【问题讨论】:

  • 您的文件的典型大小是多少?我认为使用 spark 单独分析您的文件可能不是一个好主意,因为您需要几个 collect(这会大大降低 spark 的性能)。但是您可以在单个内核上与 ThreadPool 并行分析它们,然后将它们按顺序合并到 spark 数据帧中,这适合您吗? :)
  • 一位数和两位数千字节。我认为您可能是对的……也许只是 PyArrow 和 Pandas 用于从 HDFS 摄取、初始处理和输出到 HDFS。然后使用 Spark 摄取和处理更大的合并文件。
  • 是的,我真的认为这是最好的方法。如果只是 kb 的问题,您将通过初始化 spark 会话来浪费大量时间,我认为那里只需要 hdfs api
  • 如果他们有同样的问题,你绝对应该让这个对话让其他人看到。

标签: python apache-spark pyspark hdfs


【解决方案1】:

不推荐使用wholeTextFiles,因为它会将整个文件一次加载到内存中。如果您真的想为每个文件创建一个单独的数据框,您可以简单地使用完整路径而不是目录。但是,不建议这样做,并且很可能会导致资源利用率低下。相反,请考虑使用input_file_path https://spark.apache.org/docs/2.4.0/api/java/org/apache/spark/sql/functions.html#input_file_name--

例如:

spark
.read
  .textFile("path/to/files")
  .withColumn("file", input_file_name())
  .filter($"value" like "%a%")
  .groupBy($"file")
  .agg(count($"value"))
  .show(10, false)
+----------------------------+------------+
|file                        |count(value)|
+----------------------------+------------+
|path/to/files/1.txt         |2           |
|path/to/files/2.txt         |4           |
+----------------------------+------------+

因此可以单独处理文件,然后再合并。

【讨论】:

    【解决方案2】:

    您可以在 hdfs 中获取文件路径

    import  org.apache.hadoop.fs.{FileSystem,Path}
    
    val files=FileSystem.get( sc.hadoopConfiguration ).listStatus( new Path(your_path)).map( x => x.getPath ).map(x=> "hdfs://"+x.toUri().getRawPath())
    

    为每条路径创建一个唯一的数据框

    val arr_df= files.map(spark.read.format("csv").option("delimeter", ",").option("header", true).load(_))
    

    在联合到一个数据帧之前应用您的过滤器或任何转换

    val df= arr_df.map(x=> x.where(your_filter)).reduce(_ union _)
    

    【讨论】:

      猜你喜欢
      • 2018-03-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-11-05
      • 1970-01-01
      相关资源
      最近更新 更多