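【Posted】: 2014-11-27 13:34:47
【Question】:
I want to use Spring Integration to process a CSV file concurrently, with each line converted into a separate message. Say the CSV file has 10K lines: I want to start 10 threads and hand each line to one of those threads. It would be great if someone could show me an example.
Thanks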
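【Question discussion】:
Starting with Spring Integration 4.0, <splitter> supports an Iterator as the payload to split. So, if the output-channel of the <splitter> is an ExecutorChannel, you can turn the inbound File into a LineIterator and process a message for each line in parallel: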
<splitter input-channel="splitChannel" output-channel="executorChannel"
expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>
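The same idea, stripped of Spring, can be sketched with plain java.util.concurrent: a lazily read line iterator stands in for the LineIterator payload, and a fixed thread pool stands in for the ExecutorChannel's task executor. This is not the Spring Integration API; the class ParallelLineDispatch, its dispatch method and the 10-thread pool size are hypothetical, chosen only to mirror the question.

```java
import java.io.BufferedReader;
import java.util.Iterator;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical plain-JDK analogue of "splitter with Iterator payload -> ExecutorChannel".
// Not Spring Integration code.
class ParallelLineDispatch {

    /** Hands each line to a pool of worker threads; returns true if all tasks finished. */
    static boolean dispatch(BufferedReader reader, int threads, Consumer<String> handler) {
        ExecutorService pool = Executors.newFixedThreadPool(threads);
        try {
            // Lines are read lazily, one at a time, like a LineIterator payload.
            Iterator<String> lines = reader.lines().iterator();
            while (lines.hasNext()) {
                String line = lines.next();
                pool.execute(() -> handler.accept(line)); // one task ("message") per line
            }
        } finally {
            pool.shutdown(); // accept no new tasks; already queued ones still run
        }
        try {
            return pool.awaitTermination(1, TimeUnit.MINUTES);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
    }
}
```

For example, dispatch(Files.newBufferedReader(path), 10, line -> handle(line)) inside a try-with-resources block would hand each line of a file to one of 10 worker threads (handle is a placeholder). As with an ExecutorChannel, the order in which lines are processed is not guaranteed.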
【Discussion】:
FlatFileItemReader(github.com/spring-projects/spring-batch/blob/master/…)
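I'm using Spring Integration 4.1.0 and tried your suggestion, but it doesn't seem to work for me. I dug into this today and am now leaning toward treating it as a Spring Integration 4.1.0 bug.
See whether my explanation makes sense.
If you try this example, you will see that it works (note that it does not use your SpEL expression):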
<?xml version="1.0" encoding="UTF-8"?>
<beans xmlns="http://www.springframework.org/schema/beans" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xmlns:context="http://www.springframework.org/schema/context" xmlns:int="http://www.springframework.org/schema/integration" xmlns:int-file="http://www.springframework.org/schema/integration/file" xmlns:int-stream="http://www.springframework.org/schema/integration/stream" xmlns:task="http://www.springframework.org/schema/task"
xsi:schemaLocation="http://www.springframework.org/schema/jms http://www.springframework.org/schema/jms/spring-jms.xsd
http://www.springframework.org/schema/integration http://www.springframework.org/schema/integration/spring-integration.xsd
http://www.springframework.org/schema/integration/file http://www.springframework.org/schema/integration/file/spring-integration-file.xsd
http://www.springframework.org/schema/integration/stream http://www.springframework.org/schema/integration/stream/spring-integration-stream.xsd
http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans.xsd
http://www.springframework.org/schema/context http://www.springframework.org/schema/context/spring-context.xsd
http://www.springframework.org/schema/task http://www.springframework.org/schema/task/spring-task.xsd">
<int:inbound-channel-adapter id="exchangeReplayFileAdapter" ref="exchangeReplayFileReadingMessageSource" method="receive" auto-startup="true" channel="channel1">
<int:poller fixed-delay="10000000" />
</int:inbound-channel-adapter>
<bean id="exchangeReplayFileReadingMessageSource" class="org.springframework.integration.file.FileReadingMessageSource">
<property name="directory" value="/tmp/inputdir" />
</bean>
<int:channel id="channel1">
<int:dispatcher task-executor="taskExecutor" />
</int:channel>
<int:splitter input-channel="channel1" output-channel="channel2">
<bean class="com.xxx.common.util.springintegration.FileSplitter" />
</int:splitter>
<int:channel id="channel2"></int:channel>
<int-stream:stdout-channel-adapter channel="channel2"></int-stream:stdout-channel-adapter>
<task:executor id="taskExecutor" pool-size="1" />
</beans>
With this Splitter implementation:
package com.xxx.common.util.springintegration;
import java.io.File;
import java.io.IOException;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.integration.splitter.AbstractMessageSplitter;
import org.springframework.integration.transformer.MessageTransformationException;
import org.springframework.messaging.Message;
import org.springframework.util.Assert;
public class FileSplitter extends AbstractMessageSplitter {
private static final Logger log = LoggerFactory.getLogger(FileSplitter.class);
@Override
public Object splitMessage(Message<?> message) {
if (log.isDebugEnabled()) {
log.debug(message.toString());
}
try {
Object payload = message.getPayload();
Assert.isInstanceOf(File.class, payload, "Expected java.io.File in the message payload");
// Return the LineIterator itself (not a List) so the splitter can emit one message per line
return org.apache.commons.io.FileUtils.lineIterator((File) payload);
} catch (IOException e) {
String msg = "Unable to transform file: " + e.getMessage();
log.error(msg);
throw new MessageTransformationException(msg, e);
}
}
}
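FileUtils.lineIterator from commons-io reads the file one line at a time, which is why returning it from the splitter avoids loading the whole file. For readers without commons-io, a rough JDK-only stand-in could look like the hypothetical ClosingLineIterator below. It is an illustration, not the commons-io implementation; one design choice it makes is to close the underlying reader as soon as the input is exhausted, so the file handle is not left open after the last line has been split off.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.Iterator;
import java.util.NoSuchElementException;

// Hypothetical JDK-only stand-in for a lazy line iterator (not commons-io code).
class ClosingLineIterator implements Iterator<String>, AutoCloseable {
    private final BufferedReader reader;
    private String nextLine;   // line read ahead by hasNext(), or null
    private boolean finished;  // true once the reader has been closed

    ClosingLineIterator(BufferedReader reader) {
        this.reader = reader;
    }

    @Override
    public boolean hasNext() {
        if (nextLine != null) return true;
        if (finished) return false;
        try {
            nextLine = reader.readLine(); // reads exactly one line on demand
        } catch (IOException e) {
            close();
            throw new UncheckedIOException(e);
        }
        if (nextLine == null) {
            close(); // end of input: release the file handle
            return false;
        }
        return true;
    }

    @Override
    public String next() {
        if (!hasNext()) throw new NoSuchElementException();
        String line = nextLine;
        nextLine = null;
        return line;
    }

    @Override
    public void close() {
        finished = true;
        try {
            reader.close();
        } catch (IOException ignored) {
            // best effort, like a "close quietly" helper
        }
    }

    boolean isFinished() {
        return finished;
    }
}
```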
With your SpEL example:
<int:splitter input-channel="exchangeReplayFiles" output-channel="exchangeSpringQueueChannel"
expression="T(org.apache.commons.io.FileUtils).lineIterator(payload)"/>
the parser internally creates the following (note the List.class type passed to the ExpressionEvaluatingMessageProcessor constructor):
/**
* A Message Splitter implementation that evaluates the specified SpEL
* expression. The result of evaluation will typically be a Collection or
* Array. If the result is not a Collection or Array, then the single Object
* will be returned as the payload of a single reply Message.
*
* @author Mark Fisher
* @author Gary Russell
* @since 2.0
*/
public class ExpressionEvaluatingSplitter extends AbstractMessageProcessingSplitter {
@SuppressWarnings({"unchecked", "rawtypes"})
public ExpressionEvaluatingSplitter(Expression expression) {
super(new ExpressionEvaluatingMessageProcessor(expression, List.class));
}
}
And the ExpressionEvaluatingMessageProcessor class:
/**
* A {@link MessageProcessor} implementation that evaluates a SpEL expression
* with the Message itself as the root object within the evaluation context.
*
* @author Mark Fisher
* @author Artem Bilan
* @since 2.0
*/
public class ExpressionEvaluatingMessageProcessor<T> extends AbstractMessageProcessor<T> {
private final Expression expression;
private final Class<T> expectedType;
...
/**
* Create an {@link ExpressionEvaluatingMessageProcessor} for the given expression
* and expected type for its evaluation result.
* @param expression The expression.
* @param expectedType The expected type.
*/
public ExpressionEvaluatingMessageProcessor(Expression expression, Class<T> expectedType) {
Assert.notNull(expression, "The expression must not be null");
try {
this.expression = expression;
this.expectedType = expectedType;
}
catch (ParseException e) {
throw new IllegalArgumentException("Failed to parse expression.", e);
}
}
/**
* Processes the Message by evaluating the expression with that Message as the
* root object. The expression evaluation result Object will be returned.
* @param message The message.
* @return The result of processing the message.
*/
@Override
public T processMessage(Message<?> message) {
return this.evaluateExpression(this.expression, message, this.expectedType);
}
...
}
With the example provided, what comes back is an ArrayList (which implements the Collection interface) containing a single element: the LineIterator.
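A small model of that coercion: the hypothetical ListCoercionModel.coerceToList below mimics the outcome described above for a result forced to List.class. A value that is already a collection keeps its elements, while anything else, including an Iterator, is wrapped as the single element of a new ArrayList. It is a sketch of the described behavior, not Spring's conversion code.

```java
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

// Hypothetical model (not Spring code) of coercing an evaluation result to List.class.
class ListCoercionModel {

    static List<Object> coerceToList(Object result) {
        if (result instanceof List<?>) {
            @SuppressWarnings("unchecked")
            List<Object> list = (List<Object>) result;
            return list; // already a List: returned unchanged
        }
        if (result instanceof Collection<?>) {
            return new ArrayList<>((Collection<?>) result); // copy other collections
        }
        List<Object> wrapped = new ArrayList<>();
        wrapped.add(result); // an Iterator is NOT unrolled; it becomes element 0
        return wrapped;
    }
}
```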
ExpressionEvaluatingSplitter is a subclass of AbstractMessageSplitter and does not override the handleRequestMessage(Message<?> message) method.
That method looks like this:
public abstract class AbstractMessageSplitter extends AbstractReplyProducingMessageHandler {
protected final Object handleRequestMessage(Message<?> message) {
Object result = this.splitMessage(message);
// return null if 'null'
if (result == null) {
return null;
}
Iterator<Object> iterator;
final int sequenceSize;
if (result instanceof Collection) {
Collection<Object> items = (Collection<Object>) result;
sequenceSize = items.size();
iterator = items.iterator();
}
else if (result.getClass().isArray()) {
Object[] items = (Object[]) result;
sequenceSize = items.length;
iterator = Arrays.asList(items).iterator();
}
else if (result instanceof Iterable<?>) {
sequenceSize = 0;
iterator = ((Iterable<Object>) result).iterator();
}
else if (result instanceof Iterator<?>) {
sequenceSize = 0;
iterator = (Iterator<Object>) result;
}
else {
sequenceSize = 1;
iterator = Collections.singleton(result).iterator();
}
if (!iterator.hasNext()) {
return null;
}
final MessageHeaders headers = message.getHeaders();
final Object correlationId = headers.getId();
final AtomicInteger sequenceNumber = new AtomicInteger(1);
return new FunctionIterator<Object, AbstractIntegrationMessageBuilder<?>>(iterator,
new Function<Object, AbstractIntegrationMessageBuilder<?>>() {
@Override
public AbstractIntegrationMessageBuilder<?> apply(Object object) {
return createBuilder(object, headers, correlationId, sequenceNumber.getAndIncrement(),
sequenceSize);
}
});
}
// ... other members omitted
}
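Since the ArrayList is a Collection, handleRequestMessage takes the first (Collection) branch and never reaches the "result instanceof Iterator<?>" branch. The splitter therefore emits a single message whose payload is the LineIterator itself, and next() is never called on the LineIterator in the produceOutput(...) method.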
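The branch order can be modeled without Spring. In the hypothetical SplitDispatchModel.itemsFor below, each returned item corresponds to one output message. Because the Collection check comes first, a list wrapping a LineIterator yields one item, while the bare iterator yields one item per line. Unlike the real splitter, the model drains the iterator eagerly into a list, purely so the result is easy to inspect.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Collection;
import java.util.Collections;
import java.util.Iterator;
import java.util.List;

// Simplified model (not Spring code) of the branch order in handleRequestMessage.
class SplitDispatchModel {

    /** Returns the items that would each become one output message. */
    static List<Object> itemsFor(Object result) {
        if (result == null) {
            return new ArrayList<>(); // the real splitter returns null here
        }
        Iterator<?> iterator;
        if (result instanceof Collection<?>) {
            iterator = ((Collection<?>) result).iterator(); // checked first
        } else if (result.getClass().isArray()) {
            // object arrays only, matching the original (Object[]) cast
            iterator = Arrays.asList((Object[]) result).iterator();
        } else if (result instanceof Iterable<?>) {
            iterator = ((Iterable<?>) result).iterator();
        } else if (result instanceof Iterator<?>) {
            iterator = (Iterator<?>) result; // one item per next() call
        } else {
            iterator = Collections.singleton(result).iterator();
        }
        List<Object> items = new ArrayList<>();
        iterator.forEachRemaining(items::add);
        return items;
    }
}
```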
So why does the LineIterator end up wrapped in an ArrayList? I believe ExpressionEvaluatingSplitter has a flaw: it always does this:
public ExpressionEvaluatingSplitter(Expression expression) {
super(new ExpressionEvaluatingMessageProcessor(expression, List.class));
}
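I think that, as of Spring Integration 4, it should instead look at the type the expression evaluates to (List or Iterator) and then call super. How that is done may need some rework, because the type would have to be determined before the call to super(...), which the Java language does not allow.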
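On the constructor-ordering concern: Java requires super(...) to be the first statement of a constructor, but the argument expression may call a static method, because that does not touch this. The hypothetical classes below (not Spring types) show the pattern. One caveat: the type an expression evaluates to is only known when a message is evaluated, not when the splitter is constructed, so a construction-time choice can realistically only be between forcing an expected type and passing none (modeled here as null, meaning "return the raw result"), which would leave the instanceof checks in handleRequestMessage to tell a List from an Iterator.

```java
import java.util.List;

// Hypothetical base class: holds the type an evaluation result is coerced to.
class ExpectedTypeBase {
    final Class<?> expectedType; // null = keep the raw evaluation result

    ExpectedTypeBase(Class<?> expectedType) {
        this.expectedType = expectedType;
    }
}

// Hypothetical subclass: decides the super(...) argument via a static helper.
class ExpectedTypeSplitter extends ExpectedTypeBase {

    ExpectedTypeSplitter(boolean coerceToList) {
        super(chooseExpectedType(coerceToList)); // legal: static call inside the super(...) argument
    }

    // Static, so it may run before the superclass constructor has completed.
    private static Class<?> chooseExpectedType(boolean coerceToList) {
        return coerceToList ? List.class : null;
    }
}
```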
What do you think?
【Discussion】: