【问题标题】:PySpark + Google Cloud Storage (wholeTextFiles)PySpark + 谷歌云存储(wholeTextFiles)
【发布时间】:2016-07-12 07:45:25
【问题描述】:

我正在尝试使用 PySpark (Google Dataproc) 解析大约 100 万个 HTML 文件,并将相关字段写入压缩文件。每个 HTML 文件大约 200KB。因此,所有数据约为 200GB。

如果我使用数据的子集,下面的代码可以正常工作,但运行几个小时,然后在整个数据集上运行时崩溃。此外,工作节点未使用(

我相信系统会因从 GCS 中提取数据而窒息。有一个更好的方法吗?另外,当我以这种方式使用 wholeTextFiles 时,主程序是否会尝试下载所有文件然后将它们发送给执行程序,还是让执行程序下载它们?

def my_func(keyval):
   keyval = (file_name, file_str)
   return parser(file_str).__dict__

data = sc.wholeTextFiles("gs://data/*")
output = data.map(my_func)
output.saveAsTextFile("gs://results/a")

【问题讨论】:

  • 任何错误消息、堆栈跟踪等都会有所帮助。 master 不会读取所有包含的数据,但它会在开始工作之前获取所有输入文件的状态。默认情况下,Dataproc 将属性“mapreduce.input.fileinputformat.list-status.num-threads”设置为 20,以帮助缩短此查找的时间,但在 GCS 中仍会为每个文件执行 RPC。进一步改进查找的一种方法是通过 spark 执行某些查找逻辑,方法是创建一个包含文件前缀的 RDD,使用 flatMap 将这些前缀转换为文件名,然后将文件名映射到文件内容。
  • 好的。假设我按照您的建议创建了一个文件名的 RDD。我应该如何将此文件名映射到文件内容?我不能在执行程序中调用 sc.wholeTextFile。我可以在执行程序中使用 boto API 来下载文件。我试过这个,但它更慢。我怀疑 boto API 在每个请求上都有很多身份验证开销。

标签: google-cloud-storage google-compute-engine pyspark google-cloud-dataproc


【解决方案1】:

为了回答您的问题,master 不会读取所有包含的数据,但会在开始工作之前获取所有输入文件的状态。默认情况下,Dataproc 将属性“mapreduce.input.fileinputformat.list-status.num-threads”设置为 20,以帮助缩短此查找的时间,但在 GCS 中仍会为每个文件执行 RPC。

您似乎发现了一个案例,即使添加线程也无济于事,只是让驱动程序更快地进入 OOM。

关于如何并行化读取,我有两个想法。

但首先,有一点警告:这些解决方案都不是非常健壮的,因为它们对包含在 glob 中的目录非常健壮。您可能希望防止目录出现在要读取的文件列表中。

第一个是使用 python 和 hadoop 命令行工具完成的(这也可以使用 gsutil 完成)。下面是一个例子,它可能看起来如何并在工作人员上执行文件列表,将文件内容成对读取并最终计算(文件名,文件长度)对:

from __future__ import print_function

from pyspark.rdd import RDD
from pyspark import SparkContext

import sys
import subprocess


def hadoop_ls(file_glob):
  lines = subprocess.check_output(["/usr/bin/hadoop", "fs", "-ls", file_glob]).split("\n")
  files = [line.split()[7] for line in lines if len(line) > 0]
  return files

def hadoop_cat(file):
  return subprocess.check_output(["/usr/bin/hadoop", "fs", "-cat", file]).decode("utf-8")

if __name__ == "__main__":
  if len(sys.argv) < 2:
    print("Provide a list of path globs to read.")
    exit(-1)

  sc = SparkContext()
  # This is just for testing. You'll want to generate a list 
  # of prefix globs instead of having a list passed in from the 
  # command line.
  globs = sys.argv[1:]
  # Desired listing partition count
  lpc = 100
  # Desired 'cat' partition count, should be less than total number of files
  cpc = 1000
  files = sc.parallelize(globs).repartition(lpc).flatMap(hadoop_ls)
  files_and_content = files.repartition(cpc).map(lambda f: [f, hadoop_cat(f)])
  files_and_char_count = files_and_content.map(lambda p: [p[0], len(p[1])])
  local = files_and_char_count.collect()
  for pair in local:
    print("File {} had {} chars".format(pair[0], pair[1]))

我会首先从这个子流程解决方案开始,并尝试对 hadoop_ls 和 hadoop_cat 调用进行分区,看看你是否能得到一些可以接受的东西。

第二种解决方案更复杂,但可能会通过避免很多很多 exec 调用来产生一个性能更高的管道。

在第二个解决方案中,我们将编译一个特殊用途的帮助程序 jar,使用初始化操作将该 jar 复制到所有工作人员,最后使用我们驱动程序中的帮助程序。

我们的 scala jar 项目的最终目录结构如下所示:

helper/src/main/scala/com/google/cloud/dataproc/support/PysparkHelper.scala
helper/build.sbt

在我们的 PysparkHelper.scala 文件中,我们将有一个小的 scala 类,其功能与我们上面的纯 Python 解决方案一样。首先,我们将创建一个文件 glob 的 RDD,然后是文件名的 RDD,最后是文件名和文件内容对的 RDD。

package com.google.cloud.dataproc.support

import collection.JavaConversions._

import org.apache.commons.io.IOUtils
import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.apache.spark.api.java.{JavaPairRDD, JavaSparkContext}

import java.util.ArrayList
import java.nio.charset.StandardCharsets

class PysparkHelper extends Serializable {
  def wholeTextFiles(
    context: JavaSparkContext,
    paths: ArrayList[String],
    partitions: Int): JavaPairRDD[String, String] = {

    val globRDD = context.sc.parallelize(paths).repartition(partitions)
    // map globs to file names:
    val filenameRDD = globRDD.flatMap(glob => {
      val path = new Path(glob)
      val fs: FileSystem = path.getFileSystem(new Configuration)
      val statuses = fs.globStatus(path)
      statuses.map(s => s.getPath.toString)
    })
    // Map file name to (name, content) pairs:
    // TODO: Consider adding a second parititon count parameter to repartition before
    // the below map.
    val fileNameContentRDD = filenameRDD.map(f => {
      Pair(f, readPath(f, new Configuration))
    })

    new JavaPairRDD(fileNameContentRDD)
  }

  def readPath(file: String, conf: Configuration) = {
    val path = new Path(file)
    val fs: FileSystem = path.getFileSystem(conf)
    val stream = fs.open(path)
    try {
      IOUtils.toString(stream, StandardCharsets.UTF_8)
    } finally {
      stream.close()
    }
  }
}

helper/build.sbt 文件看起来像这样:

organization := "com.google.cloud.dataproc.support"
name := "pyspark_support"
version := "0.1"
scalaVersion := "2.10.5"
libraryDependencies +=  "org.apache.spark" % "spark-core_2.10" % "1.6.0" % "provided"
libraryDependencies +=  "org.apache.hadoop" % "hadoop-common" % "2.7.1" % "provided"
exportJars := true

然后我们可以使用 sbt 构建帮助器:

$ cd helper && sbt package

输出帮助 jar 应该是 target/scala-2.10/pyspark_support_2.10-0.1.jar

我们现在需要将这个 jar 放到我们的集群中,为此,我们需要做两件事:1)将 jar 上传到 GCS 和 2)在 GCS 中创建一个初始化操作以将 jar 复制到集群节点。

