【问题标题】:pySpark: Put a Kafka stream into parquet and read parquet from remote sessionpySpark:将 Kafka 流放入 parquet 并从远程会话中读取 parquet
【发布时间】:2021-07-22 14:30:01
【问题描述】:

我有一个 bitnami spark docker 基础设施(一个主人和一个工人)。

Spark 读取 Kafka 流。

stream_df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers", "kafka1:19091")\
    .option("subscribe", "Aleca")\
    .option("startingOffsets", "earliest")\
    .load()

使用 select、filter 修改 stream_df .....

并将stream_df写入parquet文件。

   df_edge.writeStream\
        .format("parquet")\
        .option("checkpointLocation", "/tmp/edge/check")\
        .option("path", "/tmp/edge/data")\
        .trigger(processingTime='10 seconds')\
        .start()\
        .awaitTermination()

我使用 Kafka 发送数据,当检查目录“/tmp/edge/data”时,我只有一个目录 _spark_metadata。

在这个目录中,我有带有快速文件路径的 json 文件。但 snappy 并没有被创建。

从另一个 docker 容器,我尝试读取 parquet 文件。

spark = SparkSession.builder\
    .appName('Flask_gunicorn') \
    .master('spark://0.0.0.0:7077') \
    .config('spark.jars.packages', 'graphframes:graphframes:0.8.1-spark3.0-s_2.12') \
    .config('spark.submit.deployMode', 'client') \
    .config('spark.executor.memory', '1g') \
    .config('spark.cores.max', '1') \
    .config('spark.jars.ivy', '/opt/bitnami/spark/ivy') \
    .config('spark.jars', '/opt/bitnami/spark/jars') \
    .getOrCreate()

edge_df = spark.read.csv(edge_location)

读取返回错误:

Traceback (most recent call last):
  File "/usr/src/app/apao-flask-gunicorn/graph_generator.py", line 22, in <module>
    vertex_df = spark.read.parquet(edge_location)
  File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/readwriter.py", line 353, in parquet
  File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
  File "/usr/local/lib/python3.9/site-packages/pyspark/python/lib/pyspark.zip/pyspark/sql/utils.py", line 134, in deco
  File "<string>", line 3, in raise_from
pyspark.sql.utils.AnalysisException: Path does not exist: file:/tmp/edge/parquet;

如果我尝试使用 spark shell 读取: 我有一个错误文件.....snappy.parquet 不存在。

但是,如果提交一个在 spark 容器上读取 parquet 文件的应用程序,我可以访问数据 ....

我尝试过使用 csv 文件,我有类似的错误。

当一个 put 流时,有什么办法拥有 snappy 文件?

谢谢 塞巴斯蒂安

【问题讨论】:

  • "来自不同的 docker 容器"... 默认情况下,容器不共享卷。其次,您正在写入 /tmp/edge/data,而不是 /tmp/edge/parquet
  • 您好,感谢您的回复。我做了很多尝试并复制粘贴,我复制粘贴了 2 次不同的尝试,路径不连贯(路径不是根本原因)。我尝试共享卷,但这些卷是以 root 身份创建的。我的应用程序无权将镶木地板文件放入创建的卷中。有没有一种简单的方法来共享非根卷?谢谢
  • 我不知道您是否可以将卷挂载为不同的用户,但挂载应该可以由任何用户写入。如果您可以显示您的 docker 命令或撰写文件,那将会很有用。不过,总的来说,我建议使用单独的 MinIO 或 HDFS 容器,而不是读取/写入本地容器文件系统
  • 再来一次。我在 spark master 上安装了一个卷,以便与我的烧瓶容器共享元数据。这部分工作,现在烧瓶可以看到目录。但是通过这种修改,worker 中的 parquets 文件丢失了....我不知道为什么在 master 上挂载一个卷会破坏 worker?
  • 我不太确定您为什么需要共享数据,但就像我说的,您确实应该使用共享的 networked 文件系统,例如我的两个选项列出...如果您希望数据返回烧瓶容器,您需要在 Spark 代码中使用 collect() 并将其作为驱动程序(这会导致网络服务器不必要的负载,所以总体上不是推荐)

