【Question title】: Spring Batch - Reading multiple line log message
【Posted】: 2013-10-26 21:56:50
【Question】:

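I have a Spring Batch application configured with Spring Integration, and I am having trouble reading a multi-line log message as a single message. The application has to read multi-line log messages (for example, exception stack traces) as single messages, which are later processed and classified for further indexing. Each message is identified by its timestamp (the DATE_PATTERN in the code below) and may continue over several lines. I am trying to keep reading a message until I see another timestamp, by overriding the isEndOfRecord method of SimpleRecordSeparatorPolicy: when the second line reaches the preProcess method, I return true from isEndOfRecord. This does not work as expected. Can anyone help me read the log file below by recognizing the timestamp pattern?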

I am using org.springframework.batch.item.file.FlatFileItemReader, with org.springframework.batch.item.file.mapping.PassThroughLineMapper as the line mapper.

Here are the full details:

1) Log message file: sample-message-test.log

2013-10-19 07:05:32.253 [My First Class..] LOG LEVEl  first-message-line-1 first-message-line-1 first-message-line-1 first-message-line-1 first-message-line-1 first-message-line-1 
first-message-line-2 first-message-line-2 first-message-line-2 
first-message-line-3 first-message-line-3 first-message-line-3 
first-message-line-4 first-message-line-4 first-message-line-4 
first-message-line-5 first-message-line-5 
first-message-line-6 
2013-10-19 07:05:32.257 [My Second Class..] LOG LEVEl  second-message-line-1 second-message-line-1 second-message-line-1 second-message-line-1 second-message-line-1 second-message-line-1 
second-message-line-2 second-message-line-2 second-message-line-2 
second-message-line-3 second-message-line-3 second-message-line-3 
second-message-line-4 second-message-line-4 second-message-line-4 
second-message-line-5 second-message-line-5 
second-message-line-6
2013-10-19 07:05:32.259 [My Third Class..] LOG LEVEl  third-message-line-1 third-message-line-1 third-message-line-1 third-message-line-1 third-message-line-1 third-message-line-1 
third-message-line-2 third-message-line-2 third-message-line-2 
third-message-line-3 third-message-line-3 third-message-line-3 
third-message-line-4 third-message-line-4 third-message-line-4 
third-message-line-5 third-message-line-5 
third-message-line-6

2) Batch configuration file

<batch:job id="fileReadingJob">
        <batch:step id="flatFileReadingStep">
            <batch:tasklet >
                <batch:chunk reader="reader" writer="writer" commit-interval="10" />
            </batch:tasklet>
        </batch:step>
    </batch:job>

    <bean id="reader" class="org.springframework.batch.item.file.FlatFileItemReader"  scope="step">
        <property name="lineMapper">
            <bean class="org.springframework.batch.item.file.mapping.PassThroughLineMapper"/>
        </property>
        <property name="bufferedReaderFactory">
            <bean class="org.springframework.batch.item.file.DefaultBufferedReaderFactory"/>
        </property>
        <property name="recordSeparatorPolicy" >
            <bean class="com.batchlog.explorer.batchio.FlatFileRecordSeperationPolicy"/>
        </property>
        <property name="resource" value="file:///#{systemProperties['logfolder']}/#{jobParameters['inputfile']}" />
    </bean>
    <bean id="writer" class="com.batchlog.explorer.batchio.FlatFileWriter" scope="step"/>
........

3) Record separator policy class

public class FlatFileRecordSeperationPolicy extends SimpleRecordSeparatorPolicy {

    public static final String STARTING_OF_THE_LINE = "-STARTING_OF_THE_LINE-";
    public static final String CONTINUATION_OF_THE_FILE  = "-CONTINUATION_OF_THE_FILE-";
    public static final String END_OF_THE_LINE = "-END_OF_THE_LINE-";
    public static final String END_OF_THE_LINE_CHARACER = " \n ";
    public static final String DATE_PATTERN ="^(?>\\d\\d){1,2}-(?:0?[1-9]|1[0-2])-(\\s)?(?:2[0123]|[01][0-9]):? (?:[0-5][0-9])(?::?(?:(?:[0-5][0-9]|60)(?:[.,][0-9]+)?))?(?:Z|[+-](?:2[0123]|[01][0-9])(?::?(?:[0-5][0-9])))?.*?";


    @Override
    public boolean isEndOfRecord(String line) {
        if (line.matches(DATE_PATTERN) || line.startsWith(STARTING_OF_THE_LINE)
                || line.contains(CONTINUATION_OF_THE_FILE) || line.startsWith(END_OF_THE_LINE)) {
            if (isNextLineStarts(line) || line.startsWith(END_OF_THE_LINE)) {
                return true; // break the record here
            }
        }
        return false; // continue the record
    }

    private boolean isNextLineStarts(String preProcessOfLine){
            if(preProcessOfLine.contains(CONTINUATION_OF_THE_FILE) && !preProcessOfLine.endsWith(CONTINUATION_OF_THE_FILE)){
                String[] lines = preProcessOfLine.split(CONTINUATION_OF_THE_FILE);
                if(lines[1].trim().matches(DATE_PATTERN)){
                    return true;
                }
            }
            return false;
    }
    @Override
        public String preProcess(String line) {
            if(line.matches(DATE_PATTERN) && !line.contains(CONTINUATION_OF_THE_FILE)){
                line = new StringBuilder(STARTING_OF_THE_LINE).append(line).toString();
            }else if(line.startsWith(STARTING_OF_THE_LINE) && !line.contains(CONTINUATION_OF_THE_FILE)){
                line =  new StringBuilder(line.substring(STARTING_OF_THE_LINE.length())).append(CONTINUATION_OF_THE_FILE).toString();
            }else if(line.contains(CONTINUATION_OF_THE_FILE) && !line.endsWith(CONTINUATION_OF_THE_FILE)){
                String[] lines = line.split(CONTINUATION_OF_THE_FILE);
                if(lines[1].trim().matches(DATE_PATTERN)){
                    line = new StringBuilder(END_OF_THE_LINE).append(lines[0]).toString();//.append(lines[1]).toString();
                }else{
                    line = new StringBuilder(lines[0]).append(lines[1]).append(CONTINUATION_OF_THE_FILE).toString();
                }
            }
                return super.preProcess(line);
    }
    @Override
        public String postProcess(String record) {
            if(record.startsWith(END_OF_THE_LINE)){
                record = new StringBuilder(record.substring(END_OF_THE_LINE.length())).toString();
            }else if(record.contains(CONTINUATION_OF_THE_FILE) && !record.endsWith(CONTINUATION_OF_THE_FILE)){
                String[] lines = record.split(CONTINUATION_OF_THE_FILE);
                if(lines[1].trim().matches(DATE_PATTERN)){
                    record = new StringBuilder(END_OF_THE_LINE).append(lines[0]).toString();
                }else{
                    record = new StringBuilder(lines[0]).append(lines[1]).toString();
                }
            }
            return super.postProcess(record);
    }
}

【Comments】:

    Tags: spring spring-integration spring-batch


    【Solution 1】:

    Write your own ItemReader, as described in the multiorder-line example or in this post.
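
    The look-ahead idea behind such a reader can be sketched in plain Java, without any Spring Batch types. This is only a minimal illustration, not the code from the linked example: the class name MultiLineLogReader and the RECORD_START pattern are assumptions made for this sketch, and the pattern assumes every record starts with a timestamp shaped like the one in the sample log. In a real job the same logic would sit inside the read() method of a custom ItemReader.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.UncheckedIOException;
import java.util.regex.Pattern;

/**
 * Groups physical lines into logical log records. A record starts at a line
 * that begins with a timestamp and runs until the next such line or end of input.
 * Hypothetical helper for illustration; not part of Spring Batch.
 */
class MultiLineLogReader {

    // Assumption: records start with "yyyy-MM-dd HH:mm:ss.SSS", as in the sample log.
    private static final Pattern RECORD_START =
            Pattern.compile("^\\d{4}-\\d{2}-\\d{2} \\d{2}:\\d{2}:\\d{2}[.,]\\d{3}.*");

    private final BufferedReader delegate;
    private String lookAhead;   // first line of the next record, already consumed
    private boolean primed;     // whether lookAhead has been filled at least once

    MultiLineLogReader(BufferedReader delegate) {
        this.delegate = delegate;
    }

    /** Returns the next complete record, or null once the input is exhausted. */
    String read() {
        try {
            if (!primed) {
                lookAhead = delegate.readLine();
                primed = true;
            }
            if (lookAhead == null) {
                return null;
            }
            StringBuilder record = new StringBuilder(lookAhead);
            String line;
            // Append continuation lines until the next timestamp line or EOF.
            while ((line = delegate.readLine()) != null
                    && !RECORD_START.matcher(line).matches()) {
                record.append('\n').append(line);
            }
            lookAhead = line; // next record's first line, or null at EOF
            return record.toString();
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

    The reader keeps the line that starts the next record in a field instead of discarding it, so reaching the end of the file simply closes the last record rather than raising an error. Any text before the first timestamp line is returned as a record of its own.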

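    【Discussion】:

    • Hi Bella, thanks for the reply. I ended up doing the same thing after searching in Git. Regards, Ashok G
    • Hi Bellabax, thanks for your suggestion. I had not spent time on this until now, since it is a POC I work on in my spare time. I have just written my own ItemWriter and ItemReader as you suggested, and it works well. I handle all of this logic in the read method, and for more flexibility I also implemented a chunk listener so that I can get the context in my reader. Thanks for your help.
    【Solution 2】: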

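    Your problem does not fit the RecordSeparatorPolicy.isEndOfRecord(String) paradigm. isEndOfRecord works well when the end-of-record marker sits on the record's own last line.
    For example, DefaultRecordSeparatorPolicy makes sure you have an even number of quotes, and the closing quote is part of the record you want. In your case you would read one line too many.

    Your basic idea of using postProcess and preProcess might work, but you will still get a FlatFileParseException from FlatFileItemReader on the last record, when you reach the end of the file and readLine returns null. See applyRecordSeparatorPolicy(String line) in FlatFileItemReader: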
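
    To make the contrast concrete, here is a simplified sketch of a quote-balance check. It is not the actual DefaultRecordSeparatorPolicy source, which has further handling that is omitted here; the class name QuoteBalanceCheck is made up for this illustration.

```java
/**
 * Simplified illustration of a quote-balance end-of-record test.
 * Hypothetical class; not the Spring Batch implementation.
 */
class QuoteBalanceCheck {

    /** True when no quoted field is still open, i.e. the quote count is even. */
    static boolean isEndOfRecord(String record) {
        long quotes = record.chars().filter(c -> c == '"').count();
        return quotes % 2 == 0;
    }
}
```

    The decision depends only on the text read so far, so the record can end on the very line that completes it. A timestamp-delimited log is different: the end of a record is only known once the first line of the next record has been read.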

      private String applyRecordSeparatorPolicy(String line) throws IOException {
    
            String record = line;
            while (line != null && !recordSeparatorPolicy.isEndOfRecord(record)) {
                line = this.reader.readLine();
                if (line == null) {
    
                    if (StringUtils.hasText(record)) {
                        // A record was partially complete since it hasn't ended but
                        // the line is null
                        throw new FlatFileParseException("Unexpected end of file before record complete", record, lineCount);
                    }
                    else {
                        // Record has no text but it might still be post processed
                        // to something (skipping preProcess since that was already
                        // done)
                        break;
                    }
                }
                else {
                    lineCount++;
                }
                record = recordSeparatorPolicy.preProcess(record) + line;
            }
    
            return recordSeparatorPolicy.postProcess(record);
    
        }
    

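    In that case your output file will be missing lines, depending on your commit interval and your isEndOfRecord logic.

    So basically I suggest a different approach. Did bellabax's solution work for you?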
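
    The failure mode can be reproduced with a small stand-in for that loop. The sketch below is an assumption-laden simplification: SeparatorLoopDemo, nextRecord and endsAtNextTimestamp are hypothetical names, the policy is a plain "stop once a second timestamp line appears" rule rather than the class from the question, and an IllegalStateException stands in for FlatFileParseException.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Predicate;

/** Stand-in for the record-assembly loop; hypothetical, for illustration only. */
class SeparatorLoopDemo {

    /** Keeps appending lines until the policy says the record is complete. */
    static String nextRecord(Iterator<String> lines, Predicate<String> isEndOfRecord) {
        if (!lines.hasNext()) {
            return null;
        }
        String record = lines.next();
        while (!isEndOfRecord.test(record)) {
            if (!lines.hasNext()) {
                // Same situation as the FlatFileParseException branch in the real reader.
                throw new IllegalStateException("Unexpected end of file before record complete");
            }
            record = record + "\n" + lines.next();
        }
        return record;
    }

    /** Hypothetical policy: complete once a second timestamp line is part of the record. */
    static boolean endsAtNextTimestamp(String record) {
        return Arrays.stream(record.split("\n"))
                .filter(l -> l.matches("\\d{4}-\\d{2}-\\d{2} .*"))
                .count() >= 2;
    }
}
```

    With such a policy, each record already contains the first line of the following message by the time it is declared complete, and the final message can never be completed because no further timestamp arrives before the input ends.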

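    【Discussion】:

    • Hi Haim, sorry for the late reply. I got caught up with other priorities and did not get a chance to work on this again. You are right: this approach (I mean isEndOfRecord) does not work for me, as you said. I still have to try bellabax's solution, and I will update you soon. Regards, Ashok Gudise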