【问题标题】:How do I queue perl subroutines to a thread queue instead of data?如何将 perl 子例程排队到线程队列而不是数据?
【发布时间】:2013-04-19 09:24:28
【问题描述】:

背景
在阅读如何对我的 perl 脚本进行多线程处理时,我阅读了(来自http://perldoc.perl.org/threads.html#BUGS-AND-LIMITATIONS

在大多数系统上,频繁且持续地创建和销毁 线程可以导致内存占用的不断增长 Perl 解释器。虽然只需启动线程和 然后 ->join() 或 ->detach() 它们,对于长期存在的应用程序,它是 更好地维护一个线程池,并在工作中重用它们 需要,使用队列来通知线程待处理的工作。

我的脚本将长期存在;它是一个始终运行的 PKI LDAP 目录监控守护进程。如果企业监控解决方案因任何原因停止运行,它将生成警报。我的脚本将检查我是否可以访问另一个 PKI LDAP 目录,并验证两者上的吊销列表。

问题:我在 google 上可以找到的所有内容都显示传递变量(例如标量)到线程队列而不是子例程本身......我想我只是不理解如何正确实现线程队列与如何实现线程(没有队列)相比。

问题 1:如何“维护线程池”以避免 perl 解释器慢慢消耗越来越多的内存?
问题 2 strong>:(不相关,但我发布了这段代码)在主程序结束时是否有安全的睡眠量,这样我一分钟内启动线程的次数不会超过一次? 60 似乎很明显,但如果循环很快,或者可能由于处理时间或其他原因错过一分钟,是否会导致它运行多次?

提前致谢!

#!/usr/bin/perl

use feature ":5.10";
use warnings;
use strict;
use threads;
use Proc::Daemon;
#

### Global Variables
use constant false => 0;
use constant true  => 1;
my $app = $0;
my $continue = true;
$SIG{TERM} = sub { $continue = false };

# Directory Server Agent (DSA) info
my @ListOfDSAs = (
    { name => "Myself (inbound)",
      host => "ldap.myco.ca",
      base => "ou=mydir,o=myco,c=ca",
    },
    { name => "Company 2",
      host => "ldap.comp2.ca",
      base => "ou=their-dir,o=comp2,c=ca",
    }
);    
#

### Subroutines

sub checkConnections
{   # runs every 5 minutes
    my (@DSAs, $logfile) = @_;
    # Code to ldapsearch
    threads->detach();
}

sub validateRevocationLists
{   # runs every hour on minute xx:55
    my (@DSAs, $logfile) = @_;
    # Code to validate CRLs haven't expired, etc
    threads->detach();
}

#

### Main program
Proc::Daemon::Init;

while ($continue)
{
    my ($sec,$min,$hour,$mday,$mon,$year,$wday,$yday,$isdst) = localtime(time);

    # Question 1: Queues??

    if ($min % 5 == 0 || $min == 0)
        { threads->create(&checkConnections, @ListOfDSAs, "/var/connect.log"); }

    if ($min % 55 == 0)
        { threads->create(&validateRevocationLists, @ListOfDSAs, "/var/RLs.log"); }

    sleep 60; # Question 2: Safer/better way to prevent multiple threads being started for same check in one matching minute?
}

# TERM RECEIVED
exit 0;
__END__

【问题讨论】:

  • 愚蠢的问题:为什么要使用线程?为什么不一次检查一个 DSA?
  • 有效问题! a)服务器有很多核心,所以我想“为什么不使用它们?” b)我是个十足的极客,也想为未来的脚本学习正确的线程技术(我讨厌采取简单的方法,呵呵)。如果发现问题,子例程会将 SNMP 陷阱发送到集中监控服务器,因此我的主脚本并不真正关心返回值,所以这感觉像是要走的路。
  • 我可以理解这些原因,但我认为在这种情况下你真的不应该增加线程的复杂性,除非有令人信服的理由使用它们。
  • $min % 55 == 0 表示$min == 55(给定范围为$min)
  • $min % 5 == 0 || $min == 0$min % 5 == 0 相同

标签: multithreading perl queue


