【Question title】: Read CSV file concurrently using Spring Integration
【Posted】: 2014-11-27 13:34:47
【Question】:

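I want to use Spring Integration to process a CSV file concurrently. Each line will be converted into a separate message. So, assuming I have 10K lines in the CSV file, I want to start 10 threads and have each line passed to one of these threads. It would be great if someone could show me an example.

Thanks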

【Comments】:

    Tags: spring-integration


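    【Answer 1】:

    Starting with Spring Integration 4.0, the <splitter> supports an Iterator as the payload to split. So if the <splitter>'s output-channel is an ExecutorChannel, you can convert the inbound File into a LineIterator and process the message for each line in parallel: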

    <splitter input-channel="splitChannel" output-channel="executorChannel"
              expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>
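The XML above leaves the threading to Spring. As a framework-free sketch of the same idea, using only JDK classes (the names `ConcurrentLineProcessing` and `processConcurrently` are hypothetical, not part of Spring), the file is read lazily one line at a time and each line is handed to a fixed pool of worker threads, which is roughly the role the ExecutorChannel plays downstream of the splitter:

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrentLineProcessing {
    // Reads the file one line at a time (the job of the LineIterator returned by the
    // splitter) and hands each line to a fixed pool (the job of the ExecutorChannel).
    static int processConcurrently(Path csv, int threads) throws IOException, InterruptedException {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        AtomicInteger processed = new AtomicInteger();
        try (BufferedReader reader = Files.newBufferedReader(csv)) {
            String line;
            while ((line = reader.readLine()) != null) {
                String row = line; // effectively final copy for the lambda
                pool.execute(() -> {
                    String[] fields = row.split(","); // placeholder for real per-line work
                    processed.incrementAndGet();
                });
            }
        } finally {
            pool.shutdown(); // accept no new tasks; already submitted lines still run
        }
        pool.awaitTermination(1, TimeUnit.MINUTES);
        return processed.get();
    }
}
```

With more than one thread the lines complete in no guaranteed order, so if ordering matters downstream an explicit row index would have to travel with each line.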
    

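    【Comments】:

    【Answer 2】:

    I'm using Spring Integration 4.1.0 and tried your suggestion, but it doesn't seem to work for me. I did some research on this today and am now leaning toward treating it as a Spring Integration 4.1.0 bug.

    See if my explanation makes sense.

    If you try this example, you will see that it works (note that it does not use your SpEL example):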

    <?xml version="1.0" encoding="UTF-8"?>
    <beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xmlns:task="http://www.springframework.org/schema/task"
        xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
            http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
            http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
            http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
            http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
            http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
            http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
    
        <int:inbound-channel-adapter id="exchangeReplayFileAdapter" ref="exchangeReplayFileReadingMessageSource" method="receive" auto-startup="true" channel="channel1">
            <int:poller fixed-delay="10000000" />
        </int:inbound-channel-adapter>
    
        <bean id="exchangeReplayFileReadingMessageSource" class="org.springframework.integration.file.FileReadingMessageSource">
            <property name="directory" value="/tmp/inputdir" />
        </bean>
    
        <int:channel id="channel1">
            <int:dispatcher task-executor="taskExecutor" />
        </int:channel>
    
        <int:splitter input-channel="channel1" output-channel="channel2">
            <bean class="com.xxx.common.util.springintegration.FileSplitter" />
        </int:splitter>
    
        <int:channel id="channel2"></int:channel>
        <int-stream:stdout-channel-adapter channel="channel2"></int-stream:stdout-channel-adapter>
    
        <task:executor id="taskExecutor" pool-size="1" />
    </beans>
    

    With this Splitter implementation...

    package com.xxx.common.util.springintegration;
    
    import java.io.File;
    import java.io.IOException;
    
    import org.slf4j.Logger;
    import org.slf4j.LoggerFactory;
    import org.springframework.integration.splitter.AbstractMessageSplitter;
    import org.springframework.integration.transformer.MessageTransformationException;
    import org.springframework.messaging.Message;
    import org.springframework.util.Assert;
    
    public class FileSplitter extends AbstractMessageSplitter {
        // The logger must reference this class; FileSplitterNew does not exist here.
        private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);
    
        @Override
        public Object splitMessage(Message<?> message) {
            if (log.isDebugEnabled()) {
                log.debug(message.toString());
            }
            try {
    
                Object payload = message.getPayload();
                Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");
                return org.apache.commons.io.FileUtils.lineIterator((File) payload);
            } catch (IOException e) {
                String msg = "Unable to transform file: " + e.getMessage();
                log.error(msg);
                throw new MessageTransformationException(msg, e);
            }
        }
    
    }
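For readers without commons-io on the classpath, here is a minimal JDK-only sketch (the `LazyLines` class is hypothetical, not part of Spring or commons-io) of the kind of lazy iterator that `splitMessage` returns: each `next()` reads one more line, so the whole file is never held in memory. This sketch closes the reader once the end of the file is reached; an iterator abandoned before that point would leave the file open.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.nio.file.Files;
import java.nio.file.Path;
import java.util.Iterator;
import java.util.NoSuchElementException;

public class LazyLines {
    // Returns an Iterator that reads one line per next() call and closes the
    // underlying reader when the end of the file is reached.
    static Iterator<String> lineIterator(Path file) throws IOException {
        BufferedReader reader = Files.newBufferedReader(file);
        return new Iterator<String>() {
            private String nextLine = advance(); // read-ahead of one line

            private String advance() {
                try {
                    String line = reader.readLine();
                    if (line == null) {
                        reader.close(); // release the file handle at end of input
                    }
                    return line;
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }

            @Override
            public boolean hasNext() {
                return nextLine != null;
            }

            @Override
            public String next() {
                if (nextLine == null) {
                    throw new NoSuchElementException();
                }
                String current = nextLine;
                nextLine = advance();
                return current;
            }
        };
    }
}
```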
    

    With your SpEL example:

    <int:splitter input-channel="exchangeReplayFiles" output-channel="exchangeSpringQueueChannel"  
        expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>
    

    the parser internally creates this (note the List.class type passed to the ExpressionEvaluatingMessageProcessor constructor):

    /**
     * A Message Splitter implementation that evaluates the specified SpEL
     * expression. The result of evaluation will typically be a Collection or
     * Array. If the result is not a Collection or Array, then the single Object
     * will be returned as the payload of a single reply Message.
     *
     * @author Mark Fisher
     * @author Gary Russell
     * @since 2.0
     */
    public class ExpressionEvaluatingSplitter extends AbstractMessageProcessingSplitter {
    
        @SuppressWarnings({"unchecked", "rawtypes"})
        public ExpressionEvaluatingSplitter(Expression expression) {
            super(new ExpressionEvaluatingMessageProcessor(expression, List.class));
        }
    
    }
    

    And the ExpressionEvaluatingMessageProcessor class:

    /**
     * A {@link MessageProcessor} implementation that evaluates a SpEL expression
     * with the Message itself as the root object within the evaluation context.
     *
     * @author Mark Fisher
     * @author Artem Bilan
     * @since 2.0
     */
    public class ExpressionEvaluatingMessageProcessor<T> extends AbstractMessageProcessor<T> {
    
        private final Expression expression;
    
        private final Class<T> expectedType;
    
    
      ...
        /**
         * Create an {@link ExpressionEvaluatingMessageProcessor} for the given expression
         * and expected type for its evaluation result.
         * @param expression The expression.
         * @param expectedType The expected type.
         */
        public ExpressionEvaluatingMessageProcessor(Expression expression, Class<T> expectedType) {
            Assert.notNull(expression, "The expression must not be null");
            try {
                this.expression = expression;
                this.expectedType = expectedType;
            }
            catch (ParseException e) {
                throw new IllegalArgumentException("Failed to parse expression.", e);
            }
        }
    
        /**
         * Processes the Message by evaluating the expression with that Message as the
         * root object. The expression evaluation result Object will be returned.
         * @param message The message.
         * @return The result of processing the message.
         */
        @Override
        public T processMessage(Message<?> message) {
            return this.evaluateExpression(this.expression, message, this.expectedType);
        }
    ...
    
    }
    

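    What the provided example returns ends up being an ArrayList (which implements the Collection interface) containing a single LineIterator element.

    ExpressionEvaluatingSplitter is a subclass of AbstractMessageSplitter and does not override the handleRequestMessage(Message<?> message) method.
    That method looks like this: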

    public abstract class AbstractMessageSplitter extends AbstractReplyProducingMessageHandler {
        protected final Object handleRequestMessage(Message<?> message) {
            Object result = this.splitMessage(message);
            // return null if 'null'
            if (result == null) {
                return null;
            }
    
            Iterator<Object> iterator;
            final int sequenceSize;
            if (result instanceof Collection) {
                Collection<Object> items = (Collection<Object>) result;
                sequenceSize = items.size();
                iterator = items.iterator();
            }
            else if (result.getClass().isArray()) {
                Object[] items = (Object[]) result;
                sequenceSize = items.length;
                iterator = Arrays.asList(items).iterator();
            }
            else if (result instanceof Iterable<?>) {
                sequenceSize = 0;
                iterator = ((Iterable<Object>) result).iterator();
            }
            else if (result instanceof Iterator<?>) {
                sequenceSize = 0;
                iterator = (Iterator<Object>) result;
            }
            else {
                sequenceSize = 1;
                iterator = Collections.singleton(result).iterator();
            }
    
            if (!iterator.hasNext()) {
                return null;
            }
    
            final MessageHeaders headers = message.getHeaders();
            final Object correlationId = headers.getId();
            final AtomicInteger sequenceNumber = new AtomicInteger(1);
    
            return new FunctionIterator<Object, AbstractIntegrationMessageBuilder<?>>(iterator,
                    new Function<Object, AbstractIntegrationMessageBuilder<?>>() {
                        @Override
                        public AbstractIntegrationMessageBuilder<?> apply(Object object) {
                            return createBuilder(object, headers, correlationId, sequenceNumber.getAndIncrement(),
                                    sequenceSize);
                        }
                    });
        }
        // ... remaining members omitted
    }
    

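    Since ArrayList is a Collection, the first branch is taken and the Iterator branch is never reached. The splitter therefore produces a single message whose payload is the LineIterator itself, and next() is never called on the LineIterator in the produceOutput(...) method.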
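The branch order can be checked without Spring. This stdlib-only sketch (`SplitterDispatchDemo` and `payloadsFor` are hypothetical names) copies the order of the type checks in `handleRequestMessage` and collects what would become the outbound payloads. An iterator wrapped in a one-element `List` yields a single payload, the iterator itself, while the bare iterator yields one payload per line:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class SplitterDispatchDemo {
    // Same check order as AbstractMessageSplitter.handleRequestMessage:
    // Collection, array, Iterable, Iterator, otherwise a single object.
    static List<Object> payloadsFor(Object result) {
        List<Object> payloads = new ArrayList<>();
        if (result == null) {
            return payloads; // the splitter returns null (no messages) in this case
        }
        Iterator<?> iterator;
        if (result instanceof Collection) {
            iterator = ((Collection<?>) result).iterator();
        } else if (result.getClass().isArray()) {
            iterator = Arrays.asList((Object[]) result).iterator(); // object arrays only
        } else if (result instanceof Iterable) {
            iterator = ((Iterable<?>) result).iterator();
        } else if (result instanceof Iterator) {
            iterator = (Iterator<?>) result;
        } else {
            iterator = Collections.singleton(result).iterator();
        }
        iterator.forEachRemaining(payloads::add); // one entry per outbound message
        return payloads;
    }

    public static void main(String[] args) {
        Iterator<String> lines = Arrays.asList("a,1", "b,2", "c,3").iterator();
        // What the SpEL splitter ends up with: the iterator inside a one-element List.
        List<Object> wrapped = payloadsFor(Collections.singletonList(lines));
        System.out.println(wrapped.size());          // 1
        System.out.println(wrapped.get(0) == lines); // true: the payload is the iterator itself

        List<Object> direct = payloadsFor(Arrays.asList("a,1", "b,2", "c,3").iterator());
        System.out.println(direct);                  // [a,1, b,2, c,3]
    }
}
```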

    So why does the LineIterator get wrapped in an ArrayList? I believe ExpressionEvaluatingSplitter has a defect: it always does this:

    public ExpressionEvaluatingSplitter(Expression expression) {
        super(new ExpressionEvaluatingMessageProcessor(expression, List.class));
    }
    

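    I think that in Spring Integration 4 it should look at the type the expression actually evaluates to (List or Iterator) and then call super accordingly. How this is done may need to be reworked, because that type would have to be determined before calling super, which Java does not allow.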
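A minimal simulation of this proposal, under clearly stated assumptions: `coerce` is a hypothetical stand-in that only models the wrapping behaviour observed above (it is not Spring's conversion service), and `split` is a condensed version of the splitter's dispatch with the array and Iterable branches left out. Requesting `List.class` yields one message, while requesting `Object.class`, i.e. not forcing a result type, lets the Iterator branch run:

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

public class ExpectedTypeDemo {
    // Hypothetical conversion step: a value that is not already a List, requested
    // as List, is wrapped in a one-element list. Object.class passes it through.
    static Object coerce(Object value, Class<?> expectedType) {
        if (expectedType.isInstance(value)) {
            return value;
        }
        if (expectedType == List.class) {
            return Collections.singletonList(value);
        }
        throw new IllegalArgumentException("Unsupported target type: " + expectedType.getName());
    }

    // Condensed splitter dispatch: Collection is checked before Iterator.
    static List<Object> split(Object result) {
        List<Object> payloads = new ArrayList<>();
        if (result instanceof Collection) {
            payloads.addAll((Collection<?>) result);
        } else if (result instanceof Iterator) {
            ((Iterator<?>) result).forEachRemaining(payloads::add);
        } else {
            payloads.add(result);
        }
        return payloads;
    }

    public static void main(String[] args) {
        int forcedList = split(coerce(Arrays.asList("a,1", "b,2", "c,3").iterator(), List.class)).size();
        int untyped = split(coerce(Arrays.asList("a,1", "b,2", "c,3").iterator(), Object.class)).size();
        System.out.println(forcedList + " vs " + untyped); // 1 vs 3
    }
}
```

Because the result type is only known per message at evaluation time, this sketch sidesteps the constructor problem by not fixing an expected type at all rather than choosing one before super().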

    What do you think?

    【Comments】:
