【问题标题】:How I can start multiple download async in c#如何在 C# 中启动多个异步下载
【发布时间】:2021-09-27 14:30:59
【问题描述】:

我的问题如下:我只想根据用户同时运行 x 个请求。

好吧,当 MaxConcurrentDownloads 变量等于 1 时,它似乎工作正常,但是当我增加它时,比如 10:我必须等待 10taches 完成它才能执行,以便 Console.WriteLine 写,什么时候应该异步运行,对吧?

你能帮帮我吗?这是我的“问题”的极简版本(另外我想指定我没有编译器或语法错误)

main.c

using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Threading;
using System.Threading.Tasks;

namespace test_client
{
    class Program
    {
        private static client cli = new client();

        private static readonly string PATH = @Directory.GetCurrentDirectory();

        private static int concurrency = 100;

        private static async Task<bool> MakeJOB(int pos)
        {
            return await cli.NewRequest<bool>((HttpClient client)=>
            {
                try
                {
                    HttpClientHandler handler = null;
                    if (cli.handler != null)
                        handler = cli.GethandlerIndexed(pos);
                    client = new HttpClient(handler);

                    cli.AssignDefaultHeaders(client);

                    using (HttpResponseMessage response = client.GetAsync("https://api.my-ip.io/ip.txt").Result)
                    using (HttpContent content = response.Content)
                        Console.WriteLine(content.ReadAsStringAsync().Result + " / " + Task.CurrentId);
                    return true;
                }
                catch { /* exception .. */ return false; }
            });
        }

        static void Main(string[] args)
        {
            ServicePointManager.DefaultConnectionLimit = 100;
            MainAsync(args).GetAwaiter().GetResult();
            Console.ReadLine();
        }

        static async Task MainAsync(string[] args)
        {
            cli.SetConcurrentDownloads(concurrency);

            var t = new Task[concurrency];
            int pos = 0;
            for (int i = 0; i < t.Length; i++, pos++)
                t[i] = MakeJOB(pos++);
            await Task.WhenAll(t);
        }
    }
}

client.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using System.Net;
using System.Net.Http;
using System.Collections.Concurrent;
using System.Threading;

namespace test_client
{
    public class client
    {
        private readonly ConcurrentDictionary<string, HttpClient> Clients;
        public SemaphoreSlim Locker;
        private CancellationTokenSource TokenSource = new CancellationTokenSource();

        public HttpClientHandler[] handler { get; set; }
        public string[] address { get; set; }
        public string[] port { get; set; }
        public string[] username { get; set; }
        public string[] password { get; set; }
        public int MaxConcurrentDownloads { get; set; }

        private void initializeHandler(string address = "", string port = "", string user = "", string pass = "")
        {
            initializeHandler(new string[] { string.Concat(address, ":", port, ":", user, ":", pass) });
        }

        private void initializeHandler(string[] proxies_client)
        {
            if (proxies_client == null || proxies_client.Length == 0)
                return;

            this.address = new string[proxies_client.Length];
            this.port = new string[proxies_client.Length];
            this.username = new string[proxies_client.Length];
            this.password = new string[proxies_client.Length];
            for (int i = 0; i < proxies_client.Length; i++)
            {
                var split = proxies_client[i].Split(new char[] { ':' });

                this.address[i] = split[0] != "" ? split[0] : "";
                this.port[i] = split[1] != "" ? split[1] : "";
                this.username[i] = split[2] != "" ? split[2] : "";
                this.password[i] = split[3] != "" ? split[3] : "";
            }

            var proxies = new WebProxy[proxies_client.Length];
            NetworkCredential[] credential = new NetworkCredential[proxies_client.Length];
            for (int i = 0; i < proxies_client.Length; i++)
            {
                if (this.username[i] != "")
                    credential[i] = new NetworkCredential(this.username[i], this.password[i]);
                else
                    credential[i] = CredentialCache.DefaultNetworkCredentials;
            }

            const string protocol = "http://";
            for (int i = 0; i < proxies.Length; i++)
            {
                if (this.address[i] != "")
                {
                    var uri = proxies_client[i].Split(new char[] { ':' });
                    if (!uri[0].Contains(protocol))
                        uri[0] = string.Concat(protocol, uri[0]);
                    proxies[i] = new WebProxy()
                    {
                        Address = new Uri(string.Concat(uri[0], ":", uri[1])),
                        Credentials = credential[i],
                    };
                }
            };

            this.handler = new HttpClientHandler[proxies.Length];
            for (int i = 0; i < proxies.Length; i++)
            {
                if (proxies[i].Address.AbsoluteUri != "")
                    this.handler[i] = new HttpClientHandler() { Proxy = proxies[i] };
                else
                    this.handler[i] = new HttpClientHandler();
            }
        }

