【问题标题】:Perl - parallel programming - running two external programsPerl - 并行编程 - 运行两个外部程序
【发布时间】:2016-07-07 19:18:02
【问题描述】:

我有一个 perl 脚本,它运行两个外部程序,一个依赖于另一个,用于一系列数据集。目前,我只是一次对每个数据集执行此操作,通过第一个程序运行它,使用 qx 收集结果,然后使用这些结果运行第二个程序。数据与第二个程序的结果一起添加到输出文件中,每个数据集一个文件。我创建了一个简单的可重现示例,希望能捕捉到我当前的方法:

#!/usr/bin/perl
#
# stackoverflow_q_7-7-2016.pl

use warnings;
use strict;

my @queries_list = (2, 4, 3, 1);

foreach my $query (@queries_list) {
    #Command meant to simulate the first, shorter process, and return a list of results for the next process
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-8 -n 3";
    print "Running program_1 on query $query...\n";
    my @results = qx($cmd_1);

    foreach (@results) {
        chomp $_;
        #Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query
        my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 5 . " >> $query.output";
        print "\tRunning program_2 on query $query with input param $_...\n";
        system($cmd_2);         }
}

由于第一个程序通常比第二个程序完成得更快,我认为可以通过在 program_2 也在前一个查询上运行的同时继续通过 program_1 运行新查询来加快整个交易的速度。加快速度会很棒,因为目前需要花费数小时才能完成。但是,我不知道该怎么做。像 Parallel::ForkManager 这样的东西有解决方案吗?或者在 Perl 中使用线程?

现在在我的实际代码中,我进行了一些错误处理并为 program_2 设置了超时 - 我使用 fork、exec 和 $SIG{ALRM} 来执行此操作,但我真的不知道我在用这些做什么.重要的是我仍然有能力执行此操作,否则 program_2 可能会卡住或无法充分报告失败的原因。这是带有错误处理的代码的样子。我不认为它在可重现的示例中以应有的方式工作,但至少你会希望看到我正在尝试做的事情。以下是错误处理:

#!/usr/bin/perl
#
# stackoverflow_q_7-7-2016.pl

use warnings;
use strict;

my @queries_list = (2, 4, 3, 1);

foreach my $query (@queries_list) {
    #Command meant to simulate the first, shorter process, and return a list of results for the next process
    my $cmd_1 = "sleep " . $query . "s; shuf -i 4-15 -n 3";
    print "Running program_1 on query $query...\n";
    my @results = qx($cmd_1);

    foreach (@results) {
        chomp $_;
        #Command meant to simulate a longer process whose input depends on program_1; the output I write to a separate file for each query
        my $cmd_2 = "sleep " . $_ . "s; fortune -s | head -c " . $_ * 3 . " >> $query.output";
        print "\tRunning program_2 on query $query with input param $_...\n";

        my $childPid;
        eval {
            local $SIG{ALRM} = sub { die "Timed out" };
            alarm 10;
            if ($childPid = fork()) {
                wait();
            } else {
                exec($cmd_2);
            }
            alarm 0;
        };
        if ($? != 0) {
            my $exitCode = $? >> 8;
            print "Program_2 exited with error code $exitCode. Retry...\n";
        }
        if ($@ =~ /Timed out/) {
            print "\tProgram_2 timed out. Skipping...\n";
            kill 2, $childPid;
            wait;
        };
    }
}

感谢所有帮助。

【问题讨论】:

    标签: perl


    【解决方案1】:

    一种解决方案:

    use threads;
    
    use Thread::Queue;  # 3.01+
    
    sub job1 { ... }
    sub job2 { ... }
    
    {
       my $job1_request_queue = Thread::Queue->new();
       my $job2_request_queue = Thread::Queue->new();
    
       my $job1_thread = async {
          while (my $job = $job1_request_queue->dequeue()) {
             my $result = job1($job);
             $job2_request_queue->enqueue($result);
          }
    
          $job2_request_queue->end();
       };
    
      my $job2_thread = async {
          while (my $job = $job2_request_queue->dequeue()) {
             job2($job);
          }
       };
    
       $job1_request_queue->enqueue($_) for ...;
    
       $job1_request_queue->end();    
       $_->join() for $job1_thread, $job2_thread;
    }
    

    你甚至可以有多个工人,其中一种/两种类型。

    use threads;
    
    use Thread::Queue;  # 3.01+
    
    use constant NUM_JOB1_WORKERS => 1;
    use constant NUM_JOB2_WORKERS => 3;
    
    sub job1 { ... }
    sub job2 { ... }
    
    {
       my $job1_request_queue = Thread::Queue->new();
       my $job2_request_queue = Thread::Queue->new();
    
       my @job1_threads;
       for (1..NUM_JOB1_WORKERS) {
          push @job1_threads, async {
             while (my $job = $job1_request_queue->dequeue()) {
                my $result = job1($job);
                $job2_request_queue->enqueue($result);
             }
          };
       }
    
       my @job2_threads;
       for (1..NUM_JOB2_WORKERS) {
          push @job2_threads, async {
             while (my $job = $job2_request_queue->dequeue()) {
                job2($job);
             }
          };
       }
    
       $job1_request_queue->enqueue($_) for ...;
    
       $job1_request_queue->end();    
       $_->join() for @job1_threads;
       $job2_request_queue->end();
       $_->join() for @job2_threads;
    }
    

    使用IPC::Run 而不是qx 添加超时。不需要信号。

    【讨论】:

    • 您好池上,感谢您的帮助。你能解释线程的结束和连接吗?当我尝试使用多工人方法时,我得到“Perl exited with active threads”错误,大多数正在运行和未加入,有些已完成和未加入。我可以在下面发布我的最新代码作为答案。
    • 这告诉工人之前没有更多工作要做,然后等待他们完成。否则程序会提前结束。
    • 修复了我的代码中的一个错误。 (@job1_threads@job2_threads 未填充)
    猜你喜欢
    • 1970-01-01
    • 2015-10-11
    • 1970-01-01
    • 2012-06-22
    • 1970-01-01
    • 1970-01-01
    • 2014-03-15
    • 2016-05-16
    • 1970-01-01
    相关资源
    最近更新 更多