【问题标题】:Download multiple files concurrently from FTP using FluentFTP with a maximum value使用 FluentFTP 从 FTP 并发下载多个文件,最大值
【发布时间】:2021-05-20 07:05:14
【问题描述】:

我想从 FTP 目录递归下载多个下载文件,为此我使用 FluentFTP 库,我的代码是这样的:

private async Task downloadRecursively(string src, string dest, FtpClient ftp)
{

    foreach(var item in ftp.GetListing(src))
    {
        if (item.Type == FtpFileSystemObjectType.Directory)
        {
            if (item.Size != 0)
            {
                System.IO.Directory.CreateDirectory(Path.Combine(dest, item.Name));
                downloadRecursively(Path.Combine(src, item.Name), Path.Combine(dest, item.Name), ftp);
            }
        }
        else if (item.Type == FtpFileSystemObjectType.File)
        {
            await ftp.DownloadFileAsync(Path.Combine(dest, item.Name), Path.Combine(src, item.Name));
        }
    }
}

我知道你每次下载都需要一个 FtpClient,但是我怎样才能使用一定数量的连接作为最大值,我想这个想法是创建、连接、下载和关闭我找到的每个文件,但只是同时有 X 个下载文件。另外我不确定我是否应该使用异步创建任务,线程和我最大的问题,如何实现所有这些。

@Bradley 的回答似乎不错,但问题确实读取了必须从外部文件下载的每个文件,并且它没有最大并发下载值,所以我不确定如何应用这两者要求。

