【问题标题】:Spring Integration - no response on reply channelSpring Integration - 回复通道无响应
【发布时间】:2018-11-22 08:56:33
【问题描述】:

这是Spring Integration Executor Channel using annotations code sample 的后续问题。

系统图附

我正在尝试通过将消息发布到“公共频道”并读取消息中设置的 REPLY_CHANNEL 来测试以红色突出显示的框。

“公共频道”是发布订阅频道。 REPLY_CHANNEL 是一个 QueueChannel。

由于这是一个 JUnit 测试,我模拟了 jdbcTemplate、数据源和 Impl 以忽略任何 DB 调用。

我的问题是: 当我在“公共频道”上发布消息时,我在 REPLY_CHANNEL 上没有收到任何消息。 junit 一直在等待响应。

我应该更改什么才能在 REPLY_CHANNEL 上获得响应?

@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(loader = AnnotationConfigContextLoader.class) --------- 1 
@ActiveProfiles("test")
public class QueuetoQueueTest {
    @Configuration
    static class ContextConfiguration { ------------------------------------- 2
        @Bean(name = "jdbcTemplate")
        public JdbcTemplate jdbcTemplate() {
            JdbcTemplate jdbcTemplateMock = Mockito.mock(JdbcTemplate.class);
            return jdbcTemplateMock;
        }

        @Bean(name = "dataSource")
        public DataSource dataSource() {
            DataSource dataSourceMock = Mockito.mock(DataSource.class);
            return dataSourceMock;
        }

        @Bean(name = "entityManager")
        public EntityManager entityManager() {
            EntityManager entityManagerMock = Mockito.mock(EntityManager.class);
            return entityManagerMock;
        }

        @Bean(name = "ResponseChannel")
        public QueueChannel getReplyQueueChannel() {
            return new QueueChannel();
        }

//This channel serves as the 'common channel' in the diagram
        @Bean(name = "processRequestSubscribableChannel")
        public MessageChannel getPublishSubscribeChannel() {
            return new PublishSubscribeChannel();
        }
    }

    @Mock
    DBStoreDaoImpl dbStoreDaoImpl;

    @Test
    public void testDBConnectivity() {
        Assert.assertTrue(true);
    }

    @InjectMocks -------------------------------------------------------------- 3
    StoretoDBConfig storetoDBConfig = new StoretoDBConfig();

    @Autowired
    @Qualifier("ResponseChannel")
    QueueChannel ResponseChannel;

    @Autowired
    @Qualifier("processRequestSubscribableChannel")
    MessageChannel processRequestSubscribableChannel;

    @Before
    public void setUp() throws Exception {
        MockitoAnnotations.initMocks(this);
    }

