【发布时间】:2017-07-19 00:47:15
【问题描述】:
我有一个扫描文件以进行处理的集成流程。由于可能有多个处理器扫描同一个目录,因此我添加了“.nioLocker()”以防止其他 JVM 的处理器处理该文件。
这是流程配置:
IntegrationFlows.from( // Scan files from input dir
s -> s.file(new File(fileInputDir))
.preventDuplicates(true)
.nioLocker()
.regexFilter("(.)*\\.[xX][mM][lL]|(.)+\\.[dD][nN][eE]"), // to match any case of the letters XML
p -> p.poller(Pollers.fixedRate(filePollerInterval)
.taskExecutor(new ScheduledThreadPoolExecutor(filePoolSize))
)
现在,问题是即使在我调用 BufferedReader.readLine 时运行一个处理器,我也会收到一个异常,指出文件被锁定
java.io.IOException: The process cannot access the file because another process has locked a portion of the file
at java.io.FileInputStream.readBytes(Native Method)
at java.io.FileInputStream.read(FileInputStream.java:255)
at sun.nio.cs.StreamDecoder.readBytes(StreamDecoder.java:284)
at sun.nio.cs.StreamDecoder.implRead(StreamDecoder.java:326)
at sun.nio.cs.StreamDecoder.read(StreamDecoder.java:178)
at java.io.InputStreamReader.read(InputStreamReader.java:184)
at java.io.BufferedReader.fill(BufferedReader.java:161)
at java.io.BufferedReader.readLine(BufferedReader.java:324)
at java.io.BufferedReader.readLine(BufferedReader.java:389)
我试图通过调用来释放锁
private NioFileLocker fileLocker = new NioFileLocker();
fileLocker.unlock(file);
但这不起作用! (我怀疑是因为它是从不同于储物柜的线程调用的,但我不确定)
获取锁的正确方法是什么?有没有更好的方法来确保只有一个处理器可以访问资源?
--------------编辑------------------ ---------------
所以我采取了额外的步骤来确保锁定文件的线程与从其文件通道读取的线程相同。为此,我使用了直接渠道。 (之前,传递给 fileSplitter 的消息是通过 QueueChannel 执行的,它会在不同的线程上执行 send())。我仍然收到错误
2017-07-21 11:22:03.316 INFO 336488 --- [ main] c.f.e.m.i.MailerInboundApplication : Started MailerInboundApplication in 13.541 seconds (JVM running for 14.419)
2017-07-21 11:22:09.946 INFO 336488 --- [ask-scheduler-5] o.s.i.file.FileReadingMessageSource : Created message: [GenericMessage [payload=input\EMAIL92770.9352177.20170617.xml, headers={id=5dba6d62-b0a5-508e-48a9-cfddfa3b331f, timestamp=1500654129946}]]
2017-07-21 11:22:09.962 DEBUG 336488 --- [ask-scheduler-5] c.f.edd.mailer.inbound.core.FileRouter : fileRouter received message: GenericMessage [payload=input\EMAIL92770.9352177.20170617.xml, headers={CORRELATION_ID=92770.9352177.20170617, id=32a8846d-5425-0b
ee-657e-8767e1fb6105, timestamp=1500654129962}]
2017-07-21 11:22:09.962 DEBUG 336488 --- [ask-scheduler-5] c.f.e.mailer.inbound.core.FileSplitter : fileSplitter received message: GenericMessage [payload=input\EMAIL92770.9352177.20170617.xml, headers={CORRELATION_ID=92770.9352177.20170617, id=32a8846d-5425-
0bee-657e-8767e1fb6105, timestamp=1500654129962}]
java.io.IOException: The process cannot access the file because another process has locked a portion of the file
at sun.nio.ch.FileDispatcherImpl.read0(Native Method)
at sun.nio.ch.FileDispatcherImpl.read(FileDispatcherImpl.java:61)
at sun.nio.ch.IOUtil.readIntoNativeBuffer(IOUtil.java:223)
at sun.nio.ch.IOUtil.read(IOUtil.java:197)
at sun.nio.ch.FileChannelImpl.read(FileChannelImpl.java:159)
at com.fiserv.edd.mailer.inbound.core.FileSplitter.splitMessage(FileSplitter.java:93)
FileSplitter.java 中的代码:
@Override
protected Object splitMessage(Message<?> message) {
String correlationId = (String) message.getHeaders().get("CORRELATION_ID"); //Save the correlation ID so we can use it to send the DNE/CLP file later
File file = (File) message.getPayload();
String inputFileName = file.getName();
log.info(LogEvent.getBuilder().withMessageId(inputFileName)
.withMessage("Processing file: " + inputFileName).build());
long startTime = System.currentTimeMillis();
Optional<InputHeader> inputHeader = Optional.empty();// headerParser.parse(file);
ParsingReport pr = new ParsingReport(inputFileName);
try (RandomAccessFile lfs = new RandomAccessFile(file.getAbsolutePath(), "rw")){
FileChannel fc = lfs.getChannel();
byte[] bytes = new byte[1024];
fc.read(ByteBuffer.wrap(bytes));
System.out.println(new String(bytes));
} catch (FileNotFoundException e) {
e.printStackTrace();
} catch (IOException e) {
e.printStackTrace();
}
return Collections.EMPTY_LIST;
}
【问题讨论】:
-
检查这里,请github.com/artembilan/spring-integration/commit/…。主要技巧是您只能通过该锁中的
FileChannel访问锁定文件的内容 -
@ArtemBilan:谢谢。现在的问题变成了如何满足从 java.io.File 或 InputStream 读取的某些类接口的期望。看来我必须使用实现“FileChannel 读取代码”的 read 方法来实现 InputStream。这是推荐的吗?
-
听起来不错。抱歉,我没有深入研究这个主题。
-
@ArtemBilan:仅允许通过锁定文件的同一线程访问文件的内容。工作流的组件是否在同一个线程上执行?换句话说,如果我的流程由带有 nioFilelocker 的 sftp 适配器组成,该适配器将带有文件作为有效负载的消息传递给需要访问它的文件拆分器,那么文件拆分器不应该在与储物柜相同的线程上执行,所以它可以访问内容吗?
-
如果你不明确地这样做,一切都会在同一个线程上调用
标签: java synchronization spring-integration file-locking