【问题讨论】:

    标签: c# multithreading concurrency ftp fluentftp


    【解决方案1】:

    用途:

    var clients = new ConcurrentBag<FtpClient>();
    
    var opts = new ParallelOptions { MaxDegreeOfParallelism = maxConnections };
    Parallel.ForEach(files, opts, file =>
    {
        file = Path.GetFileName(file);
    
        string thread = $"Thread {Thread.CurrentThread.ManagedThreadId}";
        if (!clients.TryTake(out var client))
        {
            Console.WriteLine($"{thread} Opening connection...");
            client = new FtpClient(host, user, pass);
            client.Connect();
            Console.WriteLine($"{thread} Opened connection {client.GetHashCode()}.");
        }
    
        string remotePath = sourcePath + "/" + file;
        string localPath = Path.Combine(destPath, file);
        string desc =
            $"{thread}, Connection {client.GetHashCode()}, " +
            $"File {remotePath} => {localPath}";
        Console.WriteLine($"{desc} - Starting...");
        client.DownloadFile(localPath, remotePath);
        Console.WriteLine($"{desc} - Done.");
    
        clients.Add(client);
    });
    
    Console.WriteLine($"Closing {clients.Count} connections");
    foreach (var client in clients)
    {
        Console.WriteLine($"Closing connection {client.GetHashCode()}");
        client.Dispose();
    }
    

    另一种方法是启动固定数量的线程,每个线程都有一个连接,然后让它们从队列中挑选文件。

    有关实现的示例,请参阅我关于 WinSCP .NET 程序集的文章:
    Automating transfers in parallel connections over SFTP/FTP protocol


    关于 SFTP 的类似问题:
    Processing SFTP files using C# Parallel.ForEach loop not processing downloads

    【讨论】:

    • +1 用于正确选择 rarely useful,并被大量误用 ConcurrentBag&lt;T&gt; 类!
    【解决方案2】:

    这是TPL Dataflow 方法。 BufferBlock&lt;FtpClient&gt; 用作FtpClient 对象池。递归枚举采用IEnumerable&lt;string&gt; 类型的参数,该参数包含一个文件路径的段。在构建本地和远程文件路径时,这些段的组合方式不同。作为调用递归枚举的副作用,远程文件的路径被发送到ActionBlock&lt;IEnumerable&lt;string&gt;&gt;。该块处理文件的并行下载。它的Completion 属性最终包含了整个操作过程中可能发生的所有异常。

    public static Task FtpDownloadDeep(string ftpHost, string ftpRoot,
        string targetDirectory, string username = null, string password = null,
        int maximumConnections = 1)
    {
        // Arguments validation omitted            
        if (!Directory.Exists(targetDirectory))
            throw new DirectoryNotFoundException(targetDirectory);
        var fsLocker = new object();
    
        var ftpClientPool = new BufferBlock<FtpClient>();
    
        async Task<TResult> UsingFtpAsync<TResult>(Func<FtpClient, Task<TResult>> action)
        {
            var client = await ftpClientPool.ReceiveAsync();
            try { return await action(client); }
            finally { ftpClientPool.Post(client); } // Return to the pool
        }
    
        var downloader = new ActionBlock<IEnumerable<string>>(async path =>
        {
            var remotePath = String.Join("/", path);
            var localPath = Path.Combine(path.Prepend(targetDirectory).ToArray());
            var localDir = Path.GetDirectoryName(localPath);
            lock (fsLocker) Directory.CreateDirectory(localDir);
            var status = await UsingFtpAsync(client =>
                client.DownloadFileAsync(localPath, remotePath));
            if (status == FtpStatus.Failed) throw new InvalidOperationException(
                $"Download of '{remotePath}' failed.");
        }, new ExecutionDataflowBlockOptions()
        {
            MaxDegreeOfParallelism = maximumConnections,
            BoundedCapacity = maximumConnections,
        });
    
        async Task Recurse(IEnumerable<string> path)
        {
            if (downloader.Completion.IsCompleted) return; // The downloader has failed
            var listing = await UsingFtpAsync(client =>
                client.GetListingAsync(String.Join("/", path)));
            foreach (var item in listing)
            {
                if (item.Type == FtpFileSystemObjectType.Directory)
                {
                    if (item.Size != 0) await Recurse(path.Append(item.Name));
                }
                else if (item.Type == FtpFileSystemObjectType.File)
                {
                    var accepted = await downloader.SendAsync(path.Append(item.Name));
                    if (!accepted) break; // The downloader has failed
                }
            }
        }
    
        // Move on to the thread pool, to avoid ConfigureAwait(false) everywhere
        return Task.Run(async () =>
        {
            // Fill the FtpClient pool
            for (int i = 0; i < maximumConnections; i++)
            {
                var client = new FtpClient(ftpHost);
                if (username != null && password != null)
                    client.Credentials = new NetworkCredential(username, password);
                ftpClientPool.Post(client);
            }
    
            try
            {
                // Enumerate the files to download
                await Recurse(new[] { ftpRoot });
                downloader.Complete();
            }
            catch (Exception ex) { ((IDataflowBlock)downloader).Fault(ex); }
    
            try
            {
                // Await the downloader to complete
                await downloader.Completion;
            }
            catch (OperationCanceledException)
                when (downloader.Completion.IsCanceled) { throw; }
            catch { downloader.Completion.Wait(); } // Propagate AggregateException
            finally
            {
                // Clean up
                if (ftpClientPool.TryReceiveAll(out var clients))
                    foreach (var client in clients) client.Dispose();
            }
        });
    }
    

    使用示例:

    await FtpDownloadDeep("ftp://ftp.test.com", "", @"C:\FtpTest",
        "username", "password", maximumConnections: 10);
    

    注意: 上面的实现会按照下载过程的节奏懒惰地枚举远程目录。如果您喜欢急切地枚举它,尽快收集有关远程列表的所有可用信息,只需从下载文件的ActionBlock 中删除BoundedCapacity = maximumConnections 配置。请注意,这样做可能会导致高内存消耗,以防远程目录具有较深的子文件夹层次结构,累积包含大量小文件。

    【讨论】:

      【解决方案3】:

      我会把它分成三个部分。

      1. 递归构建源和目标对列表。
      2. 创建所需的目录。
      3. 同时下载文件。

      这是最后一部分,速度很慢,应该并行完成。

      代码如下:

      private async Task DownloadRecursively(string src, string dest, FtpClient ftp)
      {
          /* 1 */
          IEnumerable<(string source, string destination)> Recurse(string s, string d)
          {
              foreach (var item in ftp.GetListing(s))
              {
                  if (item.Type == FtpFileSystemObjectType.Directory)
                  {
                      if (item.Size != 0)
                      {
                          foreach(var pair in Recurse(Path.Combine(s, item.Name), Path.Combine(d, item.Name)))
                          {
                              yield return pair;
                          }
                      }
                  }
                  else if (item.Type == FtpFileSystemObjectType.File)
                  {
                      yield return (Path.Combine(s, item.Name), Path.Combine(d, item.Name));
                  }
              }
          }
      
          var pairs = Recurse(src, dest).ToArray();
          
          /* 2 */
          foreach (var d in pairs.Select(x => x.destination).Distinct())
          {
              System.IO.Directory.CreateDirectory(d);
          }
      
          /* 3 */
          var downloads =
              pairs
                  .AsParallel()
                  .Select(x => ftp.DownloadFileAsync(x.source, x.destination))
                  .ToArray();
          
          await Task.WhenAll(downloads);
      }
      

      代码应该干净、整洁且易于推理。

      【讨论】:

      • 除非我错了,否则此解决方案将同时下载所有文件。但是 OP 只想同时下载 X 个文件。顺便说一句,PLINQ 对异步不友好。
      • @TheodorZoulias - 当然,我需要添加最大并行选项。在任何情况下,我都没有在我的代码中使用带有 PLINQ 的 async
      • 没错,这不是async void 代表的情况。问题是DownloadFileAsync 返回一个Task,而PLINQ 对任务一无所知。所以DownloadFileAsync方法创建的所有任务都被忽略了,它们不是awaited,所以它们变成了即发即弃的任务。
      • 默认情况下,您不能在一个FtpClient 实例上运行多个并行传输。你可以,如果你设置EnableThreadSafeDataConnections,但是每次文件传输都会打开一个新的连接,这将是非常低效的(尤其是在下载大量小文件时)。
      • @TheodorZoulias - 是的,但我不会等待他们,直到我创建了最终的任务数组。我已经将 PLINQ 和 await 完全分开了。
      猜你喜欢
      • 1970-01-01
      • 2017-05-18
      • 2017-04-12
      • 1970-01-01
      • 2012-02-02
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多