【发布时间】:2019-01-19 20:49:10
【问题描述】:
我正在使用 pyspark 处理数据并生成一些指标(大约 25/30)。生成彼此独立的每个指标。由于公司限制,我无法粘贴代码。但是下面提到了我的代码流程
def metric1_job():
some operations
Write data from above df
def metric2_job()
some operations
Write data from above df
def metric3_job()
.
.
.
def metric25_job()
some operations
Write data from above df
if __name__ == "__main__":
Read Df 1
Read Df 2
Read Df 3
Read Df 4
Read Df 5
Some operations on above Df.
metric1_job(df1, df2, df3, df4, df5)
metric1_job(df1, df2, df3, df4, df5)
metric1_job(df1, df2, df3, df4, df5)
.
.
.
metric25_job(df1, df2, df3, df4, df5)
现在 pyspark 在每个函数中写入时停止执行,然后在其他函数中开始处理 DAG。所有这些功能都是 DAG,彼此不依赖。一个明显的解决方案是将然后拆分为单独的文件并作为单独的作业运行。但这对我来说是不可用的。有人可以告诉我如何让 spark 并行运行这些 DAG 并同时并行编写。
非常感谢任何帮助。由于上述工作的串行处理花费了太多时间
提前致谢
马尼什
【问题讨论】:
-
这些不是真正的工作。你在缓存东西吗?并行性是 SPARK 所固有的,如果我对 DF 进行 UNION,我可以在 DAG 输出中清楚地看到并行使用的 4DF。我想你每次都会去采购。请确认。
-
否 我的 dfs 已缓存。请理解我没有做任何工会。我知道火花中存在并行性。在我的情况下,每个函数都是单独的 DAG,它将数据保存到不同的位置。但是当我们将数据保存到 df.write.format("com.databricks.spark.avro").save(output_location) 时它会等待
-
这只是 UNION 的一个例子。有趣的。你是如何开始这份工作的?提供的链接有用吗?我问,因为我更像是一个 SCALA 人。不知道该怎么做,但是如果您显示代码会有所帮助,但这是不可能的。成功
-
不,它没那么有用。使用 scala 我没有遇到这个问题。我面临 pyspark 的问题,考虑到外部因素,这是一个硬性要求。调用这个函数多进程有帮助吗?
-
但是您如何提交作业?我只在 spark-shell 中使用过 pyspark,而不是使用 YARN
标签: apache-spark pyspark