请参阅:结构化流编程指南的triggers 部分。触发器有 3 种不同类型,默认为 micro-batch,只要前一个 micro-batch 处理完成,就会生成 micro-batch。
在您的情况下,您需要固定间隔微批处理,您可以在其中指定必须触发查询的持续时间。以下是执行此操作的代码 sn-p。
df.writeStream \
.format("csv") \
.option("header", True) \
.option("path", "path/to/destination/dir") \
.trigger(processingTime='5 minutes') \ # fixed interval trigger
.start()
简要代码
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType
# Define schema of kafak message
schema = StructType([
StructField("name", StringType, true),
StructField("age", IntegerType, true),
StructField("hobby", StringType, true),
])
# Initialize spark session
spark = SparkSession.builder.appName("example").getOrCreate()
# Read Kafka topic and load data using schema
df = spark.readStream.format("kafka")\
.option("kafka.bootstrap.servers","x.x.x.x:2181")\
.option("startingOffsets", "latest")\
.option("subscribe","testdata")\
.load()\
.select(from_json(col("value").cast("string"), schema).alias("data"))\
.select(f.col("data.*"))\
# Do some transformation
df1 = df...
# Write the resultant dataframe as CSV file
df1.writeStream \
.format("csv") \
.option("header", True) \
.option("path", "path/to/destination/dir") \
.trigger(processingTime='5 minutes') \
.start()
如果需要,您还可以在写入 csv 文件之前重新分区最终数据帧