为了便于说明,我们假设您的存储桶名为 MY_BUCKET(在此处插入适当的海象相关 meme)。

$ gsutil cp target/scala-2.10/pyspark_support_2.10-0.1.jar gs://MY_BUCKET/pyspark_support.jar

创建一个初始化操作(我们称之为 pyspark_init_action.sh,根据需要替换 MY_BUCKET):

#!/bin/bash

gsutil cp gs://MY_BUCKET/pyspark_support.jar /usr/lib/hadoop/lib/

最后将初始化动作上传到GCS:

$ gsutil cp pyspark_init_action.sh gs://MY_BUCKET/pyspark_init_action.sh

现在可以通过将以下标志传递给 gcloud 来启动集群:

--initialization-actions gs://MY_BUCKET/pyspark_init_action.sh

在构建、上传和安装我们的新库之后,我们终于可以从 pyspark 中使用它了:

from __future__ import print_function

from pyspark.rdd import RDD
from pyspark import SparkContext
from pyspark.serializers import PairDeserializer, UTF8Deserializer

import sys

class DataprocUtils(object):

  @staticmethod
  def wholeTextFiles(sc, glob_list, partitions):
    """
    Read whole text file content from GCS.
    :param sc: Spark context
    :param glob_list: List of globs, each glob should be a prefix for part of the dataset.
    :param partitions: number of partitions to use when creating the RDD
    :return: RDD of filename, filecontent pairs.
    """
    helper = sc._jvm.com.google.cloud.dataproc.support.PysparkHelper()
    return RDD(helper.wholeTextFiles(sc._jsc, glob_list, partitions), sc,
               PairDeserializer(UTF8Deserializer(), UTF8Deserializer()))

if __name__ == "__main__":
  if len(sys.argv) < 2:
    print("Provide a list of path globs to read.")
    exit(-1)

  sc = SparkContext()
  globs = sys.argv[1:]
  partitions = 10
  files_and_content = DataprocUtils.wholeTextFiles(sc, globs, partitions)
  files_and_char_count = files_and_content.map(lambda p: (p[0], len(p[1])))
  local = files_and_char_count.collect()
  for pair in local:
    print("File {} had {} chars".format(pair[0], pair[1]))

【讨论】:

    【解决方案2】:

    谢谢!我尝试了第一种方法。它可以工作,但由于 exec 调用和 RPC/auth 开销,性能不是很好。在 32 个节点的集群上运行大约需要 10 个小时。我能够在 4 节点集群上使用带有 Amazon s3 连接器的数据块在 30 分钟内运行它。那里的开销似乎要少得多。我希望 Google 能够提供一种更好的方法来将数据从 GCS 提取到 Spark。

    【讨论】:

    • 我想多看看这个; 10 小时似乎太高了——你能分享你的文件/目录的布局吗(例如,是单个目录中有 100 万个对象,还是 1000 个目录,每个目录有 1000 个对象,等等)。另外 - 你能分享一下你 glob 的一般形式吗 - (例如,/bucket/*/dir/* 或其他东西)?
    猜你喜欢
    • 2020-09-10
    • 2016-10-12
    • 1970-01-01
    • 1970-01-01
    • 2012-08-19
    • 2017-01-11
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多