让读取器(reader)只返回每一行的原始字符串,不要尝试按分隔符进行拆分。再编写一个处理器(processor,必须是有状态的)来完成解析和聚合。唯一棘手的地方在于:到达 EOF 时,必须以某种方式通知处理器,否则它会一直等待下一行,以判断是否需要继续聚合。例如:
/**
 * Stateful processor that merges runs of consecutive items into a single item.
 *
 * <p>The processor always holds back one "pending" item. Each incoming item is
 * compared with the pending one through the configured predicate: when the
 * predicate accepts the pair, the two are combined by the aggregator and nothing
 * is emitted yet; otherwise the pending item is emitted and the incoming item
 * becomes the new pending one.
 *
 * <p>Because the most recent item is always held back, the last pending item is
 * only emitted when a further item arrives that the predicate rejects. The held
 * item is kept in an instance field and is never cleared by this class.
 *
 * @param <T> type of the items being read, aggregated and emitted
 */
public class AggregatingItemProcessor<T> implements ItemProcessor<T, T>, InitializingBean {

    /** Decides, given (pending, incoming), whether the incoming item joins the pending one. */
    private BiPredicate<T, T> aggregatePredicate;

    /** Combines (pending, incoming) into the new pending item. */
    private BiFunction<T, T, T> aggregator;

    /** Item currently held back; {@code null} until the first item has been seen. */
    private T pending;

    /**
     * @param aggregatePredicate test applied to (pending item, incoming item);
     *                           {@code true} means "aggregate them"
     */
    public void setAggregatePredicate(BiPredicate<T, T> aggregatePredicate) {
        this.aggregatePredicate = aggregatePredicate;
    }

    /**
     * @param aggregator function applied to (pending item, incoming item) whose
     *                   result replaces the pending item
     */
    public void setAggregator(BiFunction<T, T, T> aggregator) {
        this.aggregator = aggregator;
    }

    /**
     * Feeds one item into the aggregation.
     *
     * @param item the incoming item
     * @return the previously pending item when {@code item} starts a new group,
     *         or {@code null} while {@code item} was only stored or merged
     */
    @Override
    public T process(T item) throws Exception {
        final T held = pending;

        // A pending item exists and the incoming one does not belong to it:
        // release the pending item and start holding the incoming one.
        if (held != null && !aggregatePredicate.test(held, item)) {
            pending = item;
            return held;
        }

        // Either this is the very first item (just store it) or the predicate
        // accepted the pair (merge it into the pending item). Nothing is emitted.
        pending = (held == null) ? item : aggregator.apply(held, item);
        return null;
    }

    /**
     * Verifies that both the predicate and the aggregator have been configured.
     */
    @Override
    public void afterPropertiesSet() throws Exception {
        Assert.notNull(aggregatePredicate, "Predicate to determine if records should be aggregated must not be null.");
        Assert.notNull(aggregator, "Function for aggregating items must not be null.");
    }
}
然后进行如下配置:
/**
 * Sentinel item (a single NUL character) that the reader returns once after the
 * underlying input is exhausted. The processor's aggregate predicate rejects it,
 * which makes the processor release the item it is still holding.
 */
static final String EOF_MARKER = "\0";
/**
 * Builds a line reader that hands every line through unchanged and appends one
 * extra {@link #EOF_MARKER} item after the last real line.
 *
 * <p>Sequence produced by {@code read()}: each line returned by the parent
 * reader, then {@code EOF_MARKER} exactly once, then {@code null}.
 *
 * <p>Note: the "marker already sent" flag lives in the reader instance and is
 * never reset here, so once the marker has been returned this instance keeps
 * returning {@code null}.
 *
 * @return the configured reader (no resource is set in this method)
 */
@Bean
public FlatFileItemReader<String> reader() {
    final FlatFileItemReader<String> lineReader = new FlatFileItemReader<String>() {

        /** Set once the EOF marker has been handed out. */
        private boolean markerSent = false;

        @Override
        public String read() throws Exception, UnexpectedInputException, ParseException {
            if (markerSent) {
                return null;
            }

            final String line = super.read();
            if (line != null) {
                return line;
            }

            // Underlying input is exhausted: emit the sentinel once so the
            // processor can flush the item it is still holding.
            markerSent = true;
            return EOF_MARKER;
        }
    };

    // Identity mapping: the raw line text is the item, no tokenizing.
    lineReader.setLineMapper((line, lineNumber) -> line);
    return lineReader;
}
/**
 * Builds the aggregating processor for raw lines.
 *
 * <p>An incoming line is appended to the pending line when it is not the
 * {@link #EOF_MARKER} and contains fewer than two commas; lines are joined by
 * plain string concatenation with no separator.
 *
 * @return the configured processor
 */
@Bean
public AggregatingItemProcessor<String> processor() {
    final AggregatingItemProcessor<String> lineAggregator = new AggregatingItemProcessor<>();

    lineAggregator.setAggregatePredicate((held, incoming) -> {
        // The EOF sentinel is never merged; it only forces the held line out.
        if (EOF_MARKER.equals(incoming)) {
            return false;
        }
        // Fewer than two commas: treat the line as a continuation of the held one.
        return StringUtils.countOccurrencesOf(incoming, ",") < 2;
    });

    lineAggregator.setAggregator((held, incoming) -> held.concat(incoming));
    return lineAggregator;
}