【问题标题】:Testing Kafka Processor API that uses SpecificAvroSerde测试使用 SpecificAvroSerde 的 Kafka 处理器 API
【发布时间】:2018-12-27 23:07:18
【问题描述】:

我正在尝试为自定义流处理器编写单元测试,但无法序列化我需要发送以进行测试的消息。我通过 kafka 跟随这个例子:https://kafka.apache.org/11/documentation/streams/developer-guide/testing.html。我在流中为自定义类(自动生成的 avro 类)使用 SpecificAvroSerde,但我无法在测试中使用 MockSchemaRegistryClient() 对其进行配置,我只能指向 SR 的 URL。

    Serde<MyCustomObject> valueSerde = new SpecificAvroSerde<>();
    Map<String, String> valueSerdeConfig = new HashMap<>();
    valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, "fake");
    valueSerdeConfig.put(AbstractKafkaAvroSerDeConfig.AUTO_REGISTER_SCHEMAS, "true");
    valueSerde.configure(valueSerdeConfig, false);
    ConsumerRecordFactory<Long, MyCustomObject> recordFactory = new ConsumerRecordFactory<>(new LongSerializer(), valueSerde.serializer());

使用 KafkaAvroSerializer 我可以像这样初始化它:

KafkaAvroSerializer serializer = new KafkaAvroSerializer(schemaRegistryClient);

但 ConsumerRecordFactory 不会将 KafkaAvroSerializer 作为参数。

是否有任何替代方法或我不知道这样做的方法?

如果有任何帮助,我将不胜感激,谢谢。

【问题讨论】:

    标签: apache-kafka avro apache-kafka-streams confluent-schema-registry


    【解决方案1】:

    感谢您提出的解决方案,但我今天确实找到了一个。 Serdes 由序列化器和反序列化器组成:https://kafka.apache.org/11/javadoc/org/apache/kafka/common/serialization/Serdes.html#serdeFrom-org.apache.kafka.common.serialization.Serializer-org.apache.kafka.common.serialization.Deserializer- 所以我从 KafkaAvroSerializer 和 KafkaAvroDeserializer 构建了我的 Serde,如下所示:

    Serde serde = Serdes.serdeFrom(new KafkaAvroSerializer(client), new KafkaAvroDeserializer(client));
    

    每个实现 Serializer 的类都可以成为 Serde 的一部分。

    【讨论】:

      【解决方案2】:

      我正在为所有 Avro 定义生成 Java 模型:

      #!/usr/bin/env bash
      
      if [ ! -f avro-tools-1.8.2.jar ]; then
          wget http://tux.rainside.sk/apache/avro/avro-1.8.2/java/avro-tools-1.8.2.jar
          chmod +x avro-tools-1.8.2.jar
      fi
      
      java -jar avro-tools-1.8.2.jar compile schema ../avro/raw/* ../../java/
      

      然后我只在模拟的 Kafka 集群中生成一些消息

      public abstract class ViewPageEventGenerator {
      
          @NotNull
          public static KeyValue<List, HashMap<String, String>> getSimpleViewPages() {
              List<KeyValue<CustomerKey, ViewPage>> inputValues = new ArrayList<>();
              HashMap<String, String> requestExpectedValuePairs = new HashMap<>();
      
              inputValues = Arrays.asList(
                      new KeyValue<>(
                              new CustomerKey(1912, "Alan Turing"),
                              new ViewPage("Alan Turing", false,
                                      Double.parseDouble(String.valueOf(System.currentTimeMillis())),
                                      "https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
                              )
                      ),
                      new KeyValue<>(
                              new CustomerKey(1912, "Alan Turing"),
                              new ViewPage("Alan Turing", false,
                                      Double.parseDouble(String.valueOf(System.currentTimeMillis() + 100)),
                                      "https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
                              )
                      ),
                      new KeyValue<>(
                              new CustomerKey(1912, "Alan Turing"),
                              new ViewPage("Alan Turing", false,
                                      Double.parseDouble(String.valueOf(System.currentTimeMillis() + 200)),
                                      "https://alan.turing/", "192.168.0.1", "Turing Machine", "Punch card"
                              )
                      )
              );
              requestExpectedValuePairs.put(
                      "{project_id: 1912}",
                      "{\"success\":true,\"data\":{\"count\":3}}"
              );
      
              return new KeyValue<>(inputValues, requestExpectedValuePairs);
          }
      }
      

      就是这样。在拓扑中,我使用基于 Avro 定义生成的 Java 模型(类)。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2021-10-02
        • 2018-04-07
        • 2020-07-07
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多