unit ThreadPool;

interface

uses Classes,Windows,SysUtils,common,BaseThread;

type

 
// TAddEvent = procedure(msg:string);


   TBaseTask 
= class   //基础任务
   protected
     Fid:byte;
     FSendType : byte; 
//发送信息类型:0:原ASCII 1 D131或者D101之类的需要有字符替换
     FNumber :string;
     FBuffer:Tbuffer;
     FMsg :string;
     FLog:Tlog;
     FProcessTaskThread:TThread;
     FSendTaskThread:TBaseThread;
     
function GetData:string;
     Function GetCount : integer;
   public
      property id:Byte read Fid;
      property Number:string read FNumber;
      property SendType :byte read FSendtype write FSendType;
      property ProcessTaskThread : TThread read FProcessTaskThread write FProcessTaskThread;
      property SendTaskThread : TBaseThread read FSendTaskThread write FSendTaskThread;
      property Count : integer read GetCount;
      
procedure Run;virtual;
      
procedure Add(Msg:string);virtual;
      Constructor Create(Sid:Byte;Number:string;Log_switch:integer);virtual;    
//对应POOL里面数据头的ID
      destructor Destroy; override;
   
end;

   TBaseTaskThread 
= class(TThread)      //任务线程
   protected
     FName :string;
     Flog: Tlog;
     FList: TList;
     
function GetCount:integer;
     
function GetTask:TBaseTask;
     
procedure Execute;   override;
     
procedure Terminatedevent(sender:Tobject);
   public
     property Count:integer  read Getcount;
     property Name :string read FName;
     
procedure Add(Item: Pointer);
     Constructor Create(Name:string;suspendflag:boolean;logSwitch:integer);
   
end;

   TBasePoolThread 
= class(TThread)               //分派线程
   protected
        FName :string;
        FLog :Tlog;
        FThreadCount : integer;
        FBuffer: TBuffer;
        FThreadList : Tlist;
        FTaskList : Tlist;
        FOnThreadProcessEvent :TOnThreadProcessEvent;
        FOnThreadBeforeEvent :TOnThreadBeforeEvent;
        
function SearchTask(Msg:string):TBaseTask;
        
function SearchThread:TBaseTaskThread;
        
procedure Execute;   override;
        
procedure Terminatedevent(sender:Tobject);
   public
        property ChildThreadCount :integer read FThreadCount;
        
procedure Add(Msg:string);     //传入 ID+MSG ,id是byte类型,一位0-255
        
procedure AddTask(Task:TBaseTask);
        
procedure CreateChildThread;
        property Name :string read FName;
        property OnThreadProcessEvent:TOnThreadProcessEvent read FOnThreadProcessEvent write FOnThreadProcessEvent;
        property OnThreadBeforeEvent :TOnThreadBeforeEvent read FOnThreadBeforeEvent write FOnThreadBeforeEvent;
        Constructor Create(Name:string;ThreadCount:integer;suspendflag:boolean;logSwitch:integer);
  
end;


implementation

{ TBaseTask }

procedure TBaseTask.Add(Msg: string);
begin
  FBuffer.Add(Msg);
end;

constructor TBaseTask.Create(Sid: Byte; Number: string;Log_switch:integer);
begin
  Fid :
= Sid;
  FSendType :
= 1;
  FNumber :
= number;
  FLog :
= Tlog.Create(Number,Log_switch);
  FBuffer :
= TBuffer.Create;
end;

destructor TBaseTask.Destroy;
begin
  FBuffer.Free;
  Flog.Free;
  inherited;
end;

function TBaseTask.GetCount: integer;
begin
  result :
= Fbuffer.Count;
end;

function TBaseTask.GetData: string;
begin
  Result :
= Fbuffer.Getdata;
  FBuffer.DeleteData;
end;

procedure TBaseTask.Run;
begin
 
if FBuffer.Count > 0 then begin
     fmsg:
= GetData;
  
{ToDosomething(msg);}
 
end;
end;

{ TBaseTaskThread }

procedure TBaseTaskThread.Add(Item: Pointer);
begin
  Flist.Add(item);
  TBaseTask(item).ProcessTaskThread :
= self;
  self.Resume;
end;

constructor TBaseTaskThread.Create(Name: string; suspendflag: boolean;
  logSwitch: integer);
begin
  Fname :
= name;
  Flist :
= TList.Create;
  Flog :
= Tlog.Create(name,logswitch);
  onterminate :
= terminatedevent;
  FreeOnTerminate:
=true;
  Flog.Log(name
+'线程对象创建',log_all);
  inherited create(suspendflag);        
//建立后先挂起
end;

procedure TBaseTaskThread.Execute;
//var temp :string;
begin
   
