【问题标题】:Cannot see message while sinking kafka stream and cannot see print message in flink 1.2下沉 kafka 流时看不到消息,在 flink 1.2 中看不到打印消息
【发布时间】:2017-08-08 20:10:16
【问题描述】:

我的目标是使用 kafka 读取 json 格式的字符串,对字符串进行过滤,然后将消息发送出去(仍然是 json 字符串格式)。

出于测试目的,我的输入字符串消息如下所示:

{"a":1,"b":2}

而我的实现代码是:

def main(args: Array[String]): Unit = {

// parse input arguments
val params = ParameterTool.fromArgs(args)

if (params.getNumberOfParameters < 4) {
  println("Missing parameters!\n"
    + "Usage: Kafka --input-topic <topic> --output-topic <topic> "
    + "--bootstrap.servers <kafka brokers> "
    + "--zookeeper.connect <zk quorum> --group.id <some id> [--prefix <prefix>]")
  return
}

val env = StreamExecutionEnvironment.getExecutionEnvironment
env.getConfig.disableSysoutLogging
env.getConfig.setRestartStrategy(RestartStrategies.fixedDelayRestart(4, 10000))
// create a checkpoint every 5 seconds
env.enableCheckpointing(5000)
// make parameters available in the web interface
env.getConfig.setGlobalJobParameters(params)

// create a Kafka streaming source consumer for Kafka 0.10.x
val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new JSONKeyValueDeserializationSchema(false),
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer)

val filteredStream: DataStream[ObjectNode] = messageStream.filter(node => node.get("a").asText.equals("1")
                      && node.get("b").asText.equals("2"))

messageStream.print()
// Refer to: https://stackoverflow.com/documentation/apache-flink/9004/how-to-define-a-custom-deserialization-schema#t=201708080802319255857
filteredStream.addSink(new FlinkKafkaProducer010[ObjectNode](
  params.getRequired("output-topic"),
  new SerializationSchema[ObjectNode] {
    override def serialize(element: ObjectNode): Array[Byte] = element.toString.getBytes()
  }, params.getProperties
))

env.execute("Kafka 0.10 Example")
}

可以看出,我想将消息流打印到控制台并将过滤后的消息接收到 kafka。但是,我一个都看不到。

有趣的是,如果我将 KafkaConsumer 的模式从 JSONKeyValueDeserializationSchema 修改为 SimpleStringSchema,我可以看到 messageStream 打印到控制台。代码如下图:

 val kafkaConsumer = new FlinkKafkaConsumer010(
  params.getRequired("input-topic"),
  new SimpleStringSchema,
  params.getProperties)

val messageStream = env.addSource(kafkaConsumer)
messageStream.print()

这让我觉得如果我使用 JSONKeyValueDeserializationSchema,我的输入消息实际上是不被 Kafka 接受的。但这看起来很奇怪,和网上的文档有很大的不同(https://ci.apache.org/projects/flink/flink-docs-release-1.2/dev/connectors/kafka.html)

希望有人能帮帮我!

【问题讨论】:

    标签: streaming apache-flink


    【解决方案1】:

    JSONKeyValueDeserializationSchema() 需要每个 kafka 消息的消息密钥,我假设在生成 JSON 消息并通过 kafka 主题发送时没有提供密钥。

    因此,要解决此问题,请尝试使用 JSONDeserializationSchema(),它只需要消息并根据收到的消息创建对象节点。

    【讨论】:

    • 我之前确实尝试过。我的程序将在大约 30 秒内毫无问题地启动,但仍然没有输出(控制台和 kafka)。然后,我会收到一条错误消息:线程“主”org.apache.flink.runtime.client.JobExecutionException 中的异常:作业执行失败。引起:com.fasterxml.jackson.core.JsonParseException:无法识别的令牌'PREFIX':在[来源:[B@68d8025;行:1,列:8]
    • 你确定你输入的 JSON 是正确的吗?因为我使用示例代码运行了您的示例,并且它在我的最后工作。你能在这里检查你的 JSON 的有效性吗:jsonlint.com
    • 是的,{"a":1,"b":2} 绝对是一个有效的 json(我也检查过)。我想知道你如何测试我的代码?我所做的是使用本地 kafka 消费者作为输入,本地 kafka 生产者作为输出。我看不到 flink 程序的输出。
    • 我通过 kafka 主题发送 JSON,并使用 JSONDeserializationSchema() 从 kafka 主题中读取,然后在将源添加到执行环境后打印。您是否使用任何特定的时间特征?我做了这样的事情:pastebin.com/xzuK7YK6
    • 我看到了问题。如果我像您所做的那样进行测试,那么我确实看到了输出。但是,如果我应用了过滤器(messageStream.filter),那么我看不到输出。可能过滤器实现有问题。
    猜你喜欢
    • 1970-01-01
    • 2011-04-13
    • 2021-04-06
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-12-13
    • 2017-10-29
    • 2021-10-22
    相关资源
    最近更新 更多