【问题标题】:Parallel::Forkmanager -> populate a hash of hashes of arrays from child processesParallel::Forkmanager -> 填充来自子进程的数组哈希值
【发布时间】:2016-10-31 15:05:52
【问题描述】:

我有这段代码想要并行化(供参考):

my (%fastas, %counts);

foreach my $sample ( sort keys %AC2 )
{
    foreach my $chrom (sort keys %{ $AC2{$sample} } )
    {
        foreach my $pos ( sort { $a <=> $b } (@{ $allAC2{$chrom} }) )
        {
            my $allele;

            #position was genotyped in sample
            # or is AC=1, but was also found in AC=2
            if( grep(/\b$pos\b/, @{ $AC2{$sample}{$chrom} }) || grep(/\b$pos\b/, @{ $finalAC1{$sample}{$chrom} }) ) #"\b" is for word boundary -> exact word match
            {
                $allele = @{ $vcfs{$sample}{$chrom}{$pos} }[2]; #ALT allele
            }
            #Make sure all SNP positions are in all samples
            #Fill with reference genome allele information
            else
            {
                #Fill with reference genome allele information
                $allele = substr( @{ $ref{$chrom} }[0], $pos-1, 1); #or die "$sample, $chrom, $pos";
            }
            push ( @{ $fastas{$sample}{$chrom}{$pos} }, $allele);
            push ( @{ $counts{$chrom}{$pos} }, $allele) unless (grep {$_ eq $allele} @{ $counts{$chrom}{$pos} } );
        }
    }
}

基本上,子进程需要填充两个哈希值。我已经搜索并只找到了几个示例,展示了如何使用“run_on_finish”从子进程返回变量。 “问题”是我发现的所有示例/教程总是返回标量。

是否可以从子进程中传递一个(或 2 个)哈希值?

谢谢, 马可

【问题讨论】:

  • 引用是标量。
  • 参见:search.cpan.org/~yanick/Parallel-ForkManager-1.19/lib/Parallel/… PFM 使用 Storable 将结构传递回父进程。第二个例子展示了孩子发回任意数据。
  • 鉴于传回大量数据,我倾向于为此使用(工作)线程类型模型。 Thread::Queue 和并行性。
  • @Sobrique,我正在使用的 VCF 文件(我的脚本输入)并没有那么大(细菌基因组)。但我会尽量记住这一点。
  • @Ben Grimm,该链接很有帮助。那里有一个哈希的引用。摆弄了几天的 FM 并回顾了这些示例,现在它们都变得更有意义了。

标签: perl parallel-processing parent-child hash-of-hashes


【解决方案1】:

不返回哈希,返回哈希引用。

Perl 中的引用是标量值。所以您需要做的就是返回对%fastas%counts 的引用。

这是一个取自 Parallel::Forkmanager 文档的 hacky 示例。它在每个子进程中构建一个散列,其中包含输入数据建议的尽可能多的元素。它将对该哈希的引用返回给父级,回调将其拾取并将其插入到$overall 数据结构中。

use strict;
use warnings;
use Data::Printer;
use Parallel::ForkManager;

my $pm = Parallel::ForkManager->new(2);

my $overall; # will hold all results in the parent
$pm->run_on_finish( sub {
    my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;

    $overall->{$pid} = $data_structure_reference;
});

DATA_LOOP:
foreach my $data (1 .. 10) {
  # Forks and returns the pid for the child:
  my $pid = $pm->start and next DATA_LOOP;

  my %child_result = map { $_ => 1 } 1 .. $data;

  $pm->finish( 0, \%child_result );
}

$pm->wait_all_children;
p $overall;

输出如下:

\ {
    1224   {
        1   1
    },
    1225   {
        1   1,
        2   1
    },
    1226   {
        1   1,
        2   1,
        3   1
    },
    1228   {
        1   1,
        2   1,
        3   1,
        4   1
    },
    1230   {
        1   1,
        2   1,
        3   1,
        4   1,
        5   1
    },
    1231   {
        1   1,
        2   1,
        3   1,
        4   1,
        5   1,
        6   1
    },
    1232   {
        1   1,
        2   1,
        3   1,
        4   1,
        5   1,
        6   1,
        7   1
    },
    1233   {
        1   1,
        2   1,
        3   1,
        4   1,
        5   1,
        6   1,
        7   1,
        8   1
    },
    1234   {
        1   1,
        2   1,
        3   1,
        4   1,
        5   1,
        6   1,
        7   1,
        8   1,
        9   1
    },
    1235   {
        1    1,
        2    1,
        3    1,
        4    1,
        5    1,
        6    1,
        7    1,
        8    1,
        9    1,
        10   1
    }
}

如果要返回两个数据结构,请将它们包装在数组引用中。

$pm->finish( 0, [ \%fastas, \%counts ] ); 

【讨论】:

    【解决方案2】:

    我只是想我要发布我的解决方案:

    my (%fastas, %counts);
    
    #setting up the forking process
    my $nCPU = Sys::CPU::cpu_count();
    my $pm = Parallel::ForkManager -> new($nCPU);
    
    $pm->run_on_finish(sub {
        my ($pid, $exit_code, $ident, $exit_signal, $core_dump, $data_structure_reference) = @_;
    
        %fastas = (%fastas, %{ $data_structure_reference->{fas} });
    });
    
    my @mySamples = sort keys %AC2;
    my $s = $mySamples[0];
    
    foreach my $sample (@mySamples)
    {
        my $pid = $pm->start and next;
    
        my %allSeqs;
    
        foreach my $chrom (sort keys %{ $AC2{$sample} } )
        {
            foreach my $pos ( sort { $a <=> $b } (@{ $allAC2{$chrom} }) )
            {
                my $allele;
    
                #position was genotyped in sample
                # or is AC=1, but was also found in AC=2
                if( grep(/\b$pos\b/, @{ $AC2{$sample}{$chrom} }) || grep(/\b$pos\b/, @{ $finalAC1{$sample}{$chrom} }) ) #"\b is for word boundary -> exact word match"
                {
                    $allele = @{ $vcfs{$sample}{$chrom}{$pos} }[2]; #ALT allele
                }
                #Make sure all SNP positions are in all samples
                #Fill with reference genome allele information
                else
                {
                    #Fill with reference genome allele information
                    $allele = substr( @{ $ref{$chrom} }[0], $pos-1, 1); #or die "$sample, $chrom, $pos";
                }
    
                push ( @{ $allSeqs{$sample}{$chrom}{$pos} }, $allele);
            }
        }
        $pm -> finish(0, { fas => \%allSeqs });
    }
    
    $pm -> wait_all_children();
    
    
    #List ALT alleles found at each position
    foreach my $sample ( sort keys %fastas )
    {
        foreach my $chrom ( sort keys %{ $fastas{$sample} } )
        {
            foreach my $pos ( sort keys %{ $fastas{$sample}{$chrom} } )
            {
                my $allele = @{ $fastas{$sample}{$chrom}{$pos} }[0];
                push ( @{ $counts{$chrom}{$pos} }, $allele) unless (grep {$_ eq $allele} @{ $counts{$chrom}{$pos} } );
            }
        }
    }
    

    我必须从主循环中删除 %counts 并单独计算它,因为它必须在子进程中引用它自己的值(来自父进程)(我希望这个解释有意义!)。

    感谢大家的帮助,非常感谢! 马可

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2014-11-25
      • 2017-03-14
      • 2014-05-11
      • 2017-06-12
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-01-31
      相关资源
      最近更新 更多