        public HttpClientHandler GethandlerIndexed(int index)
        {
            return (this.handler[index % this.handler.Length]);
        }

        public void SetConcurrentDownloads(int nb = 1)
        {
            Locker = new SemaphoreSlim(nb, nb);
        }
        public client(string[] proxies = null)
        {
            Clients = new ConcurrentDictionary<string, HttpClient>();

            if (Locker is null)
                Locker = new SemaphoreSlim(1, 1);
            if (proxies != null)
                initializeHandler(proxies);
        }

        private async Task<HttpClient> CreateClient(string Name, bool persistent, CancellationToken token)
        {
            if (Clients.ContainsKey(Name))
                return Clients[Name];

            HttpClient newClient = new HttpClient();

            if (persistent)
            {
                while (Clients.TryAdd(Name, newClient) is false)
                {
                    token.ThrowIfCancellationRequested();
                    await Task.Delay(1, token);
                }
            }

            return newClient;
        }

        public async Task<T> NewRequest<T>(Func<HttpClient, T> Expression, int? MaxTimeout = 2000, string Id = null)
        {
            await Locker.WaitAsync(MaxTimeout ?? 2000, TokenSource.Token);

            bool persistent = true;
            if (Id is null)
            {
                persistent = false;
                Id = string.Empty;
            }

            try
            {
                HttpClient client = await CreateClient(Id, persistent, TokenSource.Token);
                T result = await Task.Run<T>(() => Expression(client), TokenSource.Token);

                if (persistent is false)
                    client?.Dispose();

                return result;
            }
            finally
            {
                Locker.Release();
            }
        }

        public void AssignDefaultHeaders(HttpClient client)
        {
            client.DefaultRequestHeaders.Add("User-Agent", "Mozilla/5.0 (Windows NT 10; WOW64) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/65.0.3325.181 Safari/537.36");
            //client.Timeout = TimeSpan.FromSeconds(3);
        }

        public async Task Cancel(string Name)
        {
            if (Clients.ContainsKey(Name))
            {
                CancellationToken token = TokenSource.Token;
                HttpClient foundClient;

                while (Clients.TryGetValue(Name, out foundClient) is false)
                {
                    token.ThrowIfCancellationRequested();
                    await Task.Delay(1, token);
                }

                if (foundClient != null)
                {
                    foundClient?.Dispose();
                }
            }
        }

        public void ForceCancelAll()
        {
            TokenSource?.Cancel();
            TokenSource?.Dispose();
            TokenSource = new CancellationTokenSource();

            foreach (var item in Clients)
            {
                item.Value?.Dispose();
            }

            Clients.Clear();
        }
    }
}

【问题讨论】:

  • 作为一般规则,如果您正在访问任务的.Result 属性,那么您做错了什么。你应该awaittasks。

标签: c# asynchronous download


【解决方案1】:

我在快速浏览中发现的一件事:您在main.cs::Program.MakeJOB 中的线路:

using (HttpResponseMessage response = client.GetAsync("https://api.my-ip.io/ip.txt").Result)

应该改为阅读

using (HttpResponseMessage response = await client.GetAsync("https://api.my-ip.io/ip.txt"))

这可能不是您唯一的问题,但是通过不等待 GetAsync 方法,您实际上是在阻塞请求,而不是将控制权交还给调用者,以便它可以在任务之间进行上下文切换或排队其他任务任务。

同样如此

Console.WriteLine(content.ReadAsStringAsync().Result + " / " + Task.CurrentId);

应该是

Console.WriteLine((await content.ReadAsStringAsync()) + " / " + Task.CurrentId);

虽然那个人不太可能在很长一段时间内阻塞。

【讨论】:

  • 嘿,我试过了,还是不行..
猜你喜欢
  • 2012-12-31
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-05-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多