【发布时间】:2021-07-22 14:30:01
【问题描述】:
我有一个 bitnami spark docker 基础设施(一个主人和一个工人)。
Spark 读取 Kafka 流。
stream_df = spark.readStream.format("kafka")\
.option("kafka.bootstrap.servers", "kafka1:19091")\
.option("subscribe", "Aleca")\
.option("startingOffsets", "earliest")\
.load()
使用 select、filter 修改 stream_df .....
并将stream_df写入parquet文件。
df_edge.writeStream\
.format("parquet")\
.option("checkpointLocation", "/tmp/edge/check")\
.option("path", "/tmp/edge/data")\
.trigger(processingTime='10 seconds')\
.start()\
.awaitTermination()
我使用 Kafka 发送数据,当检查目录“/tmp/edge/data”时,我只有一个目录 _spark_metadata。
在这个目录中,我有带有快速文件路径的 json 文件。但 snappy 并没有被创建。
从另一个 docker 容器,我尝试读取 parquet 文件。
spark = SparkSession.builder\
.appName('Flask_gunicorn') \
.master('spark://0.0.0.0:7077') \
.config('spark.jars.packages', 'graphframes:graphframes:0.8.1-spark3.0-s_2.12') \
.config('spark.submit.deployMode', 'client') \
.config('spark.executor.memory', '1g') \
.config('spark.cores.max', '1') \
.config('spark.jars.ivy', '/opt/bitnami/spark/ivy') \
.config('spark.jars', '/opt/bitnami/spark/jars') \
.getOrCreate()
edge_df = spark.read.csv(edge_location)
读取返回错误:
Traceback (most recent call last):
File "/usr/src/app/apao-flask-gunicorn/graph_generator.py", line 22, in <module>
vertex_df = spark.read.parquet(edge_location)
File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 353, in parquet
File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134, in deco
File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: Path does not exist: file:/tmp/edge/parquet;
如果我尝试使用 spark shell 读取: 我有一个错误文件.....snappy.parquet 不存在。
但是,如果提交一个在 spark 容器上读取 parquet 文件的应用程序,我可以访问数据 ....
我尝试过使用 csv 文件,我有类似的错误。
当一个 put 流时,有什么办法拥有 snappy 文件?
谢谢 塞巴斯蒂安
【问题讨论】:
-
"来自不同的 docker 容器"... 默认情况下,容器不共享卷。其次,您正在写入 /tmp/edge/data,而不是 /tmp/edge/parquet
-
您好,感谢您的回复。我做了很多尝试并复制粘贴,我复制粘贴了 2 次不同的尝试,路径不连贯(路径不是根本原因)。我尝试共享卷,但这些卷是以 root 身份创建的。我的应用程序无权将镶木地板文件放入创建的卷中。有没有一种简单的方法来共享非根卷?谢谢
-
我不知道您是否可以将卷挂载为不同的用户,但挂载应该可以由任何用户写入。如果您可以显示您的 docker 命令或撰写文件,那将会很有用。不过,总的来说,我建议使用单独的 MinIO 或 HDFS 容器,而不是读取/写入本地容器文件系统
-
再来一次。我在 spark master 上安装了一个卷,以便与我的烧瓶容器共享元数据。这部分工作,现在烧瓶可以看到目录。但是通过这种修改,worker 中的 parquets 文件丢失了....我不知道为什么在 master 上挂载一个卷会破坏 worker?
-
我不太确定您为什么需要共享数据,但就像我说的,您确实应该使用共享的 networked 文件系统,例如我的两个选项列出...如果您希望数据返回烧瓶容器,您需要在 Spark 代码中使用
collect()并将其作为驱动程序(这会导致网络服务器不必要的负载,所以总体上不是推荐)
标签: python docker pyspark apache-kafka parquet