【发布时间】:2017-07-21 00:58:26
【问题描述】:
我正在尝试使用 pyspark 处理一个非常大的语料库,但是我的输入文件不是结构化的“每行一个文档”,所以我不能简单地使用 sc.textFile 直接加载文件。
相反,我使用生成器函数加载文件,每当遇到停止序列时,yields 就会记录该函数。我可以使用sc.parallelize 包装这个生成器,但是这会导致 pyspark 将我的所有数据一次全部加载到 RAM 中,这是我负担不起的。
有没有办法解决这个问题?或者我肯定需要转换我的文本文件?
这基本上是我想要运行的:
def repaired_corpus(path):
_buffer = ""
for line in open(path):
doc_end = line.find(doc_end_pattern)
if doc_end != -1:
_buffer += line[:doc_end + len(doc_end_pattern)]
yield _buffer
_buffer = ""
else:
_buffer += line
some_state = sc.broadcast(my_state)
in_rdd = spark.sparkContext.parallelize(repaired_corpus(path))
json_docs = in_rdd.map(
lambda item: process_element(
item, some_state.value
)
).saveAsTextFile("processed_corpus.out")
【问题讨论】:
-
在 spark 2.2 中有一个选项可以读取整个文件(或者您可以在以前的版本中读取整个文本文件),这会满足您的需求吗?
-
我不这么认为,因为我试图永远不会一次将整个数据集真正读入 RAM。
-
如果文件太大,那么 HDFS(或在节点之间划分文件的任何东西)不会将其拆分为块大小,然后您将在两个节点之间有一条分割线?
标签: python apache-spark pyspark