【发布时间】:2018-11-22 08:56:33
【问题描述】:
这是Spring Integration Executor Channel using annotations code sample 的后续问题。
我正在尝试通过将消息发布到“公共频道”并读取消息中设置的 REPLY_CHANNEL 来测试以红色突出显示的框。
“公共频道”是发布订阅频道。 REPLY_CHANNEL 是一个 QueueChannel。
由于这是一个 JUnit 测试,我模拟了 jdbcTemplate、数据源和 Impl 以忽略任何 DB 调用。
我的问题是: 当我在“公共频道”上发布消息时,我在 REPLY_CHANNEL 上没有收到任何消息。 junit 一直在等待响应。
我应该更改什么才能在 REPLY_CHANNEL 上获得响应?
@RunWith(SpringRunner.class)
@SpringBootTest
@ContextConfiguration(loader = AnnotationConfigContextLoader.class) --------- 1
@ActiveProfiles("test")
public class QueuetoQueueTest {
@Configuration
static class ContextConfiguration { ------------------------------------- 2
@Bean(name = "jdbcTemplate")
public JdbcTemplate jdbcTemplate() {
JdbcTemplate jdbcTemplateMock = Mockito.mock(JdbcTemplate.class);
return jdbcTemplateMock;
}
@Bean(name = "dataSource")
public DataSource dataSource() {
DataSource dataSourceMock = Mockito.mock(DataSource.class);
return dataSourceMock;
}
@Bean(name = "entityManager")
public EntityManager entityManager() {
EntityManager entityManagerMock = Mockito.mock(EntityManager.class);
return entityManagerMock;
}
@Bean(name = "ResponseChannel")
public QueueChannel getReplyQueueChannel() {
return new QueueChannel();
}
//This channel serves as the 'common channel' in the diagram
@Bean(name = "processRequestSubscribableChannel")
public MessageChannel getPublishSubscribeChannel() {
return new PublishSubscribeChannel();
}
}
@Mock
DBStoreDaoImpl dbStoreDaoImpl;
@Test
public void testDBConnectivity() {
Assert.assertTrue(true);
}
@InjectMocks -------------------------------------------------------------- 3
StoretoDBConfig storetoDBConfig = new StoretoDBConfig();
@Autowired
@Qualifier("ResponseChannel")
QueueChannel ResponseChannel;
@Autowired
@Qualifier("processRequestSubscribableChannel")
MessageChannel processRequestSubscribableChannel;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
}
@Test
public void outboundtoQueueTest() {
try {
when(dbStoreDaoImpl.storeToDB(any()))
.thenReturn(1); ----------------------------------------------- 4
//create message
Message message = (Message<String>) MessageBuilder
.withPayload("Hello")
.setHeader(MessageHeaders.REPLY_CHANNEL, ResponseChannel)
.build();
//send message
processRequestSubscribableChannel.send(message);
System.out
.println("Listening on InstructionResponseHandlertoEventProcessorQueue");
//wait for response on reply channel
Message<?> response = ResponseChannel.receive(); ----------------------- 5
System.out.println("***************RECEIVED: "
+ response.toString());
} catch (Exception e) {
e.printStackTrace();
}
}
}
为 JUnit 加载“ContextConfiguration”,以便不访问 DB。
这是您在 JUnit 中加载自定义配置的方式,根据 https://spring.io/blog/2011/06/21/spring-3-1-m2-testing-with-configuration-classes-and-profiles
在配置类中,我们模拟 jdbcTemplate、dataSource、entityManager 并定义发布请求的“公共通道”和 ResponseChannel。
将jdbcTemplate、dataSource mock 注入StoretoDBConfig,这样DB就不会被命中
模拟 DaoImpl 类以便忽略 DB 调用
此处测试阻塞,因为 REPLY_CHANNEL 上没有响应
更新代码:
Code inside 5 (the class that reads from common channel):
@Configuration
class HandleRequestConfig {
//Common channel - refer diagram
@Autowired
PublishSubscribeChannel processRequestSubscribableChannel;
//Step 9 - This channel is used to send queue to the downstream system
@Autowired
PublishSubscribeChannel forwardToExternalSystemQueue;
public void handle() {
IntegrationFlows.from("processRequestSubscribableChannel") // Read from 'Common channel'
.wireTap(flow->flow.handle(msg -> System.out.println("Msg received on processRequestSubscribableChannel"+ msg.getPayload())))
.handle(RequestProcessor,"validateMessage") // Perform custom business logic - no logic for now, return the msg as is
.wireTap(flow->flow.handle(msg -> System.out.println("Msg received on RequestProcessor"+ msg.getPayload())))
.channel("forwardToExternalSystemQueue"); // Post to 'Channel to another system'
}
}
//Code inside step 8 - 'Custom Business Logic'
@Configuration
class RequestProcessor {
public Message<?> validateMessage(Message<?> msg) {
return msg;
}
}
我想要达到的目标: 我有用于业务逻辑的单独的 junit 测试用例。我正在尝试测试当请求发布到“公共通道”时,会在“到另一个系统的通道”上收到响应。
为什么我不能使用原来的 ApplicationContext: 因为它连接到数据库,我不希望我的 JUnit 连接到数据库或使用嵌入式数据库。我希望忽略对数据库的任何调用。
我已将回复通道设置为“ResponseChannel”,“自定义业务逻辑”不应该将其响应发送到“ResponseChannel”吗?
如果我必须在不同的频道上收听回复,我愿意这样做。我只想测试我在“公共频道”上发送的消息是否在“到其他系统的频道”上收到。
更新 2: 解决 Artem 的问题。 感谢 Artem 的建议。
'HandlerRequestConfig' 是否包含在测试配置中? - 我们不能直接调用 handle() 方法。相反,我想如果我在“processRequestSubscribableChannel”上发帖,HandleRequestConfig 中的 handle() 方法将被调用,因为它在同一个频道上进行侦听。这是错的吗?那我该如何测试 HandleRequestConfig.handle() 方法呢?
我在 HandleRequestConfig 中每个步骤的末尾添加了窃听(代码已更新)。我发现没有打印任何窃听消息。这意味着我发布的消息甚至没有到达输入通道“processRequestSubscribableChannel”。我做错了什么?
注意:我尝试在 Configuration 中删除 'processRequestSubscribableChannel' bean(以便使用 applicationContext 中的实际 'processRequestSubscribableChannel')。我收到一个不满意的依赖错误 - 预期至少有 1 个配置为 PublishSubscribeChannel 的 bean。
更新 3:要求发布 Artem 的详细信息。
@RunWith(SpringRunner.class)
@SpringBootTest
public class QueuetoQueueTest {
// Step 1 - Mocking jdbcTemplate, dataSource, entityManager so that it doesn't connect to the DB
@MockBean
@Qualifier("jdbcTemplate")
JdbcTemplate jdbcTemplate;
@MockBean
@Qualifier("dataSource")
public DataSource dataSource;
@MockBean
@Qualifier("entityManager")
public EntityManager entityManager;
@Bean(name = "ResponseChannel")
public PublishSubscribeChannel getReplyQueueChannel() {
return new PublishSubscribeChannel();
}
//Mocking the DB class
@MockBean
@Qualifier("dbStoreDaoImpl")
DBStoreDaoImpl dbStoreDaoImpl ;
//Inject the mock objects created above into the flow that stores data into the DB.
@InjectMocks
StoretoDBConfig storetoDBConfig = new StoretoDBConfig();
//Step 2 - Injecting MessageChannel used in the actual ApplicationContext
@Autowired
@Qualifier("processRequestSubscribableChannel")
MessageChannel processRequestSubscribableChannel;
@Before
public void setUp() throws Exception {
MockitoAnnotations.initMocks(this);
}
@Test
public void outboundtoQueueTest() {
try {
when(dbStoreDaoImpl.storeToDB(any()))
.thenReturn(1);
//create message
Message message = (Message<?>) MessageBuilder
.withPayload("Hello")
.build();
//send message - this channel is the actual channel used in ApplicationContext
processRequestSubscribableChannel.send(message);
} catch (Exception e) {
e.printStackTrace();
}
}
}
我得到的错误:代码尝试连接到数据库并引发错误。
更新 1:StoretoDBConfig 中的代码
@Configuration
@EnableIntegration
public class StoretoDBConfig {
@Autowired
DataSource dataSource;
/*
* Below code is irrelevant to our current problem - Including for reference.
*
* storing into DB is delegated to a separate thread.
*
* @Bean
* public TaskExecutor taskExecutor() {
* return new SimpleAsyncTaskExecutor();
* }
*
* @Bean(name="executorChannelToDB")
* public ExecutorChannel outboundRequests() {
* return new ExecutorChannel(taskExecutor());
* }
* @Bean(name = "DBFailureChannel")
* public static MessageChannel getFailureChannel() {
* return new DirectChannel();
* }
* private static final Logger logger = Logger
* .getLogger(InstructionResponseHandlerOutboundtoDBConfig.class);
*/
@Bean
public IntegrationFlow handle() {
/*
* Read from 'common channel' - processRequestSubscribableChannel and send to separate thread that stores into DB.
*
/
return IntegrationFlows
.from("processRequestSubscribableChannel")
.channel("executorChannelToDB").get();
}
}
在单独线程上存储到 DB 中的代码:
@Repository
public class DBStoreDaoImpl implements DBStoreDao {
private JdbcTemplate jdbcTemplate;
@Autowired
public void setJdbcTemplate(DataSource dataSource) {
this.jdbcTemplate = new JdbcTemplate(dataSource);
}
@Override
@Transactional(rollbackFor = Exception.class)
@ServiceActivator(inputChannel = "executorChannelToDB")
public void storetoDB(Message<?> msg) throws Exception {
String insertQuery ="Insert into DBTable(MESSAGE) VALUES(?)";
jdbcTemplate.update(insertQuery, msg.toString());
}
}
【问题讨论】:
-
@GaryRussell,你能帮忙吗?