【问题标题】:Abstracting Spring Cloud Stream Producer and Consumer code抽象 Spring Cloud Stream 生产者和消费者代码
【发布时间】:2020-04-20 07:27:03
【问题描述】:

我有一个服务,它正在生产和消费来自不同 Spring Cloud Stream Channels 的消息(绑定到 EventHub/Kafka 主题)。有几个这样的服务设置类似。

配置如下所示

 public interface MessageStreams {
      String WORKSPACE = "workspace";
      String UPLOADNOTIFICATION = "uploadnotification";
      String BLOBNOTIFICATION = "blobnotification";
      String INGESTIONSTATUS = "ingestionstatusproducer";

      @Input(WORKSPACE)
      SubscribableChannel workspaceChannel();

      @Output(UPLOADNOTIFICATION)
      MessageChannel uploadNotificationChannel();

      @Input(BLOBNOTIFICATION)
      SubscribableChannel blobNotificationChannel();

      @Output(INGESTIONSTATUS)
      MessageChannel ingestionStatusChannel();
    }


    @EnableBinding(MessageStreams.class)
    public class EventHubStreamsConfiguration {
    }

生产者/发布者代码如下所示

    @Service
    @Slf4j
    public class IngestionStatusEventPublisher {
      private final MessageStreams messageStreams;

      public IngestionStatusEventPublisher(MessageStreams messageStreams) {
        this.messageStreams = messageStreams;
      }

      public void sendIngestionStatusEvent() {
        log.info("Sending ingestion status event");
        System.out.println("Sending ingestion status event");
        MessageChannel messageChannel = messageStreams.ingestionStatusChannel();
        boolean messageSent = messageChannel.send(MessageBuilder
            .withPayload(IngestionStatusMessage.builder()
                .correlationId("some-correlation-id")
                .status("done")
                .source("some-source")
                .eventTime(OffsetDateTime.now())
                .build())
            .setHeader("tenant-id", "some-tenant")
            .build());
        log.info("Ingestion status event sent successfully {}", messageSent);
      }
    }

同样,我有多个其他发布者发布到不同的事件中心/主题。请注意,为每个发布的消息设置了一个租户 ID 标头。这是我的多租户应用程序特有的,用于跟踪租户上下文。另请注意,我在发送消息时正在获取要发布到的频道。

我的消费者代码如下所示

    @Component
    @Slf4j
    public class IngestionStatusEventHandler {
      private AtomicInteger eventCount = new AtomicInteger();

      @StreamListener(TestMessageStreams.INGESTIONSTATUS)
      public void handleEvent(@Payload IngestionStatusMessage message, @Header(name = "tenant-id") String tenantId) throws Exception {
        log.info("New ingestion status event received: {} in Consumer: {}", message, Thread.currentThread().getName());

        // set the tenant context as thread local from the header.

      }

我有几个这样的消费者,并且根据发布者发送的传入租户 ID 标头在每个消费者中设置一个租户上下文。

我的问题是

我如何摆脱在 Publisher 中设置租户 ID 标头和在 Consumer 中设置租户上下文的样板代码,方法是将其抽象到一个库中,该库可以包含在我拥有的所有不同服务中。

另外,有没有一种方法可以根据正在发布的消息的类型动态识别频道。对于给定场景中的前 IngestionStatusMessage.class

【问题讨论】:

  • 我可能读错了,但这不就是两行“样板”吗?至于通道/类连接,您是否考虑过使用枚举将通道、名称和类保持在一起?

标签: spring-integration spring-cloud spring-cloud-stream spring-cloud-stream-binder-kafka


【解决方案1】:

要在公共代码中设置和tenant-id 标头并避免在每个微服务中复制/粘贴它,您可以使用ChannelInterceptor 并使用@GlobalChannelInterceptor 及其patterns 选项使其成为全局标头。

在 Spring 集成中查看更多信息:https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/core.html#channel-interceptors

https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/overview.html#configuration-enable-integration

您无法通过负载类型进行频道选择,因为负载类型实际上是由@StreamListener 方法签名确定的。

您可以尝试使用具有Message<?> 期望的一般@Router,然后根据该请求消息上下文返回特定通道名称以进行路由。

https://docs.spring.io/spring-integration/docs/5.3.0.BUILD-SNAPSHOT/reference/html/message-routing.html#messaging-routing-chapter

【讨论】:

  • ChannelInterceptor 似乎不适用于 SubscribableChannel。有没有办法为可订阅频道处理这个问题
  • 我实际上已经尝试过另一种处理方式;我有一个名为 EventHandler 的抽象基类,我所有不同的 EventHandler 都从该基类扩展。每当通过 AOP 调用 handleEvent 时,我都尝试在我的 Base 类中调用 preProcess 和 postProcess 方法。但是,似乎 Handler 实际上是通过反射调用的,实际上并不是代理,因此建议没有得到应用。
  • public abstract class EventHandler { protected EventHandler() { } public void preProcess(Message message) { // 提取标题并进行一些预处理 } protected abstract void handleEvent( Message message) 抛出异常; public void postProcess(Message message) { //做一些后期处理 } }
  • 将您的频道投射到InterceptableChannel 以添加ChannelInterceptor
  • 我尝试将我的频道投射到 InterceptableChannel 并在 ApplicationContext 初始化期间添加了一个拦截器;但是拦截器没有被调用
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2021-12-30
  • 1970-01-01
  • 1970-01-01
  • 2016-06-21
  • 2017-06-22
相关资源
最近更新 更多