【问题标题】:Multithreaded file upload synchronization多线程文件上传同步
【发布时间】:2012-11-19 15:32:24
【问题描述】:

目前我正在开发一个 Delphi XE3 客户端/服务器应用程序来传输文件(使用 Indy FTP 组件)。客户端部分监视一个文件夹,获取其中的文件列表,将它们上传到服务器并删除原始文件。上传是由一个单独的线程完成的,该线程一个一个地处理文件。文件的范围从 0 到几千不等,它们的大小也有很大差异。

这是一个为 OSX 和 Windows 编译的 Firemonkey 应用程序,所以我不得不使用 TThread 而不是我更喜欢的 OmniThreadLibrary。我的客户报告说应用程序随机冻结。我无法复制它,但由于我对 TThread 没有太多经验,我可能在某处放置了死锁条件。我阅读了很多示例,但我仍然不确定某些多线程细节。

应用结构简单:
主线程中的计时器检查文件夹并将有关每个文件的信息放入记录中,该记录进入通用 TList。此列表保存有关文件名称、大小、进度、文件是完全上传还是必须重试的信息。所有这些都显示在带有进度条等的网格中。这个列表只能由主线程访问。 之后,通过调用 AddFile 方法(下面的代码)将列表中的项目发送到线程。该线程将所有文件存储在一个线程安全队列中,例如http://delphihaven.wordpress.com/2011/05/06/using-tmonitor-2/
上传文件后,上传线程会通知主线程调用 Synchronize。
主线程定期调用 Uploader.GetProgress 方法来检查当前文件进度并显示出来。这个函数实际上不是线程安全的,但它会导致死锁,还是只返回错误的数据?

什么是进行进度检查的安全有效的方法?

那么这种方法可以吗还是我错过了什么?你会怎么做呢?
例如,我想创建一个新线程只是为了读取文件夹内容。这意味着我使用的 TList 必须是线程安全的,但必须始终访问它以刷新 GUI 网格中显示的信息。不是所有的同步都会减慢 GUI 的速度吗?

我已经发布了下面的简化代码,以防有人想查看它。如果没有,我很乐意听到一些关于我应该使用什么的意见。主要目标是在 OSX 和 Windows 上工作;能够显示有关所有文件的信息和当前文件的进度;无论文件的数量和大小如何,都能做出响应。

这是上传线程的代码。为了便于阅读,我删除了一些内容:

type
  TFileStatus = (fsToBeQueued, fsUploaded, fsQueued);
  TFileInfo = record
    ID: Integer;
    Path: String;
    Size: Int64;
    UploadedSize: Int64;
    Status: TFileStatus;
  end;

  TUploader = class(TThread)
  private
    FTP: TIdFTP;
    fQueue: TThreadedQueue<TFileInfo>;
    fCurrentFile: TFileInfo;
    FUploading: Boolean;
    procedure ConnectFTP;
    function UploadFile(aFileInfo: TFileInfo): String;
    procedure OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
    procedure SignalComplete;
    procedure SignalError(aError: String);
  protected
    procedure Execute; override;
  public
    property Uploading: Boolean read FUploading;
    constructor Create;
    destructor Destroy; override;
    procedure Terminate;
    procedure AddFile(const aFileInfo: TFileInfo);
    function GetProgress: TFileInfo;
  end;

procedure TUploader.AddFile(const aFileInfo: TFileInfo);
begin
  fQueue.Enqueue(aFileInfo);
end;

procedure TUploader.ConnectFTP;
begin
  ...
    FTP.Connect;
end;

constructor TUploader.Create;
begin
  inherited Create(false);
  FreeOnTerminate := false;
  fQueue := TThreadedQueue<TFileInfo>.Create;
  // Create the TIdFTP and set ports and other params
  ...
end;

destructor TUploader.Destroy;
begin
  fQueue.Close;
  fQueue.Free;
  FTP.Free;
  inherited;
end;

// Process the whole queue and inform the main thread of the progress
procedure TUploader.Execute;
var
  Temp: TFileInfo;
begin
  try
    ConnectFTP;
  except
    on E: Exception do
      SignalError(E.Message);
  end;

  // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
  while fQueue.Peek(fCurrentFile) = wrSignaled do
    try
      if UploadFile(fCurrentFile) = '' then
      begin
        fQueue.Dequeue(Temp);  // Delete the item from the queue if succesful
        SignalComplete;
      end;
    except
      on E: Exception do
        SignalError(E.Message);
    end;
end;

