【Posted】: 2020-03-20 16:53:47
【Question】:
I am trying to consume Confluent Avro messages from a Kafka topic as a KStream using Spring Boot 2.0.
I am able to consume the messages as a MessageChannel, but not as a KStream.
@Input(ORGANIZATION)
KStream<String, Organization> organizationMessageChannel();

@StreamListener
public void processOrganization(@Input(KstreamBinding.ORGANIZATION) KStream<String, Organization> organization) {
    log.info("Organization Received:" + organization);
}
Exception:
Exception in thread "pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3" org.apache.kafka.streams.errors.StreamsException: stream-thread [pcs-7bb7b444-044d-41bb-945d-450c902337ff-StreamThread-3] Failed to rebalance.
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:860)
    at org.apache.kafka.streams.processor.internals.StreamThread.runOnce(StreamThread.java:808)
    at org.apache.kafka.streams.processor.internals.StreamThread.runLoop(StreamThread.java:774)
    at org.apache.kafka.streams.processor.internals.StreamThread.run(StreamThread.java:744)
Caused by: org.apache.kafka.streams.errors.StreamsException: Failed to configure value serde class io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:859)
    at org.apache.kafka.streams.processor.internals.AbstractProcessorContext.<init>(AbstractProcessorContext.java:59)
    at org.apache.kafka.streams.processor.internals.ProcessorContextImpl.<init>(ProcessorContextImpl.java:42)
    at org.apache.kafka.streams.processor.internals.StreamTask.<init>(StreamTask.java:134)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:404)
    at org.apache.kafka.streams.processor.internals.StreamThread$TaskCreator.createTask(StreamThread.java:365)
    at org.apache.kafka.streams.processor.internals.StreamThread$AbstractTaskCreator.createTasks(StreamThread.java:350)
    at org.apache.kafka.streams.processor.internals.TaskManager.addStreamTasks(TaskManager.java:137)
    at org.apache.kafka.streams.processor.internals.TaskManager.createTasks(TaskManager.java:88)
    at org.apache.kafka.streams.processor.internals.StreamThread$RebalanceListener.onPartitionsAssigned(StreamThread.java:259)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.onJoinComplete(ConsumerCoordinator.java:264)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.joinGroupIfNeeded(AbstractCoordinator.java:367)
    at org.apache.kafka.clients.consumer.internals.AbstractCoordinator.ensureActiveGroup(AbstractCoordinator.java:316)
    at org.apache.kafka.clients.consumer.internals.ConsumerCoordinator.poll(ConsumerCoordinator.java:295)
    at org.apache.kafka.clients.consumer.KafkaConsumer.pollOnce(KafkaConsumer.java:1146)
    at org.apache.kafka.clients.consumer.KafkaConsumer.poll(KafkaConsumer.java:1111)
    at org.apache.kafka.streams.processor.internals.StreamThread.pollRequests(StreamThread.java:851)
    ... 3 more
Caused by: io.confluent.common.config.ConfigException: Missing required configuration "schema.registry.url" which has no default value.
    at io.confluent.common.config.ConfigDef.parse(ConfigDef.java:243)
    at io.confluent.common.config.AbstractConfig.<init>(AbstractConfig.java:78)
    at io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig.<init>(AbstractKafkaAvroSerDeConfig.java:61)
    at io.confluent.kafka.serializers.KafkaAvroSerializerConfig.<init>(KafkaAvroSerializerConfig.java:32)
    at io.confluent.kafka.serializers.KafkaAvroSerializer.configure(KafkaAvroSerializer.java:48)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerializer.configure(SpecificAvroSerializer.java:58)
    at io.confluent.kafka.streams.serdes.avro.SpecificAvroSerde.configure(SpecificAvroSerde.java:107)
    at org.apache.kafka.streams.StreamsConfig.defaultValueSerde(StreamsConfig.java:855)
    ... 19 more
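Based on the error, I think I am missing the schema.registry.url configuration that the Confluent serde needs.
I had a quick look at the sample here,
but I am a bit lost on how to do the same with Spring Cloud Stream using a StreamListener.
Does this need to be configured separately? Or is there a way to configure the schema.registry.url that Confluent is looking for in application.yml itself?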
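To make the failure in the trace concrete, here is a minimal plain-Java sketch of what seems to be happening: the serde is configured from the Kafka Streams configuration, and a required key with no default is absent from it. This is a simplified stand-in, not the Confluent or Spring code. The class and method names are hypothetical, the URL is a placeholder, and the binder prefix `spring.cloud.stream.kafka.streams.binder.configuration.` is an assumption that should be checked against the documentation of the binder version in use.

```java
import java.util.HashMap;
import java.util.Map;

/** Hypothetical stand-in for the required-config check; not library code. */
final class SchemaRegistryConfigSketch {

    // The key named in the ConfigException.
    static final String SCHEMA_REGISTRY_URL = "schema.registry.url";

    // Assumed prefix under which the Kafka Streams binder accepts pass-through settings.
    static final String BINDER_PREFIX =
            "spring.cloud.stream.kafka.streams.binder.configuration.";

    /** Collects the entries under the binder prefix, with the prefix stripped. */
    static Map<String, Object> streamsConfigFrom(Map<String, Object> springProperties) {
        Map<String, Object> streamsConfig = new HashMap<>();
        for (Map.Entry<String, Object> entry : springProperties.entrySet()) {
            if (entry.getKey().startsWith(BINDER_PREFIX)) {
                streamsConfig.put(
                        entry.getKey().substring(BINDER_PREFIX.length()), entry.getValue());
            }
        }
        return streamsConfig;
    }

    /** Mimics a required setting without a default: fails when the key is absent. */
    static String requireSchemaRegistryUrl(Map<String, ?> serdeConfig) {
        Object url = serdeConfig.get(SCHEMA_REGISTRY_URL);
        if (url == null) {
            throw new IllegalStateException("Missing required configuration \""
                    + SCHEMA_REGISTRY_URL + "\" which has no default value.");
        }
        return url.toString();
    }

    public static void main(String[] args) {
        Map<String, Object> properties = new HashMap<>();
        // Only the converter-side endpoint is set, the one KafkaConfig reads.
        properties.put("spring.cloud.stream.schemaRegistryClient.endpoint",
                "http://localhost:8081"); // placeholder URL

        try {
            requireSchemaRegistryUrl(streamsConfigFrom(properties));
        } catch (IllegalStateException ex) {
            System.out.println("Fails like the trace: " + ex.getMessage());
        }

        // Adding the key under the assumed binder prefix makes it visible to the serde.
        properties.put(BINDER_PREFIX + SCHEMA_REGISTRY_URL, "http://localhost:8081");
        System.out.println("Resolved: "
                + requireSchemaRegistryUrl(streamsConfigFrom(properties)));
    }
}
```

The point of the sketch is that `spring.cloud.stream.schemaRegistryClient.endpoint` and `schema.registry.url` are two separate settings read by two different components, so setting the first does not satisfy the second.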
Here is the code repository: https://github.com/naveenpop/springboot-kstream-confluent
Organization.avsc
{
  "namespace": "com.test.demo.avro",
  "type": "record",
  "name": "Organization",
  "fields": [
    {
      "name": "orgId",
      "type": "string",
      "default": "null"
    },
    {
      "name": "orgName",
      "type": "string",
      "default": "null"
    },
    {
      "name": "orgType",
      "type": "string",
      "default": "null"
    },
    {
      "name": "parentOrgId",
      "type": "string",
      "default": "null"
    }
  ]
}
DemokstreamApplication.java
@SpringBootApplication
@EnableSchemaRegistryClient
@Slf4j
public class DemokstreamApplication {

    public static void main(String[] args) {
        SpringApplication.run(DemokstreamApplication.class, args);
    }

    @Component
    public static class organizationProducer implements ApplicationRunner {

        @Autowired
        private KafkaProducer kafkaProducer;

        @Override
        public void run(ApplicationArguments args) throws Exception {
            log.info("Starting: Run method");
            List<String> names = Arrays.asList("blue", "red", "green", "black", "white");
            List<String> pages = Arrays.asList("whiskey", "wine", "rum", "jin", "beer");
            Runnable runnable = () -> {
                String rPage = pages.get(new Random().nextInt(pages.size()));
                String rName = names.get(new Random().nextInt(names.size()));
                try {
                    this.kafkaProducer.produceOrganization(rPage, rName, "PARENT", "111");
                } catch (Exception e) {
                    log.info("Exception :" + e);
                }
            };
            Executors.newScheduledThreadPool(1).scheduleAtFixedRate(runnable, 1, 1, TimeUnit.SECONDS);
        }
    }
}
KafkaConfig.java
@Configuration
public class KafkaConfig {

    @Value("${spring.cloud.stream.schemaRegistryClient.endpoint}")
    private String endpoint;

    @Bean
    public SchemaRegistryClient confluentSchemaRegistryClient() {
        ConfluentSchemaRegistryClient client = new ConfluentSchemaRegistryClient();
        client.setEndpoint(endpoint);
        return client;
    }
}
KafkaConsumer.java
@Slf4j
@EnableBinding(KstreamBinding.class)
public class KafkaConsumer {

    @StreamListener
    public void processOrganization(@Input(KstreamBinding.ORGANIZATION_INPUT) KStream<String, Organization> organization) {
        organization.foreach((s, organization1) -> log.info("KStream Organization Received:" + organization1));
    }
}
KafkaProducer.java
@Slf4j // needed for the log field used in the catch block below
@EnableBinding(KstreamBinding.class)
public class KafkaProducer {

    @Autowired
    private KstreamBinding kstreamBinding;

    public void produceOrganization(String orgId, String orgName, String orgType, String parentOrgId) {
        try {
            Organization organization = Organization.newBuilder()
                    .setOrgId(orgId)
                    .setOrgName(orgName)
                    .setOrgType(orgType)
                    .setParentOrgId(parentOrgId)
                    .build();
            kstreamBinding.organizationOutputMessageChannel()
                    .send(MessageBuilder.withPayload(organization)
                            .setHeader(KafkaHeaders.MESSAGE_KEY, orgName)
                            .build());
        } catch (Exception e) {
            log.error("Failed to produce Organization Message:" + e);
        }
    }
}
KstreamBinding.java
public interface KstreamBinding {

    String ORGANIZATION_INPUT = "organizationInput";
    String ORGANIZATION_OUTPUT = "organizationOutput";

    @Input(ORGANIZATION_INPUT)
    KStream<String, Organization> organizationInputMessageChannel();

    @Output(ORGANIZATION_OUTPUT)
    MessageChannel organizationOutputMessageChannel();
}
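Update 1:
I applied the suggestion from dturanski here and the error went away. However, I still cannot consume the messages as a KStream<String, Organization>, and there are no errors in the console.
Update 2:
I applied the suggestion from sobychacko here, and the messages are now consumed, but the fields of the object are empty.
I made a commit to the GitHub sample that produces the messages from Spring Boot itself, and I still receive them with empty values.
Thank you for your time on this issue.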
【Discussion】:
Tags: spring-boot apache-kafka apache-kafka-streams spring-cloud-stream confluent-schema-registry