【问题标题】:spark streaming: read CSV string from kafka, write to parquet火花流:从kafka读取CSV字符串,写入镶木地板
【发布时间】:2018-06-02 20:25:15
【问题描述】:

有很多从 Kafka 读取 json(写入 parquet)的在线示例 - 但我不知道如何将模式应用于来自 kafka 的 CSV 字符串。

流式数据:

customer_1945,cusaccid_995,27999941    
customer_1459,cusaccid_1102,27999942

架构:

schema = StructType() \
.add("customer_id",StringType()) \
.add("customer_acct_id",StringType()) \
.add("serv_acct_id",StringType())

读取流:

df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "xx.xx.xx.xx:9092") \
  .option("subscribe", "test") \
  .load()

我将它用于 JSON:

interval=df \
  .select(from_json(col("value").cast("string"), schema).alias("json")) \
  .select("json.*")

在使用分配的架构将其写入镶木地板之前:

query=interval     \
  .writeStream  \
  .format("parquet") \
  .option("checkpointLocation", "/user/whatever/checkpoint24") \
  .start("/user/ehatever/interval24")

由于我不能将 from_json() 用于 CSV - 我不知道如何将架构应用于数据帧,以便可以使用类似的 writeStream() 命令。

【问题讨论】:

    标签: python csv apache-spark apache-kafka spark-structured-streaming


    【解决方案1】:

    我就是这样做的。没有from_json,提取csv字符串:

    interval=df.select(col("value").cast("string")) .alias("csv").select("csv.*")
    

    然后将其拆分为列。这可以使用上面相同的语句写成 parquet 文件

    interval2=interval \
          .selectExpr("split(value,',')[0] as customer_id" \
                     ,"split(value,',')[1] as customer_acct_id" \
                     ,"split(value,',')[2] as serv_acct_id" \
                     ,"split(value,',')[3] as installed_service_id" \
                     ,"split(value,',')[4] as meter_id" \
                     ,"split(value,',')[5] as channel_number" \
                     ... etc
                     )
    

    【讨论】:

      猜你喜欢
      • 2018-12-20
      • 2017-07-26
      • 2018-05-19
      • 2019-02-19
      • 2017-03-17
      • 1970-01-01
      • 2021-09-05
      • 2020-01-10
      • 2020-02-25
      相关资源
      最近更新 更多