// Return the current file's info to the main thread. Used to update the progress indicators
function TUploader.GetProgress: TFileInfo;
begin
  Result := fCurrentFile;
end;

// Update the uploaded size for the current file. This information is retrieved by a timer from the main thread to update the progress bar
procedure TUploader.OnFTPWork(ASender: TObject; AWorkMode: TWorkMode; AWorkCount: Int64);
begin
  fCurrentFile.UploadedSize := AWorkCount;
end;

procedure TUploader.SignalComplete;
begin
  Synchronize(
    procedure
    begin
      frmClientMain.OnCompleteFile(fCurrentFile);
    end);
end;

procedure TUploader.SignalError(aError: String);
begin
  try
    FTP.Disconnect;
  except
  end;
  if fQueue.Closed then
    Exit;

  Synchronize(
    procedure
    begin
      frmClientMain.OnUploadError(aError);
    end);
end;

// Clear the queue and terminate the thread
procedure TUploader.Terminate;
begin
  fQueue.Close;
  inherited;
end;

function TUploader.UploadFile(aFileInfo: TFileInfo): String;
begin
  Result := 'Error';
  try
    if not FTP.Connected then
      ConnectFTP;
    FUploading := true;
    FTP.Put(aFileInfo.Path, ExtractFileName(aFileInfo.Path));     
    Result := '';
  finally
    FUploading := false;
  end;
end;

以及与上传者交互的部分主线程:

......
// Main form
    fUniqueID: Integer;  // This is a unique number given to each file, because there might be several with the same names(after one is uploaded and deleted)
    fUploader: TUploader;         // The uploader thread
    fFiles: TList<TFileInfo>;
    fCurrentFileName: String;     // Used to display the progress
    function IndexOfFile(aID: Integer): Integer;    //Return the index of the record inside the fFiles given the file ID
  public
    procedure OnCompleteFile(aFileInfo: TFileInfo);
    procedure OnUploadError(aError: String);
  end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnUploadError(aError: String);
begin
  // show and log the error
end;

// This is called by the uploader with Synchronize
procedure TfrmClientMain.OnCompleteFile(aFileInfo: TFileInfo);
var
  I: Integer;
