【问题标题】:Streaming in datamapper in mule esb在 mule esb 中的 datamapper 中流式传输
【发布时间】:2015-11-30 07:02:23
【问题描述】:

我需要从一个大小在 100MB-200MB 的文件中获取数据(input.xml),并且需要根据某些逻辑写入四个不同的文件。

输入 xml:

            <?xml version="1.0"?>
            <Orders>
                 <Order><OrderId>1</OrderId><Total>10</Total><Name>jon1</Name></Order>
                <Order><OrderId>2</OrderId><Total>20</Total><Name>jon2</Name></Order>
                <Order><OrderId>3</OrderId><Total>30</Total><Name>jon3</Name></Order>
                <Order><OrderId>4</OrderId><Total>40</Total><Name>jon4</Name></Order>
            <Orders>

逻辑是,如果 Total 为 1-10,则写入 file1,如果 Total 为 11-20,则写入 file2.....,

预期输出:

1 10 jon1 -->写入文件1

2 20 jon2 -->写入file2

3 30 jon3 -->写入file3

4 40 jon4 -->写入file4

在这里,我已在配置中的 datamapper 中启用流式传输,但我没有得到正确的输出。问题是我只将一些重新编码到一个文件中,在满足条件后应该进入该文件。

但如果我在 datamapper 中禁用流媒体按钮,它工作正常。由于有大量记录,我必须使用流式传输选项。

有没有其他方法可以配置数据映射器以启用流选项..?

请在这方面给我建议。谢谢。

【问题讨论】:

    标签: streaming mule datamapper


    【解决方案1】:

    如果不详细说明您在做什么,就很难看出问题。 不过,我认为这可能会帮助您尝试另一种方法。

    数据映射器会将完整的 XML 文档加载到内存中,尽管您激活了流,但它必须这样做才能支持 XPATH(它将完整的 xml 输入加载到 DOM 中)。 因此,如果您无法将 200Mb 的文档加载到内存中,您将需要尝试一种解决方法。

    我之前所做的是创建一个 java 组件,它在 stax 解析器的帮助下将输入流转换为迭代器。通过一个非常简单的实现,您可以编写一个从流中提取的迭代器来创建下一个元素(一个 pojo、一个映射、一个字符串......)。在 mule 流程中,在“java 组件”之后,您应该能够使用带有“选择”的“for-each”并应用您的逻辑。

    数据的简单示例:

    package tests;
    
    import java.io.InputStream;
    import java.util.HashMap;
    import java.util.Iterator;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import javax.xml.stream.FactoryConfigurationError;
    import javax.xml.stream.XMLInputFactory;
    import javax.xml.stream.XMLStreamConstants;
    import javax.xml.stream.XMLStreamException;
    import javax.xml.stream.XMLStreamReader;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    
    public class OrdersStreamIterator implements Iterator<Map<String,String>> {
    
        final static Log LOGGER = LogFactory.getLog(OrdersStreamIterator.class);
    
        final InputStream is;
        final XMLStreamReader xmlReader;
    
        boolean end = false;
        HashMap<String,String> next;
    
        public OrdersStreamIterator(InputStream is)
                throws XMLStreamException, FactoryConfigurationError {
            this.is = is;
            xmlReader = XMLInputFactory.newInstance().createXMLStreamReader(is);
        }
    
        protected HashMap<String,String> _next() throws XMLStreamException {
            int event;
            HashMap<String,String> order = null;
            String orderChild = null;
            String orderChildValue = null;
            while (xmlReader.hasNext()) {
                event = xmlReader.getEventType();
                if (event == XMLStreamConstants.START_ELEMENT) {
                    if (order==null) {
                        if (checkOrder()) {
                            order = new HashMap<String,String>();
                        }
                    }
                    else {
                        orderChild = xmlReader.getLocalName();
                    }
                }
                else if (event == XMLStreamConstants.END_ELEMENT) {
                    if (checkOrders()) {
                        end = true;
                        return null;
                    }
                    else if (checkOrder()) {
                        xmlReader.next();
                        return order;
                    }
                    else if (order!=null) {
                        order.put(orderChild, orderChildValue);
                        orderChild = null;
                        orderChildValue = null;
                    }
                }
                else if (order!=null && orderChild!=null){
                    switch (event) {
                    case XMLStreamConstants.SPACE:
                    case XMLStreamConstants.CHARACTERS:
                    case XMLStreamConstants.CDATA:
                        int start = xmlReader.getTextStart();
                        int length = xmlReader.getTextLength();
                        if (orderChildValue==null) {
                            orderChildValue = new String(xmlReader.getTextCharacters(), start, length);
                        }
                        else {
                            orderChildValue += new String(xmlReader.getTextCharacters(), start, length);
                        }
                        break;
                    }
                }
                xmlReader.next();
            }
            end = true;
            return null;
        }
    
        protected boolean checkOrder() {
            return "Order".equals(xmlReader.getLocalName());
        }
    
        protected boolean checkOrders() {
            return "Orders".equals(xmlReader.getLocalName());
        }
    
        @Override
        public boolean hasNext() {
            if (end) {
                return false;
            }
            else if (next==null) {
                try {
                    next = _next();
                } catch (XMLStreamException e) {
                    LOGGER.error(e.getMessage(), e);
                    end = true;
                }
                return !end;
            }
            else {
                return true;
            }
        }
    
    
        @Override
        public Map<String,String> next() {
            if (hasNext()) {
                final HashMap<String,String> n = next;
                next = null;
                return n;
            }
            else {
                return null;
            }
        }
    
    
        @Override
        public void remove() {
            throw new RuntimeException("ReadOnly!");
        }
    
        // Test
    
        public static String dump(Map<String,String> o) {
            String s = "{";
            for (Entry<String,String> e : o.entrySet()) {
                if (s.length()>1) {
                    s+=", ";
                }
                s+= "\"" + e.getKey() + "\" : \"" + e.getValue() + "\"";
            }
            return s + "}";
        }
    
        public static void main(String[] argv) throws XMLStreamException, FactoryConfigurationError {
            final InputStream is = OrdersStreamIterator.class.getClassLoader().getResourceAsStream("orders.xml");
            final OrdersStreamIterator i = new OrdersStreamIterator(is);
            while (i.hasNext()) {
                System.out.println(dump(i.next()));
            }
        }
    }
    

    示例流程:

     <flow name="testsFlow">
            <http:listener config-ref="HTTP_Listener_Configuration" path="/" doc:name="HTTP"/>
            <scripting:component doc:name="Groovy">
                <scripting:script engine="Groovy"><![CDATA[return tests.OrdersStreamIterator.class.getClassLoader().getResourceAsStream("orders.xml");]]></scripting:script>
            </scripting:component>
            <set-payload value="#[new tests.OrdersStreamIterator(payload)]" doc:name="Iterator"/>
            <foreach doc:name="For Each">
                <logger message="#[tests.OrdersStreamIterator.dump(payload)]" level="INFO" doc:name="Logger"/>
            </foreach>
        </flow>
    

    【讨论】:

    • 您好 Jose,我对 mule 有点陌生,如果您不介意可以分享一些示例代码吗?
    • 我已经用一个简单的例子更新了答案。请记住,这只是一种方法的示例,可能还有其他(和更好的)解决方案。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2015-03-14
    • 2015-05-30
    • 2016-02-12
    • 1970-01-01
    • 1970-01-01
    • 2013-05-08
    • 1970-01-01
    相关资源
    最近更新 更多