[Posted]: 2021-05-29 15:00:21
[Question]:
I want to receive JSON with a Kafka consumer. Unfortunately, I get the following error: java: cannot access com.fasterxml.jackson.core.type.TypeReference class file for com.fasterxml.jackson.core.type.TypeReference not found.
First, I produce some payloads with the Kafka CLI:
kafka-console-producer --broker-list localhost:9092 --topic t2
>{"payload":"a", "appId":"b", "userId":"3", "deviceType":"d"}
>{"payload":"aa", "appId":"bb", "userId":"44", "deviceType":"e"}
Then I have a class that uses the @KafkaListener annotation:
@Service
public class SimpleConsumerService {
    public static final Logger LOGGER = LogManager.getLogger(SimpleConsumerService.class);

    @KafkaListener(topics = "t2")
    public void consumer(SingleUserRequest singleUserRequest) {
        LOGGER.info(String.format("Req received: %s", singleUserRequest));
    }
}
The consumer has the following configuration:
@Configuration
@EnableKafka
public class KafkaConfig {
    @Autowired
    KafkaProperties kafkaProperties;

    private Map<String, Object> getConsumerProps() {
        var consumerConfig = new HashMap<>(
                kafkaProperties.buildConsumerProperties()
        );
        consumerConfig.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "localhost:9092");
        consumerConfig.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class);
        consumerConfig.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, JsonDeserializer.class);
        consumerConfig.put(ConsumerConfig.GROUP_ID_CONFIG, "g1");
        consumerConfig.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, false);
        consumerConfig.put(ConsumerConfig.MAX_POLL_INTERVAL_MS_CONFIG, 60000);
        consumerConfig.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");
        return consumerConfig;
    }

    @Bean
    public ConsumerFactory<String, SingleUserRequest> consumerFactory() {
        // The deserializer instances passed here are used instead of the
        // *_DESERIALIZER_CLASS_CONFIG entries set in getConsumerProps().
        return new DefaultKafkaConsumerFactory<>(getConsumerProps(), new StringDeserializer(),
                new JsonDeserializer<>(SingleUserRequest.class));
    }

    @Bean
    public ConcurrentKafkaListenerContainerFactory<String, SingleUserRequest> kafkaListenerContainerFactory() {
        var factory = new ConcurrentKafkaListenerContainerFactory<String, SingleUserRequest>();
        factory.setConsumerFactory(consumerFactory());
        return factory;
    }
}
And SingleUserRequest:
public class SingleUserRequest {
    String payload;
    String appId;
    String userId;
    String deviceType;

    public SingleUserRequest() {
    }

    public SingleUserRequest(String payload, String appId, String userId, String deviceType) {
        this.payload = payload;
        this.appId = appId;
        this.userId = userId;
        this.deviceType = deviceType;
    }

    public String getPayload() {
        return payload;
    }

    public String getAppId() {
        return appId;
    }

    public String getUserId() {
        return userId;
    }

    public String getDeviceType() {
        return deviceType;
    }
}
[Comments]:
- Post your pom.xml
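The request for the pom.xml is presumably because a "class file ... not found" error usually means the compiler cannot see the Jackson jars. As a rough way to check which Jackson classes are visible to the application, here is a minimal sketch that uses only the JDK. The class name JacksonClasspathCheck and the helper isOnClasspath are hypothetical names made up for this illustration. The check runs at runtime, so it is only a proxy for the compile-time classpath that the reported error refers to.

```java
public class JacksonClasspathCheck {

    // Returns true if the named class can be located by this class's class loader.
    // The class is looked up without being initialized.
    public static boolean isOnClasspath(String className) {
        try {
            Class.forName(className, false, JacksonClasspathCheck.class.getClassLoader());
            return true;
        } catch (ClassNotFoundException | LinkageError e) {
            return false;
        }
    }

    public static void main(String[] args) {
        // TypeReference lives in jackson-core; ObjectMapper lives in jackson-databind.
        String[] candidates = {
            "com.fasterxml.jackson.core.type.TypeReference",
            "com.fasterxml.jackson.databind.ObjectMapper"
        };
        for (String name : candidates) {
            System.out.println(name + " on classpath: " + isOnClasspath(name));
        }
    }
}
```

If either line prints false, declaring jackson-databind (which depends on jackson-core) in the build would be the first thing to try. That is an assumption about this project, since the pom.xml is not shown in the question.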
Tags: java spring-boot apache-kafka jackson spring-kafka