【问题标题】:Spark Structred Streaming Pyspark Sink Csv Does'nt AppendSpark 结构化流 Pyspark Sink Csv 不附加
【发布时间】:2020-04-28 21:31:39
【问题描述】:

将 json 写入 Kafka 主题并从 kafka 主题中读取 json。实际上我订阅主题并逐行编写控制台。但我必须下沉/写入文件 csv.但我不能。我写了一次 csv 但不追加。

您可以在下面看到我的代码。

谢谢!

import pyspark
from pyspark.sql import SparkSession
from pyspark.sql.types import *
import pyspark.sql.functions as func
spark = SparkSession.builder\
                    .config('spark.jars.packages', 'org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.0') \
                    .appName('kafka_stream_test')\
                    .getOrCreate()
ordersSchema = StructType() \
        .add("a", StringType()) \
        .add("b", StringType()) \
        .add("c", StringType()) \
        .add("d", StringType())\
        .add("e", StringType())\
        .add("f", StringType())

df = spark \
    .readStream \
    .format("kafka") \
    .option("kafka.bootstrap.servers", "localhost:9092") \
    .option("subscribe", "product-views") \
    .load()\


df_query = df \
    .selectExpr("cast(value as string)") \
    .select(func.from_json(func.col("value").cast("string"),ordersSchema).alias("parsed"))\
    .select("parsed.a","parsed.b","parsed.c","parsed.d","parsed.e","parsed.f")\


df = df_query \
    .writeStream \
    .format("csv")\
    .trigger(processingTime = "5 seconds")\
    .option("path", "/var/kafka_stream_test_out/")\
    .option("checkpointLocation", "/user/kafka_stream_test_out/chk") \
    .start()

df.awaitTermination()

【问题讨论】:

  • 为什么还要附加到同一个文件?几乎所有 Hadoop 进程都能够读取整个目录,这就是它们的用途。因此 Spark 默认会写入多个文件

标签: python-3.x apache-spark pyspark apache-kafka spark-structured-streaming


【解决方案1】:

是的,因为你需要这个额外的选项.option("format", "append")

aa = df_query \
    .writeStream \
    .format("csv")\
    .option("format", "append")\
    .trigger(processingTime = "5 seconds")\
    .option("path", "/var/kafka_stream_test_out/")\
    .option("checkpointLocation", "/user/kafka_stream_test_out/chk") \
    .outputMode("append") \
    .start()

【讨论】:

  • 感谢您的回复。正如你所说,我添加了额外的选项。但是我有同样的问题。 hdfs dfs -ls /var/kafka_stream_test_out 找到 2 个项目 drwxr-xr-x - supergroup supergroup 0 2020-01-11 15:40 /var/kafka_stream_test_out/_spark_metadata -rw-r--r-- 1 supergroup supergroup 1460 2020-01 -11 15:40 /var/kafka_stream_test_out/part-00000-2114b4f5-e045-457f-bc3c-fb4d980fdc73-c000.csv 它只创建了一次 csv,但它没有附加。
  • @Serkanşengönül 错误是否相同?你能附上错误日志消息吗(不是所有的跟踪,只有消息)。
  • ERROR FileFormatWriter:91 - Aborting job null。 java.lang.IllegalStateException:/var/kafka_stream_test_out/_spark_metadata/0 在压缩第 9 批(compactInterval:10)时不存在0ece-4a7e-8ddb-ebc6658f56de] 以错误 org.apache.spark.SparkException 终止:作业中止。
  • @Serkanşengönül 您是否尝试删除包含文件的目录并重试?因为,带有 csvs 的导出目录具有元数据,并且您可能在附加选项之前具有元数据。试试看告诉我
  • 是的,它有效。我将检查点位置用户更改为 var 目录。非常感谢。
猜你喜欢
  • 2020-10-22
  • 1970-01-01
  • 1970-01-01
  • 2020-02-27
  • 2019-09-05
  • 1970-01-01
  • 2017-05-04
  • 2022-08-12
  • 1970-01-01
相关资源
最近更新 更多