【发布时间】:2018-06-02 20:25:15
【问题描述】:
有很多从 Kafka 读取 json(写入 parquet)的在线示例 - 但我不知道如何将模式应用于来自 kafka 的 CSV 字符串。
流式数据:
customer_1945,cusaccid_995,27999941
customer_1459,cusaccid_1102,27999942
架构:
schema = StructType() \
.add("customer_id",StringType()) \
.add("customer_acct_id",StringType()) \
.add("serv_acct_id",StringType())
读取流:
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092") \
.option("subscribe", "test") \
.load()
我将它用于 JSON:
interval=df \
.select(from_json(col("value").cast("string"), schema).alias("json")) \
.select("json.*")
在使用分配的架构将其写入镶木地板之前:
query=interval \
.writeStream \
.format("parquet") \
.option("checkpointLocation", "/user/whatever/checkpoint24") \
.start("/user/ehatever/interval24")
由于我不能将 from_json() 用于 CSV - 我不知道如何将架构应用于数据帧,以便可以使用类似的 writeStream() 命令。
【问题讨论】:
标签: python csv apache-spark apache-kafka spark-structured-streaming