【问题标题】:Stop accepting input on pipe but read buffered data停止接受管道输入但读取缓冲数据
【发布时间】:2013-07-26 22:41:49
【问题描述】:

我有一个从$stdin 读取数据并对数据进行一些处理的应用程序。我想放入一个信号处理程序来捕获 SIGINT/SIGTERM 并正常关闭(意味着完成数据处理并在完成后退出)。棘手的部分是我希望它停止从 STDIN 读取但能够处理任何缓冲数据。这样可以启动另一个应用程序并通过相同的 STDIN 管道并从前一个应用程序停止的地方继续处理。

问题是,如果我关闭 STDIN,缓冲的内容都会丢失,或者至少无法访问。

基本上我正在尝试这个:

#!/usr/bin/ruby

Signal.trap('INT') do
    $stdin.close
end

f = File.open('/tmp/out', 'a')
while (data = $stdin.read(4096)) != "" do
    f.write(data)
end

它立即在$stdin.read 调用上给出IOError 异常,即使我知道它读取了一些数据(strace 显示它)。

(我不需要关闭管道,我只是为了打破while 循环。如果有更优雅的方法来打破循环并获取缓冲数据,我会愉快地接受它。)


我知道这种方法适用于操作系统级别(管道缓冲区在传递给另一个应用程序时会保留),因为我可以进行以下测试并且不会丢失任何数据:

# source.rb
i = 0
loop do
    puts "%08d" % (i += 1)
end

.

# reader.rb
$stdout.write($stdin.read(9))
$stdin.close

.

ruby /tmp/source.rb | while true; do ruby reader.rb; sleep 1; done
00000001
00000002
00000003
00000004
00000005

【问题讨论】:

  • 您希望在关闭管道末端和打开新进程之间的这段时间内会发生什么?如果客户端在没有监听的情况下不断将数据推送到管道中怎么办?这不是管道的工作方式。如果您需要更具弹性的机制,您可能不得不改用消息队列系统。
  • @JimGarrison 它应该缓冲数据。这就是完全管道的工作方式(至少在 linux 中,我不能代表其他操作系统)。编辑:见stackoverflow.com/questions/2715324/…

标签: ruby linux


【解决方案1】:

解决这个问题的一种方法是在关闭原始文件之前复制文件描述符,然后错误将打破循环,您可以从未关闭的重复文件句柄中读取其余数据。

(对不起,如果这段代码不好,我不知道 ruby​​)

#!/usr/bin/ruby

require 'fcntl'

stdin_dup = nil

Signal.trap('INT') do
  stdin_dup = File.for_fd($stdout.fcntl(Fcntl::F_DUPFD))
  $stdin.close
end

f = File.open('/tmp/out', 'a')
begin
  while (data = $stdin.read(4096)) != "" do
    f.write(data)
  end
rescue IOError
  # finish stuff with stdin_dup here
end

【讨论】:

  • 您的示例代码让您欺骗了$stdout。但是,当我在 $stdin 上尝试它时,我仍然会遇到异常,因为在管道关闭时之前的 $stdin.read() 调用仍然处于挂起状态。此外,当我复制 STDIN 并使用复制的对象时,我仍然可以从传入的流中读取。我不想再从流中读取任何数据,只需抓取缓冲区中的内容即可。
  • @Patrick 在这种情况下,没有(据我所知)原语可以帮助你做你想做的事。您提供的带有通往多个程序的管道的 shell 示例只是重复的,它实际上并没有做与此示例不同的任何事情。我更希望看到你关于打破循环的评论,但我可能误解了它。
  • @Patrick 如果不尝试尽可能快地读取缓冲数据(非阻塞,因此您知道何时结束),您将无法获取缓冲数据。这有一个问题,它可能永远不会结束,因为管道可能会比它被清空的速度更快地重新填充。
  • 另外,修复了标准输入/标准输出错误,不知道是怎么发生的。
【解决方案2】:

在为此苦苦挣扎了几天之后,我最终不得不放弃 IO.read 并改用 IO.sysread 并自己做缓冲。这个解决方案真的没有那么复杂,下面是实现。

Signal.trap('INT') do
    $stdin.close
end

def myread(bufio, bytes) # `bufio` is a StringIO object, `bytes` is bytes to read
    begin
        while bufio.size < bytes do
            bufio.write($stdin.sysread(bytes - bufio.size))
        end
    rescue SignalException, Interrupt, Errno::EINTR => e
        retry
    rescue SystemCallError, IOError, EOFError => e
        # nothing, we're done
    end
end

我的确切代码与我使用 AWS ruby​​ SDK 时的代码略有不同,因此 myread 方法实际上只是传递给 AWS::S3::S3Object.write 的一个块

【讨论】:

    猜你喜欢
    • 2012-03-27
    • 2022-01-12
    • 1970-01-01
    • 2023-03-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2014-03-13
    • 1970-01-01
    相关资源
    最近更新 更多