【问题标题】:Dynamic destination in Spring Cloud Stream from Azure Event Hub to KafkaSpring Cloud Stream 中从 Azure Event Hub 到 Kafka 的动态目的地
【发布时间】:2022-01-22 01:07:59
【问题描述】:

我正在尝试使用 Spring Cloud Stream 处理发送到 Azure 事件中心实例的消息。这些消息应路由到 Kafka 集群上运行时根据消息内容确定的租户特定主题。出于开发目的,我通过 Docker 在本地运行 Kafka。 我对配置时未知的绑定进行了一些研究,发现动态目标解析可能正是我在这种情况下所需要的。

但是,让我的解决方案发挥作用的唯一方法是使用 StreamBridge。我宁愿使用动态目标标头spring.cloud.stream.sendto.destination,这样处理器可以写为Function<> 而不是Consumer<>(它不完全是一个接收器)。这种方法的主要问题是,由于最终的解决方案将使用 Spring Data Flow 部署,如果使用 StreamBridge,恐怕我会在配置流时遇到麻烦。

继续代码,这是处理器功能,我剥离了不相关的部分

    private static final String OUTPUT_DESTINATION_TEMPLATE = "%s.gateway-report";
    private static final String STREAM_DESTINATION_HEADER = "spring.cloud.stream.sendto.destination";
    private static final String TENANT_ID_HEADER = "tenant-id";

    @Bean
    public Function<Message<String>, Message<String>>
    routeMessageToTenantDestination(TenantGatewayDeviceService gatewayDeviceService) {
        return msg -> {
            final String tenantId = "test";
            final String destination = String.format(OUTPUT_DESTINATION_TEMPLATE, tenantId);
            return MessageBuilder.withPayload(msg.getPayload())
                    .setHeader(STREAM_DESTINATION_HEADER, destination)
                    .setHeader(TENANT_ID_HEADER, tenantId)
                    .build();
        };
    }

这是我的 application.yml

spring:
  cloud:
    stream:
      bindings:
        routeMessageToTenantDestination-in-0:
          binder: kafka-evthub
          destination: gateway-report
          group: report-processor
      dynamic-destinations:
      binders:
        kafka-ioc:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder:
              brokers: localhost:29092
        kafka-evthub:
          type: kafka
          environment:
            spring.cloud.stream.kafka.binder:
              brokers: xxxxxxxxxxx.servicebus.windows.net:9093
              configuration:
                sasl:
                  jaas:
                    config: org.apache.kafka.common.security.plain.PlainLoginModule required username="$ConnectionString" password="Endpoint=sb://xxxxxxxxxxx.servicebus.windows.net/;SharedAccessKeyName=*******;SharedAccessKey=********";
                  mechanism: PLAIN
                security.protocol: SASL_SSL
      default-binder: kafka-ioc

我在 pom.xml 中的相关依赖

<dependency>
<groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-stream-binder-kafka</artifactId>
</dependency>
<dependency>
    <groupId>org.springframework.kafka</groupId>
    <artifactId>spring-kafka</artifactId>
</dependency>

这是每次函数触发时我得到的异常

2022-01-20 10:56:18.848 ERROR 2258917 --- [container-0-C-1] o.s.integration.handler.LoggingHandler   : org.springframework.messaging.MessageHandlingException: error occurred in message handler [... stripped away ...]
    at org.springframework.integration.support.utils.IntegrationUtils.wrapInHandlingExceptionIfNecessary(IntegrationUtils.java:191)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:65)
    at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
    at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
    at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
    at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
    at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
    at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
    at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.sendMessageIfAny(KafkaMessageDrivenChannelAdapter.java:385)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access$300(KafkaMessageDrivenChannelAdapter.java:79)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:442)
    at org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter$IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:416)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.lambda$onMessage$0(RetryingMessageListenerAdapter.java:125)
    at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
    at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:255)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:119)
    at org.springframework.kafka.listener.adapter.RetryingMessageListenerAdapter.onMessage(RetryingMessageListenerAdapter.java:42)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeOnMessage(KafkaMessageListenerContainer.java:2588)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeOnMessage(KafkaMessageListenerContainer.java:2569)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeRecordListener(KafkaMessageListenerContainer.java:2483)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.doInvokeWithRecords(KafkaMessageListenerContainer.java:2405)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeRecordListener(KafkaMessageListenerContainer.java:2284)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:1958)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.invokeIfHaveRecords(KafkaMessageListenerContainer.java:1353)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.pollAndInvoke(KafkaMessageListenerContainer.java:1344)
    at org.springframework.kafka.listener.KafkaMessageListenerContainer$ListenerConsumer.run(KafkaMessageListenerContainer.java:1236)
    at java.base/java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:515)
    at java.base/java.util.concurrent.FutureTask.run(FutureTask.java:264)
    at java.base/java.lang.Thread.run(Thread.java:829)
