【问题标题】:Using a dynamic generator as input in pyspark在 pyspark 中使用动态生成器作为输入
【发布时间】:2017-07-21 00:58:26
【问题描述】:

我正在尝试使用 pyspark 处理一个非常大的语料库,但是我的输入文件不是结构化的“每行一个文档”,所以我不能简单地使用 sc.textFile 直接加载文件。

相反,我使用生成器函数加载文件,每当遇到停止序列时,yields 就会记录该函数。我可以使用sc.parallelize 包装这个生成器,但是这会导致 pyspark 将我的所有数据一次全部加载到 RAM 中,这是我负担不起的。

有没有办法解决这个问题?或者我肯定需要转换我的文本文件?

这基本上是我想要运行的:

def repaired_corpus(path):
    _buffer = ""
    for line in open(path):
        doc_end = line.find(doc_end_pattern)
        if doc_end != -1:
            _buffer += line[:doc_end + len(doc_end_pattern)]
            yield _buffer
            _buffer = ""
        else:
            _buffer += line

some_state = sc.broadcast(my_state)
in_rdd = spark.sparkContext.parallelize(repaired_corpus(path))
json_docs = in_rdd.map(
    lambda item: process_element(
        item, some_state.value
    )
).saveAsTextFile("processed_corpus.out")

【问题讨论】:

  • 在 spark 2.2 中有一个选项可以读取整个文件(或者您可以在以前的版本中读取整个文本文件),这会满足您的需求吗?
  • 我不这么认为,因为我试图永远不会一次将整个数据集真正读入 RAM。
  • 如果文件太大,那么 HDFS(或在节点之间划分文件的任何东西)不会将其拆分为块大小,然后您将在两个节点之间有一条分割线?

标签: python apache-spark pyspark


【解决方案1】:

虽然有点老,但您可以尝试使用答案here

基本上:

rdd = sc.newAPIHadoopFile(path, "org.apache.hadoop.mapreduce.lib.input.TextInputFormat",
            "org.apache.hadoop.io.LongWritable", "org.apache.hadoop.io.Text",
            conf={"textinputformat.record.delimiter": doc_end_pattern}).map(lambda l:l[1])

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2014-12-27
    • 1970-01-01
    • 2016-02-23
    • 1970-01-01
    • 1970-01-01
    • 2020-07-30
    • 2020-08-11
    • 1970-01-01
    相关资源
    最近更新 更多