【问题标题】:Spring integrtion mqtt and JPA transactionSpring集成mqtt和JPA事务
【发布时间】:2021-08-24 19:35:52
【问题描述】:

上下文:

我有一个 Spring Integration Flux 管理来自 MQTT 代理的数据。

我使用 MqttPahoMessageDrivenChannelAdapter 从 mqtt 获取消息。

所有处理都在单个线程(MQTT 线程)中完成,我没有使用任何轮询器。

我正在使用 JPA/Hibernate/MySQL

我面临的问题

mqtt 事件处理流程涉及多个转换器和路由器,但会创建多个事务。

问题

如何创建一个 uniq 事务来处理每条 mqtt 消息? 我正在使用 Java 配置。

感谢您的帮助。

我的配置摘录:

@Bean
public MqttPahoMessageDrivenChannelAdapter rak7249ChannelAdapter(
        MqttConfig rak7249MqttConfig, 
        MqttPahoClientFactory rak7249MqttClientFactory, 
        MessageChannel rak7249Channel,
        MqttRak7249Converter mqttRak7249Converter) {
    var adapter = ConfigUtils.mqttChannelAdapter("application/+/device/+/+", rak7249MqttConfig, rak7249MqttClientFactory, rak7249Channel, mqttRak7249Converter);

    return adapter;
}

@Bean
@Transformer(inputChannel = rak7249Channel, outputChannel = loraChannel)
public Rak7249FluxLoraTransformer rak7249ToLoraMessageTransformer(Rak7249Service service) {
    return new Rak7249FluxLoraTransformer(service);
}

@Bean
@Splitter(inputChannel = loraChannel)
public AbstractSimpleMessageHandlerFactoryBean<AbstractMessageSplitter> loraSplitter(LoraService service, MessageChannel mixedLoraChannel) {
    return new AbstractSimpleMessageHandlerFactoryBean<AbstractMessageSplitter>() {

        @Override
        protected AbstractMessageSplitter createHandler() {
            var splitter = new LoraMessageSplitter(service);
            splitter.setOutputChannel(mixedLoraChannel);
            return splitter;
        }
    };
}

@Bean
@Router(inputChannel = mixedLoraChannel)
public PayloadTypeRouter mixedLoraRouter() {

    var router = new PayloadTypeRouter();
    router.setChannelMapping(ThingEncodedMessage.class.getName(), thingEncodedDataChannel);
    router.setChannelMapping(LoraMetricMessage.class.getName(), loraMetricChannel);

    return router;
}

@Bean
@ServiceActivator(inputChannel = loraMetricChannel)
public MessageHandler writeLoraMetric(WriteInfluxDB influxDB, WriteSQL sql) {
    return new AbstractMessageHandler() {

        @Override
        @Transactional
        protected void handleMessageInternal(Message<?> imessage) {
            sql.writePoint(imessage);

            // Not a transactional resource
            influxDB.writePoint(imessage);
        }
    };
}

...等等

【问题讨论】:

  • 你需要展示你的代码和配置。
  • @GaryRussell ,我确实添加了我的 java 配置的摘录,并更新了问题描述。 tks.
  • @Transactional 添加到适配器bean 定义中没有任何作用;那只是一个bean定义。服务方法上的@Transactional 应该可以正常工作,这就是事务开始的时间;您的“//非事务性”评论是什么意思?我在那里没有看到“多笔交易”。
  • 感谢加里的帮助。 Not transactional 注释指出 influxDB 资源不是事务的一部分。但通量涉及其他事务性服务。现在,每个服务都在单独的事务中运行。我想在一个 tx 中运行所有内容,每个 mqtt 消息一个事务。

标签: transactions spring-integration mqtt


【解决方案1】:

使用“交易网关”;使用 Java DSL 最容易描述:

@SpringBootApplication
@EnableTransactionManagement
public class So68913184Application {

    public static void main(String[] args) {
        SpringApplication.run(So68913184Application.class, args);
    }

    @Bean
    public MqttPahoClientFactory mqttClientFactory() {
        DefaultMqttPahoClientFactory factory = new DefaultMqttPahoClientFactory();
        MqttConnectOptions options = new MqttConnectOptions();
        options.setServerURIs(new String[] { "tcp://localhost:1883" });
        options.setUserName("guest");
        options.setPassword("guest".toCharArray());
        factory.setConnectionOptions(options);
        return factory;
    }

    @Bean
    public MessageProducerSupport mqttInbound(MqttPahoClientFactory mqttClientFactory) {
        MqttPahoMessageDrivenChannelAdapter adapter = new MqttPahoMessageDrivenChannelAdapter("siSampleConsumer",
                mqttClientFactory, "siSampleTopic");
        adapter.setCompletionTimeout(5000);
        adapter.setConverter(new DefaultPahoMessageConverter());
        adapter.setQos(1);
        return adapter;
    }

    @Bean
    public IntegrationFlow mqttInFlow(MessageProducerSupport mqttInbound, TransactionalGateway gate) {
        return IntegrationFlows.from(mqttInbound)
                .transform(p -> p + ", received from MQTT")
                .log()
                .handle(msg -> gate.sendIntransaction(msg))
                .get();
    }

    @Bean
    public IntegrationFlow mainFlow(Service service) {
        return f -> f.handle(service);
    }

}

@MessagingGateway(defaultRequestChannel = "mainFlow.input")
interface TransactionalGateway {

    @Transactional
    void sendIntransaction(Message<?> msg);

}

@Component
class Service {

    public void process(Message<?> msg) {
        System.out.println(msg);
    }

}

网关下游的所有东西(在同一个线程上)都在同一个事务中运行。

【讨论】:

  • 好的,知道了。感谢您的帮助,加里。
猜你喜欢
  • 2017-07-23
  • 1970-01-01
  • 2017-06-24
  • 1970-01-01
  • 1970-01-01
  • 2018-03-10
  • 2013-04-10
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多