【问题标题】:Apache Camel : GBs of data from database routed to JMS endpointApache Camel:从数据库路由到 JMS 端点的 GB 数据
【发布时间】:2012-04-23 10:14:40
【问题描述】:

我现在用骆驼做过一些小项目,但我很难理解的一件事是在骆驼路线中消费时如何处理大数据(不适合内存)。

我有一个数据库,其中包含几 GB 的数据,我想使用 camel 进行路由。显然,将所有数据读入内存不是一种选择。

如果我是作为一个独立的应用程序执行此操作,我将编写用于分页数据并将块发送到我的 JMS 端点的代码。我想使用骆驼,因为它提供了一个很好的模式。如果我从文件中消费,我可以使用 streaming() 调用。

我还应该使用 camel-sql/camel-jdbc/camel-jpa 还是使用 bean 从我的数据库中读取数据。

希望大家还在我身边。我对 Java DSL 更熟悉,但如果有人能提供任何帮助/建议,我将不胜感激。

更新:2012 年 5 月 2 日

所以我有一些时间来解决这个问题,我认为我实际上在做的是滥用 Producer 的概念,以便我可以在路线中使用它。

public class MyCustomRouteBuilder extends RouteBuilder {

    public void configure(){
         from("timer:foo?period=60s").to("mycustomcomponent:TEST");

         from("direct:msg").process(new Processor() {
               public void process(Exchange ex) throws Exception{
                   System.out.println("Receiving value" : + ex.getIn().getBody() );
               }
         }
    }

}

我的制作人如下所示。为清楚起见,我没有包含 CustomEndpoint 或 CustomComponent,因为它似乎只是一个薄包装器。

public class MyCustomProducer extends DefaultProducer{ 

    Endpoint e;
    CamelContext c;

    public MyCustomProducer(Endpoint epoint){
          super(endpoint)   
          this.e = epoint;
          this.c = e.getCamelContext();
    }

    public void process(Exchange ex) throws Exceptions{

        Endpoint directEndpoint = c.getEndpoint("direct:msg");
        ProducerTemplate t = new DefaultProducerTemplate(c);

        // Simulate streaming operation / chunking of BIG data.
        for (int i=0; i <20 ; i++){
           t.start();
           String s ="Value " + i ;                  
           t.sendBody(directEndpoint, value)
           t.stop();         
        }
    }
} 

首先,上面看起来不是很干净。执行此操作的最简洁方法似乎是通过计划的石英作业填充 jms 队列(代替直接:msg),然后我的骆驼路线会使用该作业,以便我可以更灵活地控制骆驼中收到的消息大小管道。但是,我非常喜欢将基于时间的激活设置为 Route 的一部分的语义。

有没有人对最好的方法有任何想法。

【问题讨论】:

    标签: apache apache-camel


    【解决方案1】:

    在我的理解中,你需要做的就是:

    from("jpa:SomeEntity" + 
        "?consumer.query=select e from SomeEntity e where e.processed = false" +
        "&maximumResults=150" +
        "&consumeDelete=false")
    .to("jms:queue:entities");
    

    maximumResults 定义了每次查询获得的实体数量的限制。

    实体实例处理完毕后,需要设置e.processed = true;persist(),这样实体就不会再被处理了。

    一种方法是使用@Consumed 注释:

    class SomeEntity {
        @Consumed
        public void markAsProcessed() {
            setProcessed(true);
        }
    }
    

    另一件事,您需要注意的是在将实体发送到队列之前如何对其进行序列化。您可能需要在 from 和 to 之间使用 enricher

    【讨论】:

    • 查看骆驼行动手册后,我想我可以先给出这个或使用自定义组件来获得更多控制权。转换应该是一个问题,因为我将使用自定义转换器/转换。
    • 我发现有用的是只序列化实体的主键,让处理逻辑来检索实际实体。因此,您不必进行任何复杂的自定义转换。最后,实体通过其键来区分。
    • 您也可以尝试使用 Apache Nifi,它似乎更适合这种用例。
    猜你喜欢
    • 1970-01-01
    • 2020-02-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2016-09-29
    • 2019-05-27
    • 1970-01-01
    相关资源
    最近更新 更多