【发布时间】:2021-08-18 15:58:16
【问题描述】:
我有一个 RabbitMQ 触发的 Spring Batch 应用程序。
我需要nack异常消息。但是,Spring Batch 不会在 run 方法之外抛出异常。因此,我需要nack SkipListener 中的消息。我已设法将消息标记作为 JobParameter,但我无法将 Channel 作为 JobParameter 发送,因为它不是 可序列化。
有什么方法可以将 Channel 传递给批处理或任何其他错误处理方式?
这是我现在拥有的一个例子。
MessageConsumer.java
import com.project.common.dto.DataImportStartDto;
import com.rabbitmq.client.Channel;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.amqp.support.AmqpHeaders;
import org.springframework.messaging.handler.annotation.Header;
import org.springframework.stereotype.Service;
@Service
public class MessageConsumer {
private final DataImportService dataImportService;
public MessageConsumer(DataImportService dataImportService) {
this.dataImportService = dataImportService;
}
@RabbitListener(queues = "etl_queue", concurrency = "1", ackMode = "MANUAL")
public void receiveDataImportMessage(DataImportStartDto dataImportStartDto, Channel channel, @Header(AmqpHeaders.DELIVERY_TAG) long tag) throws Exception {
dataImportService.startImport(dataImportStartDto, tag);
channel.basicAck(tag, false);
}
数据导入服务.java
import org.springframework.batch.core.*;
import org.springframework.batch.core.launch.JobLauncher;
import org.springframework.beans.factory.annotation.Qualifier;
import org.springframework.stereotype.Service;
public class DataImportServiceImpl implements DataImportService {
private final JobLauncher jobLauncher;
private final Job dataImportJob;
@Service
public DataImportServiceImpl(JobLauncher jobLauncher, @Qualifier("dataImport") Job dataImportJob) {
this.jobLauncher = jobLauncher;
this.dataImportJob= dataImportJob;
}
@Override
public void startImport(DataImportStartDto dto, long tag) {
jobLauncher.run(dataImportJob, buildJobParameters(dto, tag));
}
}
private JobParameters buildJobParameters(DataImportStartDto dto, long tag) {
return new JobParametersBuilder()
.addString("unique", String.valueOf(UUID.randomUUID())) // needed if there are jobs with the same parameters running in parallel
.addLong("tag", tag)
.toJobParameters();
}
BatchExceptionHandler.java
import org.springframework.batch.core.SkipListener;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
@StepScope
@Component
public class BatchExceptionHandler implements SkipListener<IntervalDataWrapper<String>, IntervalDataWrapper<String>> {
@Value("#{jobParameters['tag']}")
private long tag; // this value is here on onSkipInRead execution
@Override
public void onSkipInRead(Throwable t) {
System.out.println(t);
// channel.basicNack(tag, false, true); // this is what I need to do here
}
@Override
public void onSkipInWrite(IntervalDataWrapper<String> item, Throwable t) {
System.out.println(t);
}
@Override
public void onSkipInProcess(IntervalDataWrapper<String> item, Throwable t) {
System.out.println(t);
}
}
IntervalDataWrapper 是我的批量配置自定义数据包装器。
【问题讨论】:
-
“当前 RabbitMQ 通道”是什么意思?您预先知道您的工作是从哪个渠道读取或写入以纠正?为什么不将该通道声明为 bean 并将其注入到您的跳过侦听器中?
-
Channel 被注入到使用@RabbitListener 注解的方法调用中,它不是一个Bean。我需要在 SpringBatch 中的方法调用之外访问这个通道。
-
我不是,没有这样的bean。我猜它是根据请求生成的
标签: java spring rabbitmq spring-batch channel