【发布时间】:2019-10-31 15:11:21
【问题描述】:
我正在尝试将 kafka Stream 数据解析为 JSON 格式,以便我可以解析实时传入的跨国数据以获得所需的逻辑,并进一步希望在 Hbase 表中对其进行更新。
1.传入的数据流将采用这种格式。
2.我需要在哪里提取card_id、amount、postcode和transaction_dt
{ “card_id”:348702330256514, “member_id”: 000037495066290, “金额”: 9084849,“pos_id”:614677375609919,“邮政编码”:33946, “transaction_dt”:“11-02-2018 00:00:00”}
使用下面列出的代码创建了 Kafka Consumer,但不确定如何通过 RDD 将其进一步处理到 Jason。
Logger.getLogger("org").setLevel(Level.OFF);
Logger.getLogger("akka").setLevel(Level.OFF);
SparkConf sparkConf = new SparkConf().setAppName("KafkaSparkStreamingDemo").setMaster("local");
JavaStreamingContext jssc = new JavaStreamingContext(sparkConf, Durations.seconds(10));
Map<String, Object> kafkaParams = new HashMap<>();
kafkaParams.put("bootstrap.servers", "100.xx.xxx.xxx:9092");
kafkaParams.put("key.deserializer", StringDeserializer.class);
kafkaParams.put("value.deserializer", StringDeserializer.class);
kafkaParams.put("group.id", "groupkafkaspark2");
kafkaParams.put("auto.offset.reset", "earliest");
kafkaParams.put("enable.auto.commit", true);
Collection<String> topics = Arrays.asList("transactions-topic-verified");
JavaDStream<ConsumerRecord<String, String>> stream = KafkaUtils.createDirectStream(jssc,
LocationStrategies.PreferConsistent(),
ConsumerStrategies.<String, String>Subscribe(topics, kafkaParams));
我需要获取上述 4 个字段,然后需要对其进行处理,方法是查看类似数据上预先创建的 hbase 表。
【问题讨论】:
标签: java json apache-spark