【问题标题】:How to use Avro serialization with Spring-Kafka如何在 Spring-Kafka 中使用 Avro 序列化
【发布时间】:2019-08-18 04:19:49
【问题描述】:

我正在尝试学习 Kafka 和现在的 Avro,为了保持发送者对象和接收者对象之间的一致性,我们保留了 JSON 模式 (.avsc)。但我找不到任何简单的例子来说明如何使用它。一些例子是使用融合(是 Avro 的融合任务),一些是通过 Avro 工具生成对象。 到目前为止,我有一个有效的 Kafka 设置。

对象类

package com.example.kafka;
public class Hello {
String name;
String age;
public Hello(String name, String age) {
    this.name = name;
    this.age = age;
}
@Override
public String toString() {
    return "Hello{" +
            "name='" + name + '\'' +
            ", date='" + age + '\'' +
            '}';
}
public String getName() {
    return name;
}
public void setName(String name) {
    this.name = name;
}
public String getAge() {
    return age;
}
public void setAge(String age) {
    this.age = age;
}

}

控制器类

    package com.example.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.GetMapping;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;
@RestController
@RequestMapping("/")
class KafkaController {
    @Autowired
    KafkaService kafkaService;
    @GetMapping("test")
    public Hello hello() {
        Hello hello = new Hello("shrikant", "25");
        kafkaService.hello(hello);
        return hello;
    }
}

主要应用

package com.example.kafka;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.EnableAutoConfiguration;
import org.springframework.boot.autoconfigure.SpringBootApplication;
@SpringBootApplication
@EnableAutoConfiguration
public class KafkaDemoApplication {
    public static void main(String[] args) {
        SpringApplication.run(KafkaDemoApplication.class, args);
    }
}

KafkaProducerConfig

package com.example.kafka;
import org.apache.kafka.clients.producer.ProducerConfig;
import org.apache.kafka.common.serialization.StringSerializer;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.kafka.core.DefaultKafkaProducerFactory;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.kafka.core.ProducerFactory;
import java.util.HashMap;
import java.util.Map;
@Configuration
public class KafkaProducerConfig {
    @Bean
    public Map<String, Object> producerConfigs() {
        Map<String, Object> props = new HashMap<>();
        props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, StringSerializer.class);
        props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, KafkaSerializer.class);
        return props;
    }
    @Bean
    public ProducerFactory<String, Hello> producerFactory() {
        return new DefaultKafkaProducerFactory<>(producerConfigs());
    }
    @Bean
    public KafkaTemplate<String, Hello> kafkaTemplate() {
        return new KafkaTemplate<>(producerFactory());
    }
}

KafkaSerializer

package com.example.kafka;
import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.kafka.common.serialization.Serializer;
import java.util.Map;
public class KafkaSerializer implements Serializer<Hello> {
    @Override
    public byte[] serialize(String arg0, Hello developer) {
        byte[] serializedBytes = null;
        ObjectMapper objectMapper = new ObjectMapper();
        try {
            serializedBytes = objectMapper.writeValueAsString(developer).getBytes();
        } catch (Exception e) {
            e.printStackTrace();
        }
        return serializedBytes;
    }
    @Override
    public void close() {
        // TODO Auto-generated method stub
    }
    @Override
    public void configure(Map<String, ?> arg0, boolean arg1) {
        // TODO Auto-generated method stub
    }
}

卡夫卡服务

package com.example.kafka;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.kafka.core.KafkaTemplate;
import org.springframework.stereotype.Service;
@Service
public class KafkaService {
    @Autowired
    private KafkaTemplate<String, Hello> kafkaTemplate;
    public void hello(Hello hello) {
        kafkaTemplate.send("test", hello);
    }
}

你好.avsc

{"namespace": "com.example.kafka",
  "type": "record",
  "name": "Hello",
  "fields": [
    {"name": "name", "type": "string"},
    {"name": "age",  "type": "string"},
  ]
}

build.gradle

    buildscript {
    repositories {
        mavenCentral()
    }
    dependencies {
        classpath 'org.springframework.boot:spring-boot-gradle-plugin:1.5.21.RELEASE'
    }
}
plugins {
    id 'java'
}
apply plugin: 'org.springframework.boot'
group = 'com.example.kafka'
version = '0.0.1-SNAPSHOT'
sourceCompatibility = '1.8'
repositories {
    mavenCentral()
}
ext {
    set('spring-kafka.version', "2.2.8.RELEASE")
}
dependencies {
    compile 'org.springframework.boot:spring-boot-starter'
    compile 'org.springframework.kafka:spring-kafka'
    implementation 'org.springframework.boot:spring-boot-starter-web'
}

这是工作设置,我能够发送和接收数据,我需要进行哪些更改才能使 Avro 正常工作。

【问题讨论】:

    标签: java apache-kafka avro


    【解决方案1】:

    Confluent 为这个用例维护了教程:https://docs.confluent.io/platform/current/tutorials/examples/clients/docs/java-springboot.html


    您当前正在使用 JSON。要在当前设置中轻松使用 Avro,您必须导入 Jackson Avro 数据格式对象映射器。

    https://github.com/FasterXML/jackson-dataformats-binary/blob/master/avro/README.md

    或者您可以安装 Confluent Schema Registry 并使用它们的序列化程序,而无需为您想要的每个对象类编写自己的序列化程序。

    要实际使用 AVSC,您需要从文件系统中读取文件以创建 Schema 对象,或者可以使用 Avro Gradle Plugin 让它为您生成一个对象类,该类会将模式嵌入为多变的。 https://github.com/commercehub-oss/gradle-avro-plugin

    Confluent 示例使用 Avro Maven 插件,但想法类似。

    https://docs.confluent.io/current/schema-registry/schema_registry_tutorial.html#example-producer-code

    请注意,使用 Jackson Avro 编码的消息与 Confluent 不兼容,因为 Confluent 序列化消息本身不包含任何 Avro 架构,因此您不能混合使用这些(反)序列化器

    【讨论】:

    • 这听起来可能很愚蠢,但我不明白你的大部分答案。你能举个例子解释一下吗?
    • 这很难解释,因为实际上你做了很多事情......我将从这里提供的简单示例开始 - docs.confluent.io/current/schema-registry/… (不使用 Spring 或 Gradle,然后尝试查找正确替换您实际想要使用其他工具执行的操作)。
    猜你喜欢
    • 2020-07-06
    • 2019-11-18
    • 2019-04-04
    • 1970-01-01
    • 2020-10-05
    • 2019-07-14
    • 1970-01-01
    • 2019-11-25
    • 1970-01-01
    相关资源
    最近更新 更多