begin
  I := IndexOfFile(aFileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    aFileInfo.Status := fsUploaded;
    aFileInfo.UploadedSize := aFileInfo.Size;
    FFiles.Items[I] := aFileInfo;
    Inc(FFilesUploaded);
    TFile.Delete(aFileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

procedure TfrmClientMain.ProcessFolder;
var
  NewFiles: TStringDynArray;
  I, J: Integer;
  FileInfo: TFileInfo;
begin
    // Remove completed files from the list if it contains more than XX files
    while FFiles.Count > 1000 do
      if FFiles[0].Status = fsUploaded then
      begin
        Dec(FFilesUploaded);
        FFiles.Delete(0);
      end else
        Break;

    NewFiles := TDirectory.GetFiles(WatchFolder, '*.*',TSearchOption.soAllDirectories);
    for I := 0 to Length(NewFiles) - 1 do
    begin
          FileInfo.ID := FUniqueID;
          Inc(FUniqueID);
          FileInfo.Path := NewFiles[I];
          FileInfo.Size := GetFileSizeByName(NewFiles[I]);
          FileInfo.UploadedSize := 0;
          FileInfo.Status := fsToBeQueued;
          FFiles.Add(FileInfo);

      if (I mod 100) = 0 then
      begin
        UpdateStatusLabel;
        grFiles.RowCount := FFiles.Count;
        Application.ProcessMessages;
        if fUploader = nil then
          break;
      end;
    end;

    // Send the new files and resend failed to the uploader thread
    for I := 0 to FFiles.Count - 1 do
      if (FFiles[I].Status = fsToBeQueued) then
      begin
        if fUploader = nil then
          Break;
        FileInfo := FFiles[I];
        FileInfo.Status := fsQueued;
        FFiles[I] := FileInfo;
        SaveDebug(1, 'Add:    ' + ExtractFileName(FFiles[I].Path));
        FUploader.AddFile(FFiles[I]);
      end;
end;

procedure TfrmClientMain.tmrGUITimer(Sender: TObject);
var
  FileInfo: TFileInfo;
  I: Integer;
begin
  if (fUploader = nil) or not fUploader.Uploading then
    Exit;
  FileInfo := fUploader.GetProgress;
  I := IndexOfFile(FileInfo.ID);
  if (I >= 0) and (I < fFiles.Count) then
  begin
    fFiles.Items[I] := FileInfo;
    fCurrentFileName := ExtractFileName(FileInfo.Path);
    colProgressImg.UpdateCell(I);
  end;
end;

function TfrmClientMain.IndexOfFile(aID: Integer): Integer;
var
  I: Integer;
begin
  Result := -1;
  for I := 0 to FFiles.Count - 1 do
    if FFiles[I].ID = aID then
      Exit(I);
end;

【问题讨论】:

  • 我不确定也没有测试过.. 但是您是否尝试添加 TIdAntiFreeze 并检查行为是否相同? (FMX.IdAntiFreeze)
  • TIdAntiFreeze 旨在防止当您从主线程使用 Indy 组件时(例如放在表单上)冻结 GUI。我在一个单独的线程中使用它,所以我看不出它会有什么帮助。至少据我所知...
  • 乍一看,您的错误处理在我看来是错误的。例如,在 Execute 方法中,如果 ConnectFTP 调用失败,你异常(在通知错误之后),你仍然发出对 UploadFile 的调用。恕我直言,您必须 clean 并让线程死于 FatalException 或正确处理 Execute 方法中的异常,例如,重试连接多次,可能取决于错误的类型.另一方面,如果您在主线程中有一个列表,我看不出为什么您需要在各个线程中使用一个队列。
  • 为什么上传文件"function"返回一个字符串?并且总是一个空字符串?嗯嗯..
  • madExcept 可以检测到冻结的主线程,然后写入所有正在运行的线程的堆栈跟踪

标签: multithreading macos delphi upload firemonkey


【解决方案1】:

这可能不是问题,但 TFileInfo 是一条记录。

这意味着当作为(非 const/var)参数传递时,它会被复制。这可能会导致记录中的字符串等问题在复制记录时不会更新引用计数。

尝试将其设为一个类并将实例作为参数传递(即指向堆上数据的指针)。

其他需要注意的是在线程 32 位系统上共享 Int64(例如您的大小值)。

更新/读取这些不是原子完成的,您没有任何特定的保护措施,因此读取值可能会由于线程而导致高低 32 位不匹配。 (例如,读取高 32 位、写入高 32 位、写入低 32 位、读取低 32 位,在不同线程中读取和写入)。这可能不会导致您看到的问题,除非您正在处理大于 4GB 的文件传输,否则不太可能给您带来任何问题。

【讨论】:

    【解决方案2】:

    死锁肯定很难发现,但这可能是问题所在。 在您的代码中,我没有看到您在 enqueue、peek 或 dequeue 中添加了任何超时 - 这意味着它将采用 Infinite 的默认值。

    enqueue 中有这一行 - 这意味着,与任何同步对象一样,它将阻塞直到 Enter 完成(它锁定监视器)或超时发生(因为您没有超时,它将永远等待)

    TSimpleThreadedQueue.Enqueue(const Item: T; Timeout: LongWord): TWaitResult;
    ...    
    if not TMonitor.Enter(FQueue, Timeout)
    

    我还将假设您自己基于 Dequeue 实现了 PEEK - 只是您实际上并未删除该项目。

    这似乎实现了自己的超时 - 但是,您仍然有以下内容:

    function TSimpleThreadedQueue.Peek/Dequeue(var Item: T; Timeout: LongWord): TWaitResult;
    ...
    if not TMonitor.Enter(FQueue, Timeout)
    

    超时是无限的——所以,如果你在 peek 方法中等待它发出无限超时信号,那么你不能在不阻塞等待 peek 方法的线程的情况下从第二个线程排队无限超时完成。

    这是来自 TMonitor 的评论的 sn-p

    Enter locks the monitor object with an optional timeout (in ms) value. 
    Enter without a timeout will wait until the lock is obtained. 
    If the procedure returns it can be assumed that the lock was acquired. 
    Enter with a timeout will return a boolean status indicating whether or 
    not the lock was obtained (True) or the attempt timed out prior to 
    acquire the lock (False). Calling Enter with an INFINITE timeout 
    is the same as calling Enter without a timeout.
    

    由于实现默认使用 Infinite,并且没有提供 TMonitor.Spinlock 值,这将阻塞线程,直到它可以获取 FQueue 对象。

    我的建议是将您的代码更改如下:

      // Use Peek instead of Dequeue, because the item should not be removed from the queue if it fails
      while true do
        case fQueue.Peek(fCurrentFile,10) 
          wrSignaled:
            try
              if UploadFile(fCurrentFile) = '' then
              begin
                fQueue.Dequeue(Temp);  // Delete the item from the queue if succesful
                SignalComplete;
              end;
            except
              on E: Exception do
                SignalError(E.Message);
            end;
          wrTimeout: sleep(10);
          wrIOCompletion,
          wrAbandoned,
          wrError: break;
        end; //case
    

    这样,peek 不会无限期地持有 FQueue 上的锁,而是为 Enqueue 留下一个窗口来获取它并从主 (UI) 线程添加文件。

    【讨论】:

    • 感谢您的详细回答。我同意这两条 TMonitor.Enter() 行可能会导致死锁。 TSimpleThreadedQueue.Peek/Dequeue 中的 TMonitor.Enter() 后面是 TMonitor.Wait()。如果我理解正确,Wait 会暂时释放锁并允许其他线程在 Enqueue 方法中放置锁,因此它不应该导致死锁。 Wait 然后尝试再次放置锁。我很少发生的死锁,而如果是您描述的情况,几乎每次都会发生,因为线程在队列中没有任何数据之前就启动了。
    • Hummm .. 查看 TMonitor.Enter 的源代码,如果您不设置 SpinCount,我认为情况并非如此 - 大多数旋转代码在 SpinCount= 处被跳过0 最终到达这一行的地方:Result := MonitorSupport.WaitOrSignalObject(nil, GetEvent, Timeout) = WAIT_OBJECT_0;
    • 我相信是这样的——但是,根据我的阅读并试图理解显示器的作用,它会旋转一段特定的时间(应该很短)——当它变成再长一点,你就有可能陷入死锁 - 看看这篇关于 SpinLock 的 Wiki 文章 -- en.wikipedia.org/wiki/Spinlock
    【解决方案3】:

    这可能是一个长镜头,但这里有另一种可能性[前一个答案可能更有可能](我刚刚遇到过,但以前知道的):使用 Synchronize 可能导致死锁。这是一篇关于为什么会发生这种情况的博客: Delphi-Workaround-for-TThread-SynchronizeWaitFor-.aspx

    文章中的相关点:

    线程 A 调用 Synchronize(MethodA)

    线程 B 调用 Synchronize(MethodB)

    然后,在主线程的上下文中:

    主线程在处理消息时调用 CheckSynchronize()

    CheckSynchronize 用于批处理所有等待的调用 (*)。所以它拿起 等待调用队列(包含 MethodA 和 MethodB)和循环 一一通过。

    MethodA 在主线程中执行 语境。假设 MethodA 调用 ThreadB.WaitFor

    等待来电 CheckSynchronize 处理任何等待的 Synchronize 调用

    理论上,这应该然后处理ThreadB的Synchronize(MethodB), 允许线程 B 完成。但是,MethodB 已经是一个 拥有第一个 CheckSynchronize 调用,所以它永远不会得到 调用。

    死锁!

    Embarcadero QC article 更详细地描述了问题。

    虽然我在上面的代码中没有看到任何 ProcessMessages 调用,或者就此而言,在同步期间将调用 WaitFor,但在调用同步时,另一个线程调用仍然可能是一个问题同步也是如此 - 但主线程已经同步并且正在阻塞。

    一开始我并没有对此感到满意,因为我倾向于避免像瘟疫一样的同步调用,并且通常使用其他方法(例如消息传递和带有消息通知的线程安全列表而不是同步调用)从线程设计 UI 更新。

    【讨论】:

    • 再次感谢您详细介绍此问题。对于延迟的回复,我很抱歉,这些天我正在旅行......你在这里描述的内容也发生在我身上,我认为同步是问题所在。我使用它是因为 OSX 上没有 SendMessage/PostMessage,或者至少我不知道是否有替代方案。所以当时同步是一个简单的解决方案。前段时间我重写了很多代码,我没有再出现这种冻结,但我不知道问题出在哪里。可能与我使用的 Indy TCP 组件有关,因为它们在 OSX 上不是很稳定...
    • 没问题。我正在寻找其他东西,并遇到了这个没有答案的帖子。我将它用作学习练习,以查看监视器类会做什么(我从未使用过它)。我总是对可以改进我的线程代码的不同技术感兴趣(主要是降低 CPU 使用率,但也有不同的通信方法)。这是一次有趣的课堂学习,希望其他人也能从讨论中受益。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-11-15
    • 1970-01-01
    • 1970-01-01
    • 2021-10-25
    • 1970-01-01
    相关资源
    最近更新 更多