【发布时间】:2021-08-24 19:35:52
【问题描述】:
上下文:
我有一个 Spring Integration Flux 管理来自 MQTT 代理的数据。
我使用 MqttPahoMessageDrivenChannelAdapter 从 mqtt 获取消息。
所有处理都在单个线程(MQTT 线程)中完成,我没有使用任何轮询器。
我正在使用 JPA/Hibernate/MySQL
我面临的问题
mqtt 事件处理流程涉及多个转换器和路由器,但会创建多个事务。
问题
如何创建一个 uniq 事务来处理每条 mqtt 消息? 我正在使用 Java 配置。
感谢您的帮助。
我的配置摘录:
@Bean
public MqttPahoMessageDrivenChannelAdapter rak7249ChannelAdapter(
MqttConfig rak7249MqttConfig,
MqttPahoClientFactory rak7249MqttClientFactory,
MessageChannel rak7249Channel,
MqttRak7249Converter mqttRak7249Converter) {
var adapter = ConfigUtils.mqttChannelAdapter("application/+/device/+/+", rak7249MqttConfig, rak7249MqttClientFactory, rak7249Channel, mqttRak7249Converter);
return adapter;
}
@Bean
@Transformer(inputChannel = rak7249Channel, outputChannel = loraChannel)
public Rak7249FluxLoraTransformer rak7249ToLoraMessageTransformer(Rak7249Service service) {
return new Rak7249FluxLoraTransformer(service);
}
@Bean
@Splitter(inputChannel = loraChannel)
public AbstractSimpleMessageHandlerFactoryBean<AbstractMessageSplitter> loraSplitter(LoraService service, MessageChannel mixedLoraChannel) {
return new AbstractSimpleMessageHandlerFactoryBean<AbstractMessageSplitter>() {
@Override
protected AbstractMessageSplitter createHandler() {
var splitter = new LoraMessageSplitter(service);
splitter.setOutputChannel(mixedLoraChannel);
return splitter;
}
};
}
@Bean
@Router(inputChannel = mixedLoraChannel)
public PayloadTypeRouter mixedLoraRouter() {
var router = new PayloadTypeRouter();
router.setChannelMapping(ThingEncodedMessage.class.getName(), thingEncodedDataChannel);
router.setChannelMapping(LoraMetricMessage.class.getName(), loraMetricChannel);
return router;
}
@Bean
@ServiceActivator(inputChannel = loraMetricChannel)
public MessageHandler writeLoraMetric(WriteInfluxDB influxDB, WriteSQL sql) {
return new AbstractMessageHandler() {
@Override
@Transactional
protected void handleMessageInternal(Message<?> imessage) {
sql.writePoint(imessage);
// Not a transactional resource
influxDB.writePoint(imessage);
}
};
}
...等等
【问题讨论】:
-
你需要展示你的代码和配置。
-
@GaryRussell ,我确实添加了我的 java 配置的摘录,并更新了问题描述。 tks.
-
将
@Transactional添加到适配器bean 定义中没有任何作用;那只是一个bean定义。服务方法上的@Transactional应该可以正常工作,这就是事务开始的时间;您的“//非事务性”评论是什么意思?我在那里没有看到“多笔交易”。 -
感谢加里的帮助。 Not transactional 注释指出 influxDB 资源不是事务的一部分。但通量涉及其他事务性服务。现在,每个服务都在单独的事务中运行。我想在一个 tx 中运行所有内容,每个 mqtt 消息一个事务。
标签: transactions spring-integration mqtt