Caused by: java.lang.NullPointerException
    at org.springframework.cloud.stream.function.StreamBridge.resolveDestination(StreamBridge.java:276)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.doSendMessage(FunctionConfiguration.java:604)
    at org.springframework.cloud.stream.function.FunctionConfiguration$FunctionToDestinationBinder$1.handleMessageInternal(FunctionConfiguration.java:597)
    at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:56)
    ... 32 more

我尝试了不同的方法,f.i.手动创建目标主题,使用分配给标题的相同名称配置显式目标绑定(不是最终解决方案,仅用于测试),但我不断收到此异常。我也尝试过提供NewDestinationBindingCallback&lt;&gt;,我可以从打印日志中看到框架进入了该方法,但仍然出现同样的错误。

将 Spring Cloud Stream 与事件中心集成的另一种方法也会发生这种情况,即库 azure-spring-cloud-stream-binder-eventhubs

正如我之前所说,我找到了依赖 StreamBridge 的解决方法,但这种解决方案对我来说似乎不太理想,我想了解我缺少什么。

编辑:我向前迈出了一小步,并设法通过将 spring boot 启动器版本从 2.6.2 降级到 2.4.4 来使其工作

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.4.4</version>
        <relativePath/> <!-- lookup parent from repository -->
    </parent>

和设置

    <properties>
        <spring-cloud.version>2020.0.2</spring-cloud.version>
    </properties>

而不是 pom.xml 中的 2021.0.0,如 sobychacko 提供的示例中所示。但是,这似乎是一种回归,或者我的配置中缺少某些内容以使其适用于最新版本?

【问题讨论】:

    标签: apache-kafka spring-cloud-stream azure-eventhub


    【解决方案1】:

    不确定究竟是什么导致了您遇到的问题。我刚刚创建了一个基本的sample app 来演示sendto.destination 标头,并验证了该应用程序是否按预期工作。它是一个连接了两个 Kafka 集群的多绑定器应用程序。该函数将从第一个集群消耗,然后使用sendto 标头,将输出生成到第二个集群。将此示例中的代码/配置与您的应用进行比较,看看缺少什么。

    我在您共享的堆栈跟踪中看到了对 StreamBridge 的引用。但是,当使用sendto.destination 标头时,它不应该通过StreamBridge

    【讨论】:

    • 非常感谢,我会尝试用你的样本来做实验,看看我缺少什么。运行您的示例只是一个澄清:数据也写入第一个集群上的主题“dataOut”中是否正确,应该是“源”?我希望第一个集群只有 dataIn 主题,而 dataOut 主题位于第二个集群中。
    • dataIn 在 cluster-1 上,dataOut 在 cluster-2 上。
    • 我问是因为在运行 docker-compose 和示例中的 jar 时,不做任何更改,发出命令“kafkacat -b localhost:port -L”我可以在 kafka1 上看到(端口 9092)列出的主题是 dataIn、dataOut 和 process-out-0。在第二个集群 kafka2(端口 9093)上,仅存在主题 dataOut。两个集群的 dataOut 主题中都存在大写数据(通过使用 kafkacat 监听主题进行检查)。我只是想知道为什么 dataOut 主题似乎也在集群 kafka1 上创建(并写入),该示例运行良好:)
    • 似乎将 spring boot 版本更改为 2.6.2 并将 spring cloud 版本更改为 2021.0.0 以与我的程序完全相同的方式破坏了示例代码。有关详细信息,请参阅我的编辑。是否发生了某些变化并且必须以某种方式调整配置,或者它是一个错误/回归?我使用带有最新版本框架的 spring initializr 并检查了最新版本的文档。
    • 这听起来绝对像是一个回归问题。我会检查一下然后回到这里。
    猜你喜欢
    • 2021-08-03
    • 2020-12-12
    • 1970-01-01
    • 1970-01-01
    • 2018-04-28
    • 1970-01-01
    • 2021-07-30
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多