【问题标题】:How to create a custom aggregator in Mule?如何在 Mule 中创建自定义聚合器?
【发布时间】:2011-10-01 18:42:21
【问题描述】:

在 mule 3.x 中创建完全自定义聚合器的推荐方法是什么?完全自定义,我的意思是根据我自己的逻辑,不使用相关 ID、消息计数等。

mulesoft 网站上的文档已经过时,说要使用 3.x 中不存在的 AbstractEventAggregator:

http://www.mulesoft.org/documentation/display/MULE3USER/Message+Splitting+and+Aggregatio

深入挖掘,看起来这个类在 3.x 中已重命名为 AbstractAggregator:

http://www.mulesoft.org/docs/site/3.2.0/apidocs/org/mule/routing/AbstractAggregator.html

但是,没有示例说明如何使用它。上面第一个链接中描述的 LoanBroker 示例实际上使用了关联聚合器(在 2.x 示例中,我假设这是文档所指的内容)。

曾经有一个抽象类具有抽象方法shouldAggregate 和doAggregate。这是我想扩展的那种类。

【问题讨论】:

    标签: mule aggregator


    【解决方案1】:

    查看下面的TestAggregator 以了解抽象聚合器的子类化示例。

    import org.mule.DefaultMuleEvent;
    import org.mule.DefaultMuleMessage;
    import org.mule.api.MuleContext;
    import org.mule.api.MuleEvent;
    import org.mule.api.store.ObjectStoreException;
    import org.mule.api.transformer.TransformerException;
    import org.mule.routing.AbstractAggregator;
    import org.mule.routing.AggregationException;
    import org.mule.routing.EventGroup;
    import org.mule.routing.correlation.CollectionCorrelatorCallback;
    import org.mule.routing.correlation.EventCorrelatorCallback;
    import org.mule.util.concurrent.ThreadNameHelper;
    
    import java.util.Iterator;
    
    public class TestAggregator extends AbstractAggregator
    {
        @Override
        protected EventCorrelatorCallback getCorrelatorCallback(MuleContext muleContext)
        {
            return new CollectionCorrelatorCallback(muleContext,false,storePrefix)
            {
                @Override
                public MuleEvent aggregateEvents(EventGroup events) throws AggregationException
                {
                    StringBuffer buffer = new StringBuffer(128);
    
                    try
                    {
                        for (Iterator<MuleEvent> iterator = events.iterator(); iterator.hasNext();)
                        {
                            MuleEvent event = iterator.next();
                            try
                            {
                                buffer.append(event.transformMessageToString());
                            }
                            catch (TransformerException e)
                            {
                                throw new AggregationException(events, null, e);
                            }
                        }
                    }
                    catch (ObjectStoreException e)
                    {
                        throw new AggregationException(events,null,e);
                    }
    
                    logger.debug("event payload is: " + buffer.toString());
                    return new DefaultMuleEvent(new DefaultMuleMessage(buffer.toString(), muleContext), events.getMessageCollectionEvent());
                }
            };
        }
    }
    

    【讨论】:

    • 谢谢!我会试试这个。我应该重写 CollectionCorrelatorCallback.shouldAggregateEvents() 以提供我自己的逻辑,还是建议创建我自己的实现 EventCorrelatorCallback 的类?
    • 您宁愿实现自己的 EventCorrelatorCallback,因为您根本不依赖消息计数(我理解为:您不会依赖 event.getMessage().getCorrelationGroupSize()确定要聚合的事件数)。
    • 我发现 EventCorrelatorCallback 由 EventCorrelator 调用,它假定按相关 ID 对事件进行分组。所以我认为我需要通过“人为”将它设置在某处的传入消息上来使用correlationId。其他选择是维护我自己的数据结构来保存事件。有没有更好的办法?
    • 其实你需要实现一个与你正在创建的自定义EventCorrelatorCallback一致的MessageInfoMapping,并在EventCorrelator上进行配置。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-11-24
    • 2018-10-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2022-01-12
    相关资源
    最近更新 更多