【问题标题】:Serialization with KStream groupBy operation使用 KStream groupBy 操作进行序列化
【发布时间】:2019-02-10 05:18:44
【问题描述】:

我正在尝试对 KStream 执行计数操作,但在理解序列化在这里的工作方式时遇到了一些困难。我有一个推送人们信息的流,例如姓名年龄。使用此流后,我正在尝试创建一个包含人们年龄的 KTable。

输入: {“姓名”:“abc”,“年龄”:“15”}

输出: 30, 10 20, 4 10、8 35、22 ...

属性

props.put(StreamsConfig.APPLICATION_ID_CONFIG, "person_processor");
props.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");

处理器

KStream<Object, Person> people = builder.stream("people");
people.print(Printed.<Object, Person>toSysOut().withLabel("consumer-1"));

输出 [consumer-1]:空,[B@7e37bab6

问题 1 我了解主题中的数据以字节为单位。我没有为 Key 或 Value 设置任何 Serdes。 KStream 是否将输入从字节转换为 Person 并在此处打印 Person 的地址?

问题 2 当我添加以下值 Serdes 时,我得到了更有意义的输出。这里的字节信息是先转换成字符串再转换成​​人吗?为什么现在可以正确打印值?

props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, Serdes.String().getClass());

[consumer-1]: null, {"name" : "abc","age" : "15"}

问题 3 现在,在对年龄执行计数时,将字符串转换为人员时出现运行时错误。如果 groupBy 将 age 设置为 Key,count 设置为 Long,为什么会发生 String 到 Person 的转换?

KTable<Integer, Long> integerLongKTable = people.groupBy((key, value) -> value.getAge())
    .count();

Exception in thread "person_processor-9ff96b38-4beb-4594-b2fe-ae191bf6b9ff-StreamThread-1" java.lang.ClassCastException: java.lang.String cannot be cast to com.example.kafkastreams.KafkaStreamsApplication$Person
at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:152)
at org.apache.kafka.streams.kstream.internals.KStreamImpl$1.apply(KStreamImpl.java:149)

Edit-1

阅读@Matthias J. Sax 的回复后,我使用此位置的 Serializer 和 DeSerializer 创建了一个 PersonSerde,我得到了这个 SerializationException...

https://github.com/apache/kafka/tree/1.0/streams/examples/src/main/java/org/apache/kafka/streams/examples/pageview

static class Person {

    String name;
    String age;

    public Person(String name, String age) {

      this.name = name;
      this.age = age;
    }

    void setName(String name) {

      this.name = name;
    }

    String getName() {

      return name;
    }

    void setAge(String age) {

      this.age = age;
    }

    String getAge() {

      return age;
    }

    @Override
    public String toString() {

      return "Person {name:" + this.getName() + ",age:" + this.getAge() + "}";
    }
  }

public class PersonSerde implements Serde {

  @Override
  public void configure(Map map, boolean b) {

  }

  @Override
  public void close() {

  }

  @Override
  public Serializer serializer() {

    Map<String, Object> serdeProps = new HashMap<>();

    final Serializer<Person> personSerializer = new JsonPOJOSerializer<>();
    serdeProps.put("JsonPOJOClass", Person.class);
    personSerializer.configure(serdeProps, false);

    return personSerializer;
  }

  @Override
  public Deserializer deserializer() {

    Map<String, Object> serdeProps = new HashMap<>();

    final Deserializer<Person> personDeserializer = new JsonPOJODeserializer<>();
    serdeProps.put("JsonPOJOClass", Person.class);
    personDeserializer.configure(serdeProps, false);

    return personDeserializer;
  }
}

props.put(StreamsConfig.DEFAULT_KEY_SERDE_CLASS_CONFIG, Serdes.String().getClass());
props.put(StreamsConfig.DEFAULT_VALUE_SERDE_CLASS_CONFIG, personSerde.getClass());

KTable<String, Long> count = people.selectKey((key, value) -> value.getAge()).groupByKey(Serialized.with(Serdes.String(), personSerde))
      .count();

错误

Caused by: org.apache.kafka.common.errors.SerializationException: Error serializing JSON message
Caused by: com.fasterxml.jackson.databind.exc.InvalidDefinitionException: No serializer found for class com.example.kafkastreams.KafkaStreamsApplication$Person and no properties discovered to create BeanSerializer (to avoid exception, disable SerializationFeature.FAIL_ON_EMPTY_BEANS)
at com.fasterxml.jackson.databind.exc.InvalidDefinitionException.from(InvalidDefinitionException.java:77)
at com.fasterxml.jackson.databind.SerializerProvider.reportBadDefinition(SerializerProvider.java:1191)
at com.fasterxml.jackson.databind.DatabindContext.reportBadDefinition(DatabindContext.java:313)

