【问题标题】:Kafka string to Dataframe - pysparkKafka字符串到Dataframe - pyspark
【发布时间】:2018-09-26 03:10:10
【问题描述】:

我有一个 Kafka 生产者:

producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('topic', ('12', 'AB DD', 'targer_1', '18'))
producer.send('topic', ('33', 'CC FF', 'target_2', '23'))

Spark 消费者应该处理这个流:

sc = SparkContext(appName="PythonStreamingDirectKafkaWordCountw")
ssc = StreamingContext(sc, 4)
kvs = KafkaUtils.createDirectStream(ssc, topic, {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])

请帮助我将此流转换为可查询的 JSON 其中键值结构是这样的:

{"A": '12', "B": 'AB DD', "C": 'targer_1', "D": '18'}

我想像这样过滤对象流:

Df.select("A", "C").where("D > 19")

然后将其发送回 Kafka。 如果您有任何建议,我很乐意听到。

【问题讨论】:

    标签: apache-spark pyspark apache-kafka pyspark-sql


    【解决方案1】:

    使用结构化流和完整的 JSON 编码对您来说会容易得多。以 JSON 格式写入数据

    from pyspark.sql.functions import from_json, col, to_json
    from pyspark.sql.types import *
    
    producer = KafkaProducer(
       value_serializer=lambda v: json.dumps(dict(zip(["A", "B", "C", "D"], v))).encode('utf-8')
    )
    

    使用 Spark Kafka 阅读器阅读(您必须包含 spark-sql-kafka 包):

    df = (spark.readStream.format("kafka")
       .option("kafka.bootstrap.servers", brokers)
       .option("subscribe", "topic")
       .load())
    

    定义架构:

    schema = StructType([StructField(c, StringType()) for c in ["A", "B", "C", "D"]])
    

    解析、过滤和写入

    (df
        # Parse JSON
        .select(from_json(col("value").cast("string"), schema).alias("value"))
        # Filter
        .where(col("value.D").cast("integer") > 19)
        # Serialize to JSON
        .select(to_json("value").alias("value"))
        # And write
        .writeStream
        .format("kafka")
        .option("topic", output_topic)
        .option("kafka.bootstrap.servers", brokers)
        .option("checkpointLocation", checkpont_directory)
        .start())
    

    使用旧 API,您可以:

    • 使用createDirectStream 可以使用valueDecodermessageHandler 参数来解码传入数据。您也可以使用map
    • 使用foreachPartition 来:

      • 将转换后的数据转换为DataFrame
      • 过滤掉记录。
      • 开始制作人。
      • 写信给 Kafka。

    【讨论】:

      猜你喜欢
      • 2021-06-28
      • 2018-09-15
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-11-10
      • 2017-05-08
      • 2022-11-02
      • 1970-01-01
      相关资源
      最近更新 更多