【问题标题】:Two questions about Flink deserializing关于 Flink 反序列化的两个问题
【发布时间】:2017-06-12 00:29:29
【问题描述】:

我是 Flink 和集群计算的新手。我花了一整天的时间试图在 Flink 上正确解析来自 Kafka 的愚蠢流,但没有结果:这有点令人沮丧...... 我在 kafka 中有一个用字符串键标识的 JSON-LD 消息流。我只是想在 Flink 中检索它们,然后用不同的键分隔消息。

1) 最初我考虑将消息作为字符串而不是 JSON-LD 发送。我虽然更容易......

我尝试了所有反序列化器,但没有一个有效。简单的反序列化器显然可以工作,但它完全忽略了键。

我相信我必须使用(Flink 显然只有两个支持密钥的反序列化器):

DataStream<Object> stream = env
            .addSource(new FlinkKafkaConsumer010<>("topicTest", new TypeInformationKeyValueSerializationSchema(String.class, String.class, env.getConfig()), properties))
            .rebalance();

    stream.print();

但我得到:

06/12/2017 02:09:12 来源:自定义来源(4/4)切换到失败 java.io.EOFException 在 org.apache.flink.runtime.util.DataInputDeserializer.readUnsignedByte(DataInputDeserializer.java:306)

如何在不丢失密钥的情况下接收流消息?

2) 我的 kafka 生产者是用 javascript 实现的,因为 Flink 支持 JSONDeserialization 我想直接在 kafka 中发送 JSON 对象。 我不确定这是否适用于 JSON-LD,但我使用过:

json.parse(jsonld_message)

将消息序列化为 json。然后我用通常的字符串键发送了这个。

但在 Flink 中这段代码不起作用:

DataStream<ObjectNode> stream = env
            .addSource(new FlinkKafkaConsumer010<>("topicTest", new JSONKeyValueDeserializationSchema(false), properties))
            .rebalance();

    stream.print();

养一个

JsonParserException。

我认为第一种方法更简单,我更喜欢它,因为允许一次考虑一个问题(第一个:接收数据,第二个:我猜想用外部库重新转换 JSON-LD 中的字符串)。

【问题讨论】:

    标签: java json deserialization apache-flink json-ld


    【解决方案1】:

    已解决:

    最后我决定实现一个自定义反序列化器来实现 KeyedDeserializedSchema 接口。

    【讨论】:

      【解决方案2】:

      为了使用 Flink 的 TypeInformationKeyValueSerializationSchema 从 Kafka 读取数据,它必须以兼容的方式写入。假设你的 key 和 value 是 String 类型,那么 key 和 value 必须以 Flink 的 StringSerializer 理解数据的方式编写。

      因此,您必须确保您的 Kafka 生产者以兼容的方式写入数据。否则 Flink' 将无法读取数据。

      【讨论】:

      • 我认为 StringSerializer 理解数据,因为如果我使用 simpleserializer 我的字符串消息被正确解析。不幸的是,在 doc 中编写的 simpleserializer 忽略了键。
      • 嗯,这很奇怪。你能检查一下你用你的 JavaScript 生产者序列化的数据在直接向它提供数据时是否可以通过 Flink 反序列化器读取?
      • 怎么样?发送给kafka生产者的字符串只是python的一个字符串对象,并封装在kafka消息中。我可以确认 Flink 中的 new SimpleStringSchema() 返回正常字符串
      【解决方案3】:

      ** 我遇到了类似的问题。理想情况下,具有键和值的字符串类型的 TypeInformationKeyValueSerializationSchema 应该能够读取我的 kafka 记录,其中键和值都是字符串。但正如上面的帖子所指出的那样,它不能并且有一个 EOF 异常。所以这个问题很容易重现,需要修复。请让我知道我是否可以在此过程中提供任何帮助。同时使用实现自定义序列化器

      Kafka 反序列化器架构

      。这是代码,因为几乎没有关于读取键/值和其他内容的文档: **

      import org.apache.flink.api.common.typeinfo.TypeHint;
      import org.apache.flink.api.common.typeinfo.TypeInformation;
      import org.apache.flink.api.java.tuple.Tuple2;
      import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
      
      import org.apache.kafka.clients.consumer.ConsumerRecord;
      
      public class CustomKafkaSerializer implements KafkaDeserializationSchema<Tuple2<String,String>> {
      
      
          @Override
          public boolean isEndOfStream(Tuple2<String,String> stringStringPair) {
              return false;
          }
      
          @Override
          public Tuple2<String,String> deserialize(ConsumerRecord<byte[], byte[]> consumerRecord) throws Exception {
              String key = new String(consumerRecord.key());
              String value = new String(consumerRecord.value());
              return new Tuple2<>(key,value);
          }
      
          @Override
          public TypeInformation<Tuple2<String,String>> getProducedType() {
              return TypeInformation.of(new TypeHint<Tuple2<String, String>>(){});
          }
      
      }
      

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-08-17
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多