【问题标题】:How to extract part of a string in json format from Kafka in Flink 1.2如何在 Flink 1.2 中从 Kafka 中提取部分 json 格式的字符串
【发布时间】:2017-08-10 19:33:27
【问题描述】:

我的目标是使用 kafka 读取 json 格式的字符串,对字符串进行过滤,选择部分消息并将消息发送出去(仍然是 json 字符串格式)。

出于测试目的,我的输入字符串消息如下所示:

{"a":1,"b":2,"c":"3"}

我的实现代码是:

def main(args: Array[String]): Unit = {

val inputProperties = new Properties()
inputProperties.setProperty("bootstrap.servers", "localhost:9092")
inputProperties.setProperty("group.id", "myTest2")
val inputTopic = "test"

val outputProperties = new Properties()
outputProperties.setProperty("bootstrap.servers", "localhost:9092")
val outputTopic = "test2"


val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)

// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
  inputTopic,
  new JSONDeserializationSchema(),
  inputProperties)

val messageStream : DataStream[ObjectNode]= env
  .addSource(kafkaConsumer).rebalance

val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a")
  .asText.equals("1") && node.get("b").asText.equals("2"))

// Need help in this part, how to extract for instance a,c and 
// get something like {"a":"1", "c":"3"}?
val testStream:DataStream[JsonNode] = filteredStream.map(
  node => {
    node.get("a")
  }
)

testStream.addSink(new FlinkKafkaProducer010[JsonNode](
  outputTopic,
  new SerializationSchema[JsonNode] {
    override def serialize(element: JsonNode): Array[Byte] = element.toString.getBytes()
  }, outputProperties
))

env.execute("Kafka 0.10 Example")
 }

正如这段代码的注释所示,我不确定如何正确选择部分消息。我使用地图,但我不知道如何连接整个消息。例如,我在代码中所做的只能给我一个结果为“1”,但我想要的是 {"a":1, "c":"3"}

或者也许有一个完全不同的方法来解决这个问题。问题在于火花流中有一个“选择”API,但是我在 Flink 中找不到它。

非常感谢 flink 社区的帮助!这是我想在这个小项目中实现的最后一个功能。

【问题讨论】:

    标签: streaming apache-flink


    【解决方案1】:

    Flink Streaming 作业处理每个输入一次并将其输出到下一个任务或将它们保存到外部存储中。

    一种方法是将所有输出保存到外部存储中,例如 HDFS。流式作业完成后,使用批处理作业将它们组合成 JSON。

    另一种方法是使用 state 和 RichMapFunction 来获取包含所有键值的 JSON。

    stream.map(new MapFunction<String, Tuple2<String, String>>() {
        public Tuple2<String, String> map(String value) throws Exception {
            return new Tuple2<String, String>("mock", value);
        }
    }).keyBy(0).map(new RichMapFunction<Tuple2<String,String>, String>() {
        @Override
        public String map(Tuple2<String, String> value) throws Exception {
            ValueState<String> old = getRuntimeContext().getState(new ValueStateDescriptor<String>("test", String.class));
            String newVal = old.value();
            if (newVal != null) makeJSON(newVal, value.f1);
            else newVal = value.f1;
            old.update(newVal);
            return newVal;
        }
    }).print();
    

    并使用这个映射函数:filteredStream.map(function);

    请注意,使用状态时,您将看到如下输出: {“a”:1},{“a”:1,“c”:3}。 最后的输出应该是你想要的。

    【讨论】:

    • 谢谢! makeJSON 是 Fl​​ink 的内置函数吗?或者你的意思是我需要自己写一个函数并把它放在那里?
    • @teddy 不,Flink 不包含这种方法,它是用于说明的伪代码。你可以实现一个。不需要大量代码;)
    • 我收到一条错误消息,指出键控状态只能用于“键控流”,即在此行上执行“keyBy()”操作后 (ValueState state = getRuntimeContext(). getState(new ValueStateDescriptor("json", String.class));)
    • 我所做的是将您的 makeJSON(state.value(), value) 替换为 "{\"a\":\"" +value。 get("a").toString+"\",\"c\":\"" + value.get("c") + "\"}"。我不确定如何使用 state.value 变量,所以我只是使用 value 并自己创建一个字符串 json @David
    • 我的程序通过删除 ValueState&lt;String&gt; state = getRuntimeContext().getState(new ValueStateDescriptor&lt;String&gt;("json", String.class)); state.update(newJSON); 来工作。但我不确定这是不是一个好方法@David
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-08-03
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多