【问题标题】:Get current RabbitMQ Channel in Spring Batch SkipListener在 Spring Batch SkipListener 中获取当前 RabbitMQ 通道
【发布时间】:2021-08-18 15:58:16
【问题描述】:

我有一个 RabbitMQ 触发的 Spring Batch 应用程序。

我需要nack异常消息。但是,Spring Batch 不会在 run 方法之外抛出异常。因此,我需要nack SkipListener 中的消息。我已设法将消息标记作为 JobParameter,但我无法将 Channel 作为 JobParameter 发送,因为它不是 可序列化。

有什么方法可以将 Channel 传递给批处理或任何其他错误处理方式?

这是我现在拥有的一个例子。

MessageConsumer.java

import com.project.common.dto.DataImportStartDto;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;

@Service
public class MessageConsumer {

    private final DataImportService dataImportService;

    public MessageConsumer(DataImportService dataImportService) {
        this.dataImportService = dataImportService;
    }

    @RabbitListener(queues = "etl_queue", concurrency = "1", ackMode = "MANUAL")
    public void receiveDataImportMessage(DataImportStartDto dataImportStartDto, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
        dataImportService.startImport(dataImportStartDto, tag);
        channel.basicAck(tag, false);
    }

数据导入服务.java

import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;

public class DataImportServiceImpl implements DataImportService {

private final JobLauncher jobLauncher;
private final Job dataImportJob;

@Service
public DataImportServiceImpl(JobLauncher jobLauncher, @Qualifier("dataImport") Job dataImportJob) {
    this.jobLauncher = jobLauncher;
    this.dataImportJob= dataImportJob;
}

@Override
public void startImport(DataImportStartDto dto, long tag) {
    jobLauncher.run(dataImportJob, buildJobParameters(dto, tag));
    }
}

private JobParameters buildJobParameters(DataImportStartDto dto, long tag) {
    return new JobParametersBuilder()
        .addString("unique", String.valueOf(UUID.randomUUID())) // needed if there are jobs with the same parameters running in parallel
        .addLong("tag", tag)
        .toJobParameters();
}

BatchExceptionHandler.java

import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;

@StepScope
@Component
public class BatchExceptionHandler implements SkipListener<IntervalDataWrapper<String>, IntervalDataWrapper<String>> {

    @Value("#{jobParameters['tag']}")
    private long tag; // this value is here on onSkipInRead execution


    @Override
    public void onSkipInRead(Throwable t) {
        System.out.println(t);
        // channel.basicNack(tag, false, true); // this is what I need to do here
    }

    @Override
    public void onSkipInWrite(IntervalDataWrapper<String> item, Throwable t) {
        System.out.println(t);
    }

    @Override
    public void onSkipInProcess(IntervalDataWrapper<String> item, Throwable t) {
        System.out.println(t);
    }
}

IntervalDataWrapper 是我的批量配置自定义数据包装器。

【问题讨论】:

  • “当前 RabbitMQ 通道”是什么意思?您预先知道您的工作是从哪个渠道读取或写入以纠正?为什么不将该通道声明为 bean 并将其注入到您的跳过侦听器中?
  • Channel 被注入到使用@RabbitListener 注解的方法调用中,它不是一个Bean。我需要在 SpringBatch 中的方法调用之外访问这个通道。
  • 我不是,没有这样的bean。我猜它是根据请求生成的

标签: java spring rabbitmq spring-batch channel


【解决方案1】:

我相信您不应该在跳过侦听器中这样做nack。这些调用与 IMO 的抽象级别不同。我从您的实现中了解到,当读取出现错误时,您想nack。您当前的设置是:

@RabbitListener(queues = "etl_queue", concurrency = "1", ackMode = "MANUAL")
public void receiveDataImportMessage(DataImportStartDto dataImportStartDto, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {

   // run a job
   dataImportService.startImport(dataImportStartDto, tag); // send nack = true from within the job

   // when the job is finished, send nack = false
   channel.basicAck(tag, false);
}

如您所见,一个确认是从作业内部发送的,另一个确认是从作业外部发送的。我会做的是,如果有一个可跳过的异常,我会将作业的退出状态设置为FINISHED_WITH_SKIPS 之类的东西(或者甚至在需要时失败),然后检查作业的状态以了解应该使用哪种类型的确认被发送到频道:

@RabbitListener(queues = "etl_queue", concurrency = "1", ackMode = "MANUAL")
    public void receiveDataImportMessage(DataImportStartDto dataImportStartDto, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {

   JobExecution jobExecution = dataImportService.startImport(dataImportStartDto, tag);
   if (jobExecution.getExitStatus().getExitCode().equals("FINISHED_WITH_SKIPS") {
      channel.basicNack(tag, false, true);
   } else {
      channel.basicAck(tag, false);
   };
        
}

这种方法不需要将通道注入到跳过侦听器中。此外,这种方法将批处理逻辑与消息传递逻辑分开,更易于测试、部署和思考。

【讨论】:

  • 哦对了,jobExecution 也给了我getAllFailureExceptions,我不知道。我可以列出所有的例外情况。非常感谢。
猜你喜欢
  • 1970-01-01
  • 2011-11-30
  • 2020-10-22
  • 2019-06-17
  • 1970-01-01
  • 2016-05-14
  • 1970-01-01
  • 1970-01-01
  • 2017-08-30
相关资源
最近更新 更多