【发布时间】:2018-09-26 03:10:10
【问题描述】:
我有一个 Kafka 生产者:
producer = KafkaProducer(value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('topic', ('12', 'AB DD', 'targer_1', '18'))
producer.send('topic', ('33', 'CC FF', 'target_2', '23'))
Spark 消费者应该处理这个流:
sc = SparkContext(appName="PythonStreamingDirectKafkaWordCountw")
ssc = StreamingContext(sc, 4)
kvs = KafkaUtils.createDirectStream(ssc, topic, {"metadata.broker.list": brokers})
lines = kvs.map(lambda x: x[1])
请帮助我将此流转换为可查询的 JSON 其中键值结构是这样的:
{"A": '12', "B": 'AB DD', "C": 'targer_1', "D": '18'}
我想像这样过滤对象流:
Df.select("A", "C").where("D > 19")
然后将其发送回 Kafka。 如果您有任何建议,我很乐意听到。
【问题讨论】:
标签: apache-spark pyspark apache-kafka pyspark-sql