while True do begin
      
if Terminated then break;
      
if Flist.Count > 0 then begin
        try
          GetTask.Run;
        except
          on e: Exception 
do
            FLog.Log(
'异常:'+e.Message,log_fail);
        
end;
      
end else
        suspend;
   
end;
end;

function TBaseTaskThread.GetCount: integer;
begin
  result:
=  Flist.Count;
end;

function TBaseTaskThread.GetTask: TBaseTask;
begin
  result :
= Flist.items[0];
  Flist.Delete(
0);
end;

procedure TBaseTaskThread.Terminatedevent(sender: Tobject);
begin
  Flist.Free;
  Flog.log(FName
+'线程释放',log_all);
  Flog.Free;
end;

{ TBasePoolThread }

procedure TBasePoolThread.Add(Msg: string);
begin
  Fbuffer.Add(msg);
  resume;
end;


procedure TBasePoolThread.AddTask(Task: TBaseTask);
begin
  FTaskList.Add(task);
end;

constructor TBasePoolThread.Create(Name: string; ThreadCount: integer;
  suspendflag: boolean; logSwitch: integer);
begin
  Fname :
= name;
  FBuffer :
= TBuffer.Create;
  FThreadCount :
= ThreadCount;
  FThreadList :
= Tlist.Create;
  Flog :
= Tlog.Create(name,logswitch);
  CreateChildThread;
  onterminate :
= terminatedevent;
  FreeOnTerminate:
=true;
  Flog.Log(name
+'线程对象创建',log_all);
  inherited create(suspendflag);        
//建立后先挂起
end;

procedure TBasePoolThread.CreateChildThread;
var tempThread : TBaseTaskThread;
     i:integer;
begin
    
for i:=0 to FThreadCount -1 do begin
      tempThread :
= TBaseTaskThread.Create(Fname+inttostr(i), true,log_all);
      FThreadList.Add(tempThread);
    
end;
end;

procedure TBasePoolThread.Execute;
var temp :string;
   
// temptask : TBaseTask;
   
// tempThread : TBasePoolThread;
begin
   
while True do begin
      
if Terminated then break;
      
if Fbuffer.Count > 0 then begin
         temp :
= Fbuffer.Getdata; Fbuffer.deletedata;
         
if Assigned(FOnThreadBeforeEvent) then temp:=FOnThreadBeforeEvent(Flog,temp);
            SearchTask(temp);
           flog.Log(
'处理数据:'+temp,log_all);
         
if Assigned(FOnThreadProcessEvent) then FOnThreadProcessEvent(Flog,temp);
      
end else
        suspend;
   
end;
end;

function TBasePoolThread.SearchTask(Msg: string): TBaseTask;
var fid  : byte;
    i:integer;
begin
  fid :
= byte(msg[1]);
  result :
= nil;
  
for i:=0 to FTasklist.Count-1 do begin
    
if TBaseTask(FTasklist.Items[i]).Fid = fid then begin
          
if (TBaseTask(FTasklist.Items[i]).Count > 0and
               assigned( TBaseTask(FTasklist.Items[i]).ProcessTaskThread) 
then begin
                  TBaseTask(FTasklist.Items[i]).Add(copy(msg,
2,length(msg)-1));
                 TBaseTaskThread(TBaseTask(FTasklist.Items[i]).ProcessTaskThread).Add(FTasklist.Items[i])   
//只有原任务有线程,且有未完成的任务
               
end  else begin
                 TBaseTask(FTasklist.Items[i]).Add(copy(msg,
2,length(msg)-1));
                 SearchThread.Add(FTasklist.Items[i]);
               
end;
       break;
    
end;
  
end;
end;

function TBasePoolThread.SearchThread: TBaseTaskThread;
var i,pole:integer;
begin
  result :
= nil;
  pole :
= 9999;
  
for i:=0 to FThreadlist.Count -1 do begin
    
if TBaseTaskThread(FThreadList.Items[i]).Count = 0 then begin
      result :
= FThreadList.Items[i];
      break;
    
end;
    
if pole < TBaseTaskThread(FThreadList.Items[i]).Count then
       result :
= FThreadList.Items[i];
  
end;

end;

procedure TBasePoolThread.Terminatedevent(sender: Tobject);
var i:integer;
begin
    
for i:= 0 to FThreadList.Count-1 do begin
       TBaseTaskThread(FThreadList.items[i]).Terminate;
       TBaseTaskThread(FThreadList.items[i]).Resume;
    
end;
    FThreadList.Free;
    fbuffer.Free;
    Flog.log(FName
+'线程释放',log_all);
    Flog.Free;
end;

end.

相关文章: