【问题标题】:How to use ruby fibers to avoid blocking IO如何使用 ruby​​ 纤程避免阻塞 IO
【发布时间】:2011-01-23 00:00:51
【问题描述】:

我需要将目录中的一堆文件上传到 S3。由于上传所需的 90% 以上的时间都花在等待 http 请求完成上,我想以某种方式一次执行其中的几个。

Fibers 能帮我解决这个问题吗?它们被描述为解决此类问题的一种方法,但我想不出在 http 调用阻塞时可以做任何工作的任何方法。

有什么方法可以在没有线程的情况下解决这个问题?

【问题讨论】:

  • 那么,任何人都可以评论纤维本身吗?我假设纤维没有“在后台做事”的能力是否正确?

标签: ruby ruby-1.9


【解决方案1】:

我不支持 1.9 中的纤维,但 1.8.6 中的常规线程可以解决这个问题。尝试使用队列http://ruby-doc.org/stdlib/libdoc/thread/rdoc/classes/Queue.html

查看文档中的示例,您的消费者是上传的部分。它“使用”一个 URL 和一个文件,并上传数据。生产者是程序的一部分,它继续工作并找到要上传的新文件。

如果您想一次上传多个文件,只需为每个文件启动一个新线程:

t = Thread.new do
  upload_file(param1, param2)
end
@all_threads << t

然后,稍后在您的“生产者”代码中(记住,不必在自己的线程中,它可以是主程序):

@all_threads.each do |t|
  t.join if t.alive?
end

队列可以是@member_variable 或$global。

【讨论】:

  • 虽然我的光纤问题似乎没有得到答复
【解决方案2】:

回答您的实际问题:

Fibers 能帮我解决这个问题吗?

不,他们不能。 Jörg W Mittag explains why best.

不,您不能使用 Fibers 进行并发处理。 Fiber 根本不是并发构造,它们是控制流构造,就像异常一样。这就是 Fibers 的全部意义所在:它们从不并行运行,它们是合作的并且是确定性的。纤维是协程。 (事实上​​,我一直不明白为什么它们不被简单地称为协程。)

Ruby 中唯一的并发构造是线程。

当他说 Ruby 中唯一的并发结构是线程时,请记住 Ruby 有许多不同的实现,并且它们的线程实现各不相同。 Jörg 再次provides a great answer 这些差异;并正确得出结论,只有像 JRuby(使用映射到本机线程的 JVM 线程)或分叉您的进程这样的东西才能实现真正的并行性。

有什么方法可以在没有线程的情况下解决这个问题?

除了分叉你的进程,我还建议你查看EventMachineem-http-request 之类的东西。它是一个事件驱动、非阻塞、基于reactor pattern 的 HTTP 客户端,它是异步的,不会产生线程开销。

【讨论】:

【解决方案3】:

Aaron Patterson (@tenderlove) 使用了一个几乎与您完全相同的示例来准确描述为什么您可以并且应该在您的情况下使用线程来实现并发。

现在大多数 I/O 库都足够智能,可以在执行 IO 时释放 GVL(全局 VM 锁,或者大多数人称之为 GIL 或全局解释器锁)。 C 中有一个简单的函数调用来执行此操作。您无需担心 C 代码,但对您而言,这意味着大多数值得一提的 IO 库将释放 GVL 并允许其他线程执行,而正在执行 IO 的线程等待数据返回.

如果我刚才说的令人困惑,你不必太担心。您需要知道的主要事情是,如果您使用一个像样的库来执行您的 HTTP 请求(或任何其他 I/O 操作...数据库、进程间通信等),Ruby 解释器 (MRI)足够聪明,能够释放解释器上的锁,并允许其他线程在一个线程等待 IO 返回时执行。如果下一个线程有自己的 IO 要抓取,Ruby 解释器也会做同样的事情(假设 IO 库是为利用 Ruby 的这个特性而构建的,我相信现在大多数都是这样)。

所以,总结一下我的意思,使用线程!您应该会看到性能优势。如果没有,请检查您的 http 库是否使用 C 中的 rb_thread_blocking_region() 函数,如果没有,请找出原因。也许有充分的理由,也许您需要考虑使用更好的库。

Aaron Patterson 视频的链接在这里:http://www.youtube.com/watch?v=kufXhNkm5WU

值得一看,即使只是为了搞笑,因为 Aaron Patterson 是互联网上最有趣的人之一。

【讨论】:

    【解决方案4】:

    您可以为此使用单独的进程而不是线程:

    #!/usr/bin/env ruby
    
    $stderr.sync = true
    
    # Number of children to use for uploading
    MAX_CHILDREN = 5
    
    # Hash of PIDs for children that are working along with which file
    # they're working on.
    @child_pids = {}
    
    # Keep track of uploads that failed
    @failed_files = []
    
    # Get the list of files to upload as arguments to the program
    @files = ARGV
    
    
    ### Wait for a child to finish, adding the file to the list of those
    ### that failed if the child indicates there was a problem.
    def wait_for_child
        $stderr.puts "    waiting for a child to finish..."
        pid, status = Process.waitpid2( 0 )
        file = @child_pids.delete( pid )
        @failed_files << file unless status.success?
    end
    
    
    ### Here's where you'd put the particulars of what gets uploaded and
    ### how. I'm just sleeping for the file size in bytes * milliseconds
    ### to simulate the upload, then returning either +true+ or +false+
    ### based on a random factor.
    def upload( file )
        bytes = File.size( file )
        sleep( bytes * 0.00001 )
        return rand( 100 ) > 5
    end
    
    
    ### Start a child uploading the specified +file+.
    def start_child( file )
        if pid = Process.fork
            $stderr.puts "%s: uploaded started by child %d" % [ file, pid ]
            @child_pids[ pid ] = file
        else
            if upload( file )
                $stderr.puts "%s: done." % [ file ]
                exit 0 # success
            else
                $stderr.puts "%s: failed." % [ file ]
                exit 255
            end
        end
    end
    
    
    until @files.empty?
    
        # If there are already the maximum number of children running, wait 
        # for one to finish
        wait_for_child() if @child_pids.length >= MAX_CHILDREN
    
        # Start a new child working on the next file
        start_child( @files.shift )
    
    end
    
    
    # Now we're just waiting on the final few uploads to finish
    wait_for_child() until @child_pids.empty?
    
    if @failed_files.empty?
        exit 0
    else
        $stderr.puts "Some files failed to upload:",
            @failed_files.collect {|file| "  #{file}" }
        exit 255
    end
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-06-25
      • 2014-04-12
      • 2016-07-06
      • 1970-01-01
      相关资源
      最近更新 更多