【问题标题】:How to read file using Apache Camel, process the file using Spring batch, read each line and route it back through Apache Camel如何使用 Apache Camel 读取文件,使用 Spring 批处理文件,读取每一行并通过 Apache Camel 将其路由回
【发布时间】:2018-04-24 23:30:29
【问题描述】:

想知道如何将 JAXBElement 传递给 Camel 路由,该路由从通过 Camel Route 加载的 Spring 批处理读取的批处理文件的每一行进行处理。

下面给出的代码 sn-ps 使用 customerWriter 方法调用 JMSTemplate 将消息写入队列。相反,我需要将消息路由到另一个 Camel 路由。

当前: CamelRoute -> ReadFile -> Spring Batch -> 处理每一行 -> Queue

预期: CamelRoute -> ReadFile -> Spring Batch -> 处理每一行 -> Camel Route

读取文件的骆驼路线:

@Override
public void configure()  {

    String fromUri = batchLoadPath + "?" + batchFileOptions;
    from(fromUri).process(new Processor() {
        public void process(Exchange msg)  {
            File file = msg.getIn().getBody(File.class);
            String fileName = file.getAbsolutePath();

            try {
                JobParameters jobParameters = new JobParametersBuilder().addString("input.file.name", fileName).addDate("dateTime", new Date()).toJobParameters();
                jobLauncher.run(importCustomerJob, jobParameters);
            } catch (Exception e) {
                log.error(Process file encountered error:" + e.getMessage(), e);
            }
        }

    })
    .to("log:EndBatch");

批量配置:

@Bean
public JmsItemWriter<String> customerWriter() {
    JmsItemWriter<String> writer = new JmsItemWriter<String>();
    writer.setJmsTemplate(jmsTemplate);
    return writer;
}

public Job importCustomerJob(JobCompletionNotificationListener listener, JobBuilderFactory jobBuilderFactory, Step step1) {
    JobBuilder builder = jobBuilderFactory.get("importCustomerJob");
    builder.incrementer(new RunIdIncrementer());
    builder.listener(listener);
    JobFlowBuilder jfb = builder.flow(step1);
    jfb.end();
    Job job = jfb.build().build();
    return job;
}

@Bean
public Step step1(StepBuilderFactory stepBuilderFactory) {
    // Read chunk of 10 records and writing those as messages to queue
    return stepBuilderFactory.get("step1")
            .<Customer, String>chunk(10)
            .reader(customerReader())
            .faultTolerant()
            .skipPolicy(fileVerificationSkipper())
            .processor(customerItemProcessor())
            .writer(customerWriter())
            .listener(customerReader())
            .build();
}

批处理器:

public class CustomerItemProcessor implements ItemProcessor<Customer, String> {
    @Autowired
    JaxbUtil jaxbUtil;

    public String process(Customer item) throws Exception {
        // Mapping code goes here
        JAXBElement<CustomerX> mobj = customerFactory.createCustomerX(cp);
        return jaxbUtil.objectToXml(mobj);
    }
}

【问题讨论】:

    标签: java apache-camel spring-batch reroute


    【解决方案1】:

    好吧,从骆驼的角度来看,要调用另一条骆驼路线,您只需添加.to() 语句。例如同步调用内存中的路由,可以使用direct:

    from(fromUri).process(new Processor() {
        public void process(Exchange msg)  {
            ... your processor impl
        }
    })
    .to("direct:yourOtherRoute")
    .to("log:EndBatch"); 
    
    from("direct:yourOtherRoute")
    ...
    

    要将处理器的结果传递到下一个路由,处理器必须将此结果设置到交换体中。

    【讨论】:

    • 我正在尝试将处理过的线从春季批次发送到骆驼路线,而不是骆驼路线到另一条骆驼路线。
    • 如果你想从你的 Java 代码中调用 Camel 路由,你必须使用 Remoting/Proxy 如Spring RemotingCamel-Proxy 或者设置一个具有可达端点(HTTP 或随便)。
    • 感谢@burki 的回复。让我试试那个解决方案。同时,如果您能分享任何示例代码,那就太好了
    • @Vijai 您也可以使用 ProducerTemplate 将 Java 代码中的任意数据发送到骆驼路由
    • @RomanVottner 谢谢!通过用 CamelItemWriter 替换 JMSItemWriter 并使用 ProducerTemplate 找到了解决方案。今天晚些时候将提供代码供参考。
    【解决方案2】:

    感谢@Burki 和@Roman Vottner 的建议。这是我修改的代码,它可以工作。

    解决办法:

    Batch Config 中添加了一个writer方法并调用它而不是JMSWriter

    @Bean
    public CamelItemWriter<String> customerCamelWriter() {
        ProducerTemplate producerTemplate = camelContext.createProducerTemplate();
        CamelItemWriter<String> writer = new CamelItemWriter<String>(producerTemplate, "direct:process");
        return writer;
    }
    
    @Bean
    public Step step1(StepBuilderFactory stepBuilderFactory) {
        // Read chunk of 10 records and writing those as messages to CAS.TRF.MDM queue
        return stepBuilderFactory.get("step1")
                .<Customer, String>chunk(10)
                .reader(customerReader())
                .faultTolerant()
                .skipPolicy(fileVerificationSkipper())
                .processor(customerItemProcessor())
                .writer(customerCamelWriter())
                .listener(customerReader())
                .build();
    }
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-05-09
      • 2021-05-26
      相关资源
      最近更新 更多