【问题标题】:Processing json tabular data incoming from Kafka topics in Python在 Python 中处理从 Kafka 主题传入的 json 表格数据
【发布时间】:2021-10-21 02:36:06
【问题描述】:

我有事件以 key:value jsons(没有嵌套结构)的形式流入多个 Kafka 主题,例如:

event_1: {"name": "Alex", "age": 27, "hobby": "pc games"},  
event_2: {"name": "Bob", "age": 33, "hobby: "swimming"},  
event_3: {"name": "Charlie", "age": 12, "hobby: "collecting stamps"}

我正在使用 Python 3.7,并希望使用这些主题中的一批事件,假设每 5 分钟一次,将其转换为数据帧,对这些数据进行一些处理和丰富,并将结果保存到 csv文件。

我是 Spark 的新手,我搜索了帮助我完成这项任务的文档,但没有找到任何文档。 是否有任何更新的信息来源推荐?
此外,如果有任何其他推荐的适合此任务的大数据框架,我很想听听。

【问题讨论】:

  • 您在 Spark Streaming 文档中没有发现任何关于从 Kafka 消费的信息?

标签: apache-spark pyspark apache-kafka apache-spark-sql


【解决方案1】:

请参阅:结构化流编程指南的triggers 部分。触发器有 3 种不同类型,默认为 micro-batch,只要前一个 micro-batch 处理完成,就会生成 micro-batch。

在您的情况下,您需要固定间隔微批处理,您可以在其中指定必须触发查询的持续时间。以下是执行此操作的代码 sn-p。

df.writeStream \
    .format("csv") \
    .option("header", True) \
    .option("path", "path/to/destination/dir") \
    .trigger(processingTime='5 minutes') \ # fixed interval trigger
    .start()

简要代码

from pyspark.sql import SparkSession
from pyspark.sql.types import StructType, StructField, StringType, LongType, IntegerType


# Define schema of kafak message

schema = StructType([
    StructField("name", StringType, true),
    StructField("age", IntegerType, true),
    StructField("hobby", StringType, true),
])

# Initialize spark session

spark = SparkSession.builder.appName("example").getOrCreate()

# Read Kafka topic and load data using schema

df = spark.readStream.format("kafka")\
    .option("kafka.bootstrap.servers","x.x.x.x:2181")\
    .option("startingOffsets", "latest")\
    .option("subscribe","testdata")\
    .load()\
    .select(from_json(col("value").cast("string"), schema).alias("data"))\
    .select(f.col("data.*"))\

# Do some transformation
df1 = df...

# Write the resultant dataframe as CSV file

df1.writeStream \
    .format("csv") \
    .option("header", True) \
    .option("path", "path/to/destination/dir") \
    .trigger(processingTime='5 minutes') \
    .start()

如果需要,您还可以在写入 csv 文件之前重新分区最终数据帧

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2019-04-28
    • 2021-06-11
    • 1970-01-01
    • 2022-01-03
    • 1970-01-01
    • 2022-01-11
    • 2021-08-20
    • 2019-10-26
    相关资源
    最近更新 更多