标签: python docker pyspark apache-kafka parquet


【解决方案1】:

最新消息

我的基础设施:

Container Flask(无数据)-> Container Spark master(json 元数据)-> Container Spark worker(parquet 文件)

当在 Flask 容器上启动我的 python 脚本(在 Spark 主机上使用远程会话)时,元数据的读取是在 Flask 容器上本地完成的,而不是在 Spark 主机上。

如果我从烧瓶容器调用 spark-submit 来掌握 spark(带有本地会话),我有相同的 pb。

如果我从 master spark 容器调用 spark-submit 到 master spark(带有本地会话),它工作正常,但我更喜欢单独的 docker 文件。

感谢@OneCricketeer 的解释。

docker-compose.yml:

version: '2'

networks:
  default:
    driver: bridge
    name: aleca_network

services:
  spark:
    build: .
    environment:
      - SPARK_MODE=master
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_LOCAL_DIRS=/tmp
    ports:
      - '8080:8080'
      - '7077:7077'
    volumes:
      - ./master-ivy-jars:/opt/bitnami/spark/ivy:z
      #- ./vertex_data:/opt/bitnami/spark/data/vertex
      #- ./edge_data:/opt/bitnami/spark/data/edge

  spark-worker-1:
    build: .
    environment:
      - SPARK_MODE=worker
      - SPARK_MASTER_URL=spark://spark:7077
      - SPARK_WORKER_INSTANCES=2
      - SPARK_WORKER_MEMORY=4G
      - SPARK_WORKER_CORES=4
      - SPARK_RPC_AUTHENTICATION_ENABLED=no
      - SPARK_RPC_ENCRYPTION_ENABLED=no
      - SPARK_LOCAL_STORAGE_ENCRYPTION_ENABLED=no
      - SPARK_SSL_ENABLED=no
      - SPARK_LOCAL_DIRS=/tmp
      - SPARK_WORKER_PORT=7078
    volumes:
      - ./worker1-ivy-jars:/opt/bitnami/spark/ivy:z
      #- ./vertex_data:/opt/bitnami/spark/data/vertex
      #- ./edge_data:/opt/bitnami/spark/data/edge

Dockerfile:

FROM docker.io/bitnami/spark:3-debian-10

USER root
RUN pip install numpy

USER 1001
RUN mkdir /tmp/vertex
RUN mkdir /tmp/vertex/data
RUN mkdir /tmp/vertex/check

RUN mkdir /tmp/edge
RUN mkdir /tmp/edge/data
RUN mkdir /tmp/edge/check


RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-sql-kafka-0-10_2.12/3.0.1/spark-sql-kafka-0-10_2.12-3.0.1.jar --output /opt/bitnami/spark/jars/spark-sql-kafka-0-10_2.12-3.0.1.jar
RUN curl https://repo1.maven.org/maven2/org/apache/commons/commons-pool2/2.6.2/commons-pool2-2.6.2.jar --output /opt/bitnami/spark/jars/commons-pool2-2.6.2.jar
RUN curl https://repo1.maven.org/maven2/org/apache/kafka/kafka-clients/2.4.1/kafka-clients-2.4.1.jar --output /opt/bitnami/spark/jars/kafka-clients-2.4.1.jar
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-token-provider-kafka-0-10_2.12/3.0.1/spark-token-provider-kafka-0-10_2.12-3.0.1.jar --output /opt/bitnami/spark/jars/spark-token-provider-kafka-0-10_2.12-3.0.1.jar
RUN curl https://repo1.maven.org/maven2/org/apache/spark/spark-tags_2.12/3.0.1/spark-tags_2.12-3.0.1.jar --output /opt/bitnami/spark/jars/spark-tags_2.12-3.0.1.jar

【讨论】:

    猜你喜欢
    • 2018-01-31
    • 1970-01-01
    • 2018-01-18
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-01-12
    • 2022-01-16
    • 2016-03-16
    相关资源
    最近更新 更多