【解决方案1】:
use threads;
use Thread::Queue 3.01 qw( );

my $check_conn_q      = Thread::Queue->new();
my $validate_revoke_q = Thread::Queue->new();

my @threads;
push @threads, async {
   while (my $job = $check_conn_q->dequeue()) {
      check_conn(@$job);
   }
};
push @threads, async {
   while (my $job = $validate_revoke_q->dequeue()) {
      validate_revoke(@$job);
   }
};

while ($continue) {
   my ($S,$M,$H,$m,$d,$Y) = localtime; $m+=1; $Y+=1900;

   $check_conn_q->enqueue([ @ListOfDSAs, "/var/connect.log" ])
      if $M % 5 == 0;

   $validate_revoke_q->enqueue([ @ListOfDSAs, "/var/RLs.log" ])
      if $M == 55;

   sleep 30;
}

$check_conn_q->end();
$validate_revoke_q->end();
$_->join for @threads;

我不确定这里是否需要并行化。如果不是,您可以简单地使用

use List::Util qw( min );

sub sleep_until {
   my ($until) = @_;
   my $time = time;
   return if $time >= $until;
   sleep($until - $time);
}

my $next_check_conn = my $next_validate_revoke = time;
while ($continue) {
   sleep_until min $next_check_conn, $next_validate_revoke;
   last if !$continue;

   my $time = time;
   if ($time >= $next_check_conn) {
      check_conn(@ListOfDSAs, "/var/connect.log");
      $next_check_conn = time + 5*60;
   }

   if ($time >= $next_validate_revoke) {
      validate_revoke(@ListOfDSAs, "/var/RLs.log");
      $next_validate_revoke = time + 60*60;
   }
}

【讨论】:

  • 谢谢!我开始理解许多示例......您正在将参数/参数发送到队列!与您的第一个代码块相关的快速问题:您在哪里推送 @threads,异步 {} 我猜 check_conn 和 validate_revoke 正在调用我的潜艇? (在我的代码中称为 checkConnections 和 validateRevocationLists?)
  • async 返回线程(如thread->create)。这允许我们稍后join他们。使用可连接线程而不是使用分离线程可以使您在线程正在执行某些操作时不会退出。 (不过,->end 将确保线程不会开始新工作。)
  • check_conncheckConnections。我重命名了您的潜艇(更常规地命名并节省打字)。我坏了。
  • 一切顺利!只是确保我(或未来的人们搜索)理解:)
【解决方案2】:

我建议一次只运行一项检查,因为在这里使用线程似乎没有令人信服的理由,而且您不想为将一直运行的程序增加不必要的复杂性.

如果你想学习如何使用线程池,有examples included with the threads module。还有一个Thread::Pool module 可能有用。

至于确保您不在同一分钟内重复检查,您是正确的,sleeping 60 秒是不够的。无论您选择睡眠的值是什么,您都会遇到它失败的边缘情况:或者它会略短于一分钟,并且您偶尔会在同一分钟内进行两次检查,或者它会略长于一分钟,而且您有时会完全错过支票。

相反,使用变量来记住上次完成任务的时间。然后,您可以使用更短的睡眠时间,而不必担心每分钟进行多次检查。

my $last_task_time = -1;
while ($continue)
{
    my $min = (localtime(time))[1];

    if ($last_task_time != $min && 
          ($min % 5 == 0 || $min > ($last_task_time+5)%60))
    { 
        #Check connections here.

        if ($min == 55 || ($last_task_time < 55 && $min > 55))
        { 
           #Validate revocation lists here.
        }

        $last_task_time = $min;
    }
    else
    {
        sleep 55; #Ensures there is at least one check per minute.
    }
}

更新:我修复了代码,以便在最后一个任务运行时间过长时恢复。如果偶尔需要很长时间,这会很好。但是,如果任务经常花费超过 5 分钟,您需要一个不同的解决方案(在这种情况下线程可能会有意义)。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2013-02-16
    • 2016-05-11
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-11-05
    • 1970-01-01
    • 2013-08-16
    相关资源
    最近更新 更多