编辑 5

因此,当我将值映射到字符串时,计数似乎可以正常工作。但是当我在自定义对象上使用它时,它会失败

KStream<String, Person> people = builder.stream("person-topic", Consumed.with(Serdes.String(), personSerde));
people.print(Printed.<String, Person>toSysOut().withLabel("person-source"));

KStream<String, Person> agePersonKStream = people.selectKey((key, value) -> value.getAge());
agePersonKStream.print(Printed.<String, Person>toSysOut().withLabel("age-person"));

KStream<String, String> stringStringKStream = agePersonKStream.mapValues((person -> person.name));
stringStringKStream.print(Printed.<String, String>toSysOut().withLabel("age-name"));

KTable<String, Long> stringLongKTable = stringStringKStream.groupByKey(Serialized.with(Serdes.String(), Serdes.String())).count();
stringLongKTable.toStream().print(Printed.<String, Long>toSysOut().withLabel("age-count"));

如果没有第 3 步将值映射到名称,第 4 步将失败。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    问题1 我了解主题中的数据以字节为单位。我没有为 Key 或 Value 设置任何 Serdes。 KStream 是否将输入从字节转换为 Person 并在此处打印 Person 的地址?

    如果您没有在StreamsConfigbuilder.stream(..., Consumers.with(/*serdes*/)) 中指定任何Serde,则字节将不会转换为Person 对象,但该对象将是byte[] 类型。因此,print() 将调用 byte[].toString(),这会导致您看到的神秘输出 ([B@7e37bab6)。

    问题 2 当我添加以下值 Serdes 时,我得到了更有意义的输出。这里的字节信息是先转换成字符串再转换成​​人吗?为什么现在可以正确打印值?

    当您在 StreamsConfig 中指定 Serde.String() 时,字节将转换为 String 类型。看起来,StringSerde 能够以一种有意义的方式反序列化字节——但这似乎是一个巧合,它完全可以工作。看来您的数据实际上是在 JSON 中序列化的,这可以解释为什么 StringSerde() 可以将字节转换为 String

    问题 3 现在,在对年龄进行计数时,将字符串转换为人员时出现运行时错误。如果 groupBy 将 age 设置为 Key,count 设置为 Long,为什么会发生 String 到 Person 的转换?

    这是意料之中的。因为字节被转换为String 对象(如您指定的Serdes.String()),所以无法执行转换。

    最后的评论:

    如果只使用print(),则不会出现类转换异常,因为在这种情况下,不会执行转换操作。 Java 仅在需要时插入强制转换操作。

    对于groupBy(),您使用value.getAge(),因此Java 在此处插入一个强制转换(它知道预期的类型是Person,因为它是通过KStream&lt;Object, Person&gt; people = ... 指定的。对于print(),只有toString() 被称为在Object 上定义,因此不需要强制转换。

    Java 中的泛型为编译器提供类型提示,并替换为Object(或在编译时需要时强制转换)。因此,对于print()Object 变量可以毫无问题地指向byte[],并且toString() 被成功调用。对于groupBy() 情况,编译器将Object 转换为Person 以便能够调用getAge()——但是,这会失败,因为实际类型是String

    要让您的代码正常工作,您需要创建一个 PersonSerde extend Serde&lt;Person&gt; 类并将其指定为值 serde。

    【讨论】:

    • 感谢您抽出宝贵时间回复并帮助消除我的疑虑。
    • 我尝试了创建 PersonSerde 的选项,但遇到了一些反序列化错误。有什么想法吗?
    • 我不知道jackson 是如何工作的,但您似乎没有正确连接序列化程序。
    • 更新了帖子。将值映射到字符串有效,我得到了计数;但不适用于自定义对象。似乎计数操作需要设置序列化程序,但我不确定为什么或如何。
    • 知道了。 Person 对象没有公开公共访问器是一个问题(我使用的是默认值)。感谢您的帮助。
    猜你喜欢
    • 2023-03-19
    • 2018-10-26
    • 2016-03-29
    • 1970-01-01
    • 2018-02-01
    • 2022-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多