【发布时间】:2017-08-10 19:33:27
【问题描述】:
我的目标是使用 kafka 读取 json 格式的字符串,对字符串进行过滤,选择部分消息并将消息发送出去(仍然是 json 字符串格式)。
出于测试目的,我的输入字符串消息如下所示:
{"a":1,"b":2,"c":"3"}
我的实现代码是:
def main(args: Array[String]): Unit = {
val inputProperties = new Properties()
inputProperties.setProperty("bootstrap.servers", "localhost:9092")
inputProperties.setProperty("group.id", "myTest2")
val inputTopic = "test"
val outputProperties = new Properties()
outputProperties.setProperty("bootstrap.servers", "localhost:9092")
val outputTopic = "test2"
val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
inputTopic,
new JSONDeserializationSchema(),
inputProperties)
val messageStream : DataStream[ObjectNode]= env
.addSource(kafkaConsumer).rebalance
val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a")
.asText.equals("1") && node.get("b").asText.equals("2"))
// Need help in this part, how to extract for instance a,c and
// get something like {"a":"1", "c":"3"}?
val testStream:DataStream[JsonNode] = filteredStream.map(
node => {
node.get("a")
}
)
testStream.addSink(new FlinkKafkaProducer010[JsonNode](
outputTopic,
new SerializationSchema[JsonNode] {
override def serialize(element: JsonNode): Array[Byte] = element.toString.getBytes()
}, outputProperties
))
env.execute("Kafka 0.10 Example")
}
正如这段代码的注释所示,我不确定如何正确选择部分消息。我使用地图,但我不知道如何连接整个消息。例如,我在代码中所做的只能给我一个结果为“1”,但我想要的是 {"a":1, "c":"3"}
或者也许有一个完全不同的方法来解决这个问题。问题在于火花流中有一个“选择”API,但是我在 Flink 中找不到它。
非常感谢 flink 社区的帮助!这是我想在这个小项目中实现的最后一个功能。
【问题讨论】: