【Question title】: How do you create a queue of threads in Delphi XE2?
【Posted】: 2016-11-07 07:53:17
【Question description】:

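There is a scenario where the user may trigger several time-consuming jobs, and these jobs need to run in the order in which the user started them.

I have been looking at the TThreadedQueue class to store the threads. Is that a good starting point, or is there a more suitable way to do this?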

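【Comments】:

  • If only one producer feeds in jobs, the jobs must run in order, and a new job cannot start until the previous one has finished, then the queue does not need to be thread-safe. When a job completes, simply fire an event in the main thread to start the next job in the queue.
  • Why store threads? Store the tasks, and let a single thread work through them one by one.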
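
The second comment's suggestion (store tasks, not threads) could be sketched roughly as below. This is a minimal sketch, not the commenter's code: the unit name `JobQueueSketch`, the class `TJobWorker`, and the method `AddJob` are hypothetical, the queue depth of 64 is arbitrary, and the `uses` clause assumes XE2-style unit scope names. It relies on `TThreadedQueue<T>` from `System.Generics.Collections` with a single consumer thread, so jobs run in the order they were pushed.

```delphi
unit JobQueueSketch;

interface

uses
  System.SysUtils, System.Types, System.Classes, System.SyncObjs,
  System.Generics.Collections;

type
  // Hypothetical class: one background thread that runs queued jobs in FIFO order.
  TJobWorker = class(TThread)
  private
    FJobs: TThreadedQueue<TProc>;
  protected
    procedure Execute; override;
  public
    constructor Create;
    destructor Destroy; override;
    procedure AddJob(const AJob: TProc);
  end;

implementation

constructor TJobWorker.Create;
begin
  // Depth 64 is an arbitrary choice; the push and pop timeouts keep their
  // defaults (wait indefinitely), so PushItem blocks while the queue is full.
  FJobs := TThreadedQueue<TProc>.Create(64);
  inherited Create(False);
end;

destructor TJobWorker.Destroy;
begin
  Terminate;
  FJobs.DoShutDown; // wakes the worker if it is blocked in PopItem
  WaitFor;
  FJobs.Free;       // jobs still waiting in the queue are discarded
  inherited;
end;

procedure TJobWorker.AddJob(const AJob: TProc);
begin
  FJobs.PushItem(AJob);
end;

procedure TJobWorker.Execute;
var
  Job: TProc;
begin
  while not Terminated do
  begin
    // Skip anything that is not a successful pop of a real job; after
    // DoShutDown the loop condition ends the thread.
    if FJobs.PopItem(Job) <> wrSignaled then
      Continue;
    if not Assigned(Job) then
      Continue;
    try
      Job();
    except
      // A failing job must not kill the worker; real code should log this.
    end;
  end;
end;

end.
```

A caller would create one `TJobWorker`, call `AddJob` with an anonymous procedure each time the user triggers a job, and free the worker on shutdown. The jobs run on the worker thread, so any UI update inside a job would still need to be marshalled back to the main thread.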

Tags: multithreading delphi queue delphi-xe2


【Answer 1】:

Below is an example of a WorkerThreadPool:

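  // Units this fragment appears to need in its uses clause (an assumption, the
  // answer does not list them): System.SysUtils (TProc), System.Classes (TThread),
  // System.SyncObjs (TEvent), System.Generics.Collections (TObjectList<T>).
  // ALFreeAndNil comes from the Alcinoe library, and ALLog presumably does too.

  {~~~~~~~~~~~~~~~~~~~~~~~~~~~~}
  Twin_WorkerThreadPool = class;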

  {~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~}
  Twin_WorkerThread = class(TThread)
  private
    FProc: TProc;
    FProcReadySignal: Tevent;
    FProcFinishedSignal: Tevent;
  protected
    procedure Execute; override;
  public
    constructor Create;
    destructor Destroy; override;
    procedure ExecuteAndWaitProc(const AProc: TProc);
    property ProcFinishedSignal: Tevent read FProcFinishedSignal;
  end;

  {~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~}
  Twin_WorkerThreadPool = class(TObject)
  private
    fPool: TObjectList<Twin_WorkerThread>;
    fSignal: Tevent;
  public
    constructor Create(const aThreadCount: integer);
    destructor Destroy; override;
    procedure ExecuteAndWaitProc(const AProc: TProc);
    procedure Enqueue(const Value: Twin_WorkerThread);
    function Dequeue: Twin_WorkerThread;
  end;


{***********************************}
constructor Twin_WorkerThread.Create;
begin
  FProc := nil;
  FProcReadySignal := TEvent.Create(nil, false{ManualReset}, false, '');
  FProcFinishedSignal := TEvent.Create(nil, false{ManualReset}, false, '');
  inherited Create(False); // see http://www.gerixsoft.com/blog/delphi/fixing-symbol-resume-deprecated-warning-delphi-2010
end;

{***********************************}
destructor Twin_WorkerThread.Destroy;
begin
  Terminate;
  FProcReadySignal.setevent;
  WaitFor;
  ALFreeAndNil(FProcReadySignal);
  ALFreeAndNil(FProcFinishedSignal);
  inherited;
end;

{**********************************}
procedure Twin_WorkerThread.Execute;
begin
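  while not Terminated do begin

    //wait for the signal
    FProcReadySignal.WaitFor(INFINITE);

    //if terminated then exit
    if Terminated then Break;

    //execute fProc, hiding any exception it raises
    try
      if assigned(FProc) then FProc();
    except
      //hide the exception
    end;

    //signal that the proc is finished; this must also happen when FProc
    //raised, otherwise ExecuteAndWaitProc would wait forever
    FProcFinishedSignal.SetEvent;

  end;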
end;

{*****************************************************************}
procedure Twin_WorkerThread.ExecuteAndWaitProc(const AProc: TProc);
begin
  fProc := AProc;
  FProcFinishedSignal.ResetEvent;
  FProcReadySignal.SetEvent;
  FProcFinishedSignal.WaitFor(INFINITE);
  fProc := nil;
end;

{********************************************************************}
constructor Twin_WorkerThreadPool.Create(const aThreadCount: integer);
var i: integer;
begin
  fPool := TObjectList<Twin_WorkerThread>.create(false{aOwnObjects});
  fSignal := TEvent.Create(nil, true{ManualReset}, false, '');
  for I := 0 to aThreadCount - 1 do
    fPool.Add(Twin_WorkerThread.Create)
end;

{***************************************}
destructor Twin_WorkerThreadPool.Destroy;
var aWorkerThread: Twin_WorkerThread;
    i: integer;
begin
  for I := 0 to fPool.Count - 1 do begin
    aWorkerThread := fPool[i];
    fPool[i] := nil;
    ALFreeAndNil(aWorkerThread);
  end;
  ALFreeAndNil(fPool);
  ALFreeAndNil(fSignal);
  inherited Destroy;
end;

{*********************************************************************}
procedure Twin_WorkerThreadPool.ExecuteAndWaitProc(const AProc: TProc);
var aThread: Twin_WorkerThread;
begin
  aThread := Dequeue;
  try
    aThread.ExecuteAndWaitProc(aProc);
  finally
    Enqueue(aThread);
  end;
end;

{**********************************************************************}
procedure Twin_WorkerThreadPool.Enqueue(const Value: Twin_WorkerThread);
begin
  Tmonitor.Enter(fPool);
  try
    fPool.Add(Value);
    if fPool.Count = 1 then fSignal.SetEvent;
  finally
    Tmonitor.Exit(fPool);
  end;
end;

{********************************************************}
function Twin_WorkerThreadPool.Dequeue: Twin_WorkerThread;
begin
  Tmonitor.Enter(self); // << only one thread can process the code below
  try

    Tmonitor.Enter(fPool);
    try
      if Fpool.Count > 0 then begin
        result := fPool[Fpool.Count - 1];
        fPool.Delete(Fpool.Count - 1);
        exit;
      end;
      fSignal.ResetEvent;
    finally
      Tmonitor.Exit(fPool);
    end;

    while True do begin // << there is a bug on ios with tevent - http://stackoverflow.com/questions/39884521/why-i-get-an-exception-argument-out-of-range
      fSignal.WaitFor(Infinite);
      Tmonitor.Enter(fPool);
      try
        if fPool.Count > 0 then begin
          result := fPool[Fpool.Count - 1];
          fPool.Delete(Fpool.Count - 1);
          exit;
        end
        else begin
          {$IFDEF DEBUG}
          ALLog('Twin_WorkerThreadPool.Dequeue', 'fSignal.ResetEvent didn''t work as expected!', TalLogType.Error);
          {$ENDIF}
        end;
      finally
        Tmonitor.Exit(fPool);
      end;
    end;

  finally
    Tmonitor.exit(self);
  end;
end;
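
For the question's requirement (jobs run in the order they were submitted), the pool above might be used roughly as follows. This is only a sketch: it assumes the declarations above are compiled into the same unit, `RunJobsInOrder` is a hypothetical name, and the `Sleep` calls stand in for real work. Note that `ExecuteAndWaitProc` blocks the calling thread until the job has finished, so calling it from the main thread would stall the UI for the duration of each job.

```delphi
procedure RunJobsInOrder;
var
  Pool: Twin_WorkerThreadPool;
begin
  // A pool with a single worker thread: jobs cannot overlap.
  Pool := Twin_WorkerThreadPool.Create(1);
  try
    Pool.ExecuteAndWaitProc(
      procedure
      begin
        Sleep(100); // stand-in for the first slow job
      end);
    // Starts only after the first call has returned.
    Pool.ExecuteAndWaitProc(
      procedure
      begin
        Sleep(100); // stand-in for the second slow job
      end);
  finally
    Pool.Free;
  end;
end;
```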

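【Comments】:

  • Thanks for the code. I tried to compile it and got compiler errors. Where does ALFreeAndNil come from, and what does it do?
  • Ah, sorry: ALFreeAndNil comes from Alcinoe (you can find the code on the internet). ALFreeAndNil works exactly like FreeAndNil, except that it uses dispose instead of free to avoid memory leaks.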