    @Test
    public void outboundtoQueueTest() {
        try {
            when(dbStoreDaoImpl.storeToDB(any()))
                    .thenReturn(1); ----------------------------------------------- 4

            //create message
            Message message = (Message<String>) MessageBuilder
                    .withPayload("Hello")
                    .setHeader(MessageHeaders.REPLY_CHANNEL, ResponseChannel)
                    .build();

            //send message
            processRequestSubscribableChannel.send(message);
            System.out
                    .println("Listening on InstructionResponseHandlertoEventProcessorQueue");

            //wait for response on reply channel
            Message<?> response = ResponseChannel.receive(); ----------------------- 5
            System.out.println("***************RECEIVED: "
                    + response.toString());
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}
  1. 为 JUnit 加载“ContextConfiguration”,以便不访问 DB。

  2. 这是您在 JUnit 中加载自定义配置的方式,根据 https://spring.io/blog/2011/06/21/spring-3-1-m2-testing-with-configuration-classes-and-profiles

在配置类中,我们模拟 jdbcTemplate、dataSource、entityManager 并定义发布请求的“公共通道”和 ResponseChannel。

  1. 将jdbcTemplate、dataSource mock 注入StoretoDBConfig,这样DB就不会被命中

  2. 模拟 DaoImpl 类以便忽略 DB 调用

  3. 此处测试阻塞,因为 REPLY_CHANNEL 上没有响应

更新代码:

Code inside 5 (the class that reads from common channel):

    @Configuration
    class HandleRequestConfig {

        //Common channel - refer diagram
        @Autowired
        PublishSubscribeChannel processRequestSubscribableChannel;

        //Step 9 - This channel is used to send queue to the downstream system
        @Autowired
        PublishSubscribeChannel forwardToExternalSystemQueue;

        public void handle() {
            IntegrationFlows.from("processRequestSubscribableChannel")          // Read from 'Common channel'
            .wireTap(flow->flow.handle(msg -> System.out.println("Msg received on processRequestSubscribableChannel"+ msg.getPayload())))
            .handle(RequestProcessor,"validateMessage")                         // Perform custom business logic - no logic for now, return the msg as is   
            .wireTap(flow->flow.handle(msg -> System.out.println("Msg received on RequestProcessor"+ msg.getPayload())))
            .channel("forwardToExternalSystemQueue");                           // Post to 'Channel to another system' 
            }

    }

    //Code inside step 8 - 'Custom Business Logic' 
    @Configuration
    class RequestProcessor {
        public Message<?> validateMessage(Message<?> msg) {
            return msg;
        }
    }

我想要达到的目标: 我有用于业务逻辑的单独的 junit 测试用例。我正在尝试测试当请求发布到“公共通道”时,会在“到另一个系统的通道”上收到响应。

为什么我不能使用原来的 ApplicationContext: 因为它连接到数据库,我不希望我的 JUnit 连接到数据库或使用嵌入式数据库。我希望忽略对数据库的任何调用。

我已将回复通道设置为“ResponseChannel”,“自定义业务逻辑”不应该将其响应发送到“ResponseChannel”吗?

如果我必须在不同的频道上收听回复,我愿意这样做。我只想测试我在“公共频道”上发送的消息是否在“到其他系统的频道”上收到。

更新 2: 解决 Artem 的问题。 感谢 Artem 的建议。

'HandlerRequestConfig' 是否包含在测试配置中? - 我们不能直接调用 handle() 方法。相反,我想如果我在“processRequestSubscribableChannel”上发帖,HandleRequestConfig 中的 handle() 方法将被调用,因为它在同一个频道上进行侦听。这是错的吗?那我该如何测试 HandleRequestConfig.handle() 方法呢?

我在 HandleRequestConfig 中每个步骤的末尾添加了窃听(代码已更新)。我发现没有打印任何窃听消息。这意味着我发布的消息甚至没有到达输入通道“processRequestSubscribableChannel”。我做错了什么?

注意:我尝试在 Configuration 中删除 'processRequestSubscribableChannel' bean(以便使用 applicationContext 中的实际 'processRequestSubscribableChannel')。我收到一个不满意的依赖错误 - 预期至少有 1 个配置为 PublishSubscribeChannel 的 bean。

更新 3:要求发布 Artem 的详细信息。

@RunWith(SpringRunner.class)
@SpringBootTest
public class QueuetoQueueTest  {

//  Step 1 - Mocking jdbcTemplate, dataSource, entityManager so that it doesn't connect to the DB
        @MockBean
        @Qualifier("jdbcTemplate")
        JdbcTemplate jdbcTemplate;

        @MockBean
        @Qualifier("dataSource")
        public DataSource dataSource;

        @MockBean
        @Qualifier("entityManager")
        public EntityManager entityManager;

        @Bean(name = "ResponseChannel")
        public PublishSubscribeChannel getReplyQueueChannel() {
            return new PublishSubscribeChannel();
        }

        //Mocking the DB class
        @MockBean
        @Qualifier("dbStoreDaoImpl")
        DBStoreDaoImpl  dbStoreDaoImpl ;


        //Inject the mock objects created above into the flow that stores data into the DB.
        @InjectMocks
        StoretoDBConfig storetoDBConfig = new StoretoDBConfig();

//Step 2 - Injecting MessageChannel used in the actual ApplicationContext
        @Autowired
        @Qualifier("processRequestSubscribableChannel")
        MessageChannel processRequestSubscribableChannel;

        @Before
        public void setUp() throws Exception {
            MockitoAnnotations.initMocks(this);
        }

        @Test
        public void outboundtoQueueTest() {
        try {
            when(dbStoreDaoImpl.storeToDB(any()))
                    .thenReturn(1);

            //create message
            Message message = (Message<?>) MessageBuilder
                    .withPayload("Hello")
                    .build();
            //send message - this channel is the actual channel used in ApplicationContext
            processRequestSubscribableChannel.send(message);
        } catch (Exception e) {
            e.printStackTrace();
        }
    }
}

我得到的错误:代码尝试连接到数据库并引发错误。

更新 1:StoretoDBConfig 中的代码

@Configuration
@EnableIntegration
public class StoretoDBConfig {
    @Autowired
    DataSource dataSource;

/*
 * Below code is irrelevant to our current problem - Including for reference.
 * 
 * storing into DB is delegated to a separate thread.
 *
 *  @Bean
 *  public TaskExecutor taskExecutor() {
 *      return new SimpleAsyncTaskExecutor();
 *  }
 *  
 *  @Bean(name="executorChannelToDB")
 *  public ExecutorChannel outboundRequests() {
 *      return new ExecutorChannel(taskExecutor());
 *  }
 *  @Bean(name = "DBFailureChannel")
 *  public static MessageChannel getFailureChannel() {
 *      return new DirectChannel();
 *  }
 *  private static final Logger logger = Logger
 *              .getLogger(InstructionResponseHandlerOutboundtoDBConfig.class);
*/
    @Bean
    public IntegrationFlow handle() {
    /*
     * Read from 'common channel' - processRequestSubscribableChannel and send to separate thread that stores into DB.
     *
     /
        return IntegrationFlows
                .from("processRequestSubscribableChannel")
                .channel("executorChannelToDB").get();
    }
}

在单独线程上存储到 DB 中的代码:

@Repository
public class DBStoreDaoImpl implements DBStoreDao {
    private JdbcTemplate jdbcTemplate;

    @Autowired
    public void setJdbcTemplate(DataSource dataSource) {
        this.jdbcTemplate = new JdbcTemplate(dataSource);
    }

    @Override
    @Transactional(rollbackFor = Exception.class)
    @ServiceActivator(inputChannel = "executorChannelToDB")
    public void storetoDB(Message<?> msg) throws Exception {
            String insertQuery ="Insert into DBTable(MESSAGE) VALUES(?)";
            jdbcTemplate.update(insertQuery, msg.toString());
    }
}

【问题讨论】:

  • @GaryRussell,你能帮忙吗?

标签: junit spring-integration


【解决方案1】:

请告诉我们订阅了Common channel 的内容。您的图表不知何故与您向我们展示的内容无关。你演示的代码不全。

replyChannel 的真正问题是必须向它发送消息。如果您的流程只是单向 - 发送、存储并且没有任何返回,那么您确实不会为此获得任何东西。这就是为什么要显示那些通道适配器。

观察消息旅程的最佳方式是为org.springframework.integration 类别打开调试日志记录。

虽然我看到您在ContextConfiguration 中声明了这些频道,但getRequestChannel 确实没有任何订阅者。因此没有人会消费你的消息,当然也没有人会给你回复。

请重新考虑您的测试类以使用真实的应用程序上下文。否则,如果您真的不测试您的流程,您将完全不清楚您想要实现什么......

【讨论】:

  • 感谢您的回复。添加了您需要的信息。
  • HandleRequestConfig 是否包含在测试配置中?你能确定你的RequestProcessor 被调用了吗?您可以将ChannelInterceptor 添加到forwardToExternalSystemQueue 频道并在测试中验证其交互。还要注意我们建议的测试框架:docs.spring.io/spring-integration/docs/5.0.5.RELEASE/reference/…
  • 感谢 Artem 的建议。
  • 我已将我的回复作为单独的答案输入,因为它太长,无法放入 cmets。
  • 您的方法和想法是正确的,只是我不确定的是,当您从测试用例向processRequestSubscribableChannel 发送消息时,是否真的调用了HandleRequestConfig
猜你喜欢
  • 2013-07-11
  • 2022-01-17
  • 2016-01-16
  • 1970-01-01
  • 2021-11-20
  • 2019-07-20
  • 2023-04-02
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多