【问题标题】:Importing data through API for thousands of users by using threads使用线程通过API为成千上万的用户导入数据
【发布时间】:2010-09-14 09:51:58
【问题描述】:

在我们的应用程序中,我们需要通过 API 为我的应用程序的用户从 paypal 导入交易数据并存储在数据库中。我有成千上万的用户(现在大约 5k),而且每天都在增加。

此应用程序是一个 .net windows 服务。

这会按小时为所有用户导入数据。目前我们正在为用户一个接一个地导入数据,但有时会发生一个用户的数据可能太大以至于需要大约 5 小时才能获取他的全部数据,因此我们会阻止其他用户,直到此用户数据导入完成。对于所有其他用户来说,这个每小时导入一次完全没有用。

为了避免这种情况,我们考虑为每个用户导入创建线程,并使用 Windows 服务每小时运行一次。在这里,我们需要考虑任何时间点的带宽,因为所有线程将同时启动。这是一个问题吗?

现在,我想知道我们的新实现是否正确?我也想知道它通常是怎么做的?如果有人遇到过这种功能,请告诉我们它是如何完成的。

如果我的问题不够清楚,请告诉我,我会提供更多信息。

编辑:如果我从一个 IP 向 Paypal 发送这么多请求,它是如何处理的?知道它是否限制每个 IP 的请求?

更新:感谢所有建议和反馈。

我想使用 jgauffin 的解决方案,因为它是 ThreadPool 的完美模仿。但是这里我需要更多的特性,比如动态改变线程限制和递归调用回调方法。

经过大量研究和分析线程池,我决定使用SmartThreadPool,它是基于线程池逻辑但具有更多功能的。它非常好,完美地服务于我的目的。

【问题讨论】:

    标签: c# .net windows-services import paypal


    【解决方案1】:

    不要为每个用户使用一个线程。为每个用户在线程池中放置一个 WORK ITEM。这样您就可以两全其美 - 不是 5000 个线程的内存开销,以及更多的负载控制,因为您可以确定 ThreadPool 使用多少线程来处理工作项。

    【讨论】:

    • 假设您的线程池大小为 10,在前 100 个用户中,有 15 个用户拥有如此多的事务数据,因此导入每个用户的数据至少需要 2 小时。这不会阻塞至少 2 小时吗?
    • 当然。根本没有办法。
    • 我读过关于线程池的文章,总是建议如果线程要运行很长时间,那么在这种情况下不建议使用线程池。就我而言,是的,每个请求都可能需要时间,因为它正在向贝宝发送请求并从那里获得响应。如果我在该池中限制 10 个线程,这种情况是否适用于线程池?我现在对这个解决方案有点困惑。
    • 然后重新实现并使用自己的线程池。也就是说,我认为应该没问题 - 贝宝的交易运行时间并不长。
    【解决方案2】:

    我要做的是从一个线程池(比如 10 个)开始,然后让每个线程执行一次导入。完成后,它将从队列中取出下一个项目。您利用现有的ThreadPool 类并将所有导入请求排队到该线程池。您可以控制此 ThreadPool 的最大线程数。

    创建数千个线程是一个坏主意,原因有几个,它曾经对于 Windows 操作系统来说太多了,而且正如您自己指出的那样,您可能会淹没网络(或者可能是 paypal 服务)。

    为了获得极高的可扩展性,您可以执行异步 IO,在请求进行时不会阻塞线程,但该 API 的学习曲线陡峭,您的场景可能不需要。

    【讨论】:

    • 很抱歉,我没有得到你关于异步 IO 和 API 依赖的最后一点。你能给我更多的想法吗?
    • 假设您的线程池大小为 10,在前 100 个用户中,有 15 个用户拥有如此多的事务数据,因此导入每个用户的数据至少需要 2 小时。这不会阻塞至少 2 小时吗?
    【解决方案3】:

    我会使用一个队列,比如说五个线程。每次线程完成时,它都会从队列中获取一个新用户。

    示例代码:

    public class Example
    {
    
        public static void Main(string[] argv)
        {
            //setup
            DownloadQueue personQueue = new DownloadQueue();
            personQueue.JobTriggered += OnHandlePerson;
            personQueue.ThreadLimit = 10; //can be changed at any time and will be adjusted when a job completed (or a new one is enqueued)
    
            // enqueue as many persons as you like
            personQueue.Enqueue(new Person());
    
            Console.ReadLine();
        }
    
        public static void OnHandlePerson(object source, PersonEventArgs e)
        {
            //download persno here.
        }
    }
    
    public class DownloadQueue
    {
        Queue<Person> _queue = new Queue<Person>();
        int _runningThreads = 0;
    
        public int ThreadLimit { get; set; }
    
        /// <summary>
        /// Enqueue a new user.
        /// </summary>
        /// <param name="person"></param>
        public void Enqueue(Person person)
        {
            lock (_queue)
            {
                _queue.Enqueue(person);
                if (_runningThreads < ThreadLimit)
                    ThreadPool.QueueUserWorkItem(DownloadUser);
            }
        }
    
        /// <summary>
        /// Running using a ThreadPool thread.
        /// </summary>
        /// <param name="state"></param>
        private void DownloadUser(object state)
        {
            lock (_queue)
                ++_runningThreads;
    
            while (true)
            {
                Person person;
                lock (_queue)
                {
                    if (_queue.Count == 0)
                    {
                        --_runningThreads;
                        return; // nothing more in the queue. Lets exit
                    }
                    person = _queue.Dequeue();
                }
    
                JobTriggered(this, new PersonEventArgs(person));
            }
        }
    
        public event EventHandler<PersonEventArgs> JobTriggered = delegate { };
    }
    
    
    public class PersonEventArgs : EventArgs
    {
        Person _person;
    
        public PersonEventArgs(Person person)
        {
            _person = person;
        }
    
        public Person Person { get { return _person; } }
    }
    public class Person
    {
        public Person(string fName, string lName)
        {
            this.firstName = fName;
            this.lastName = lName;
        }
    
        public string firstName;
        public string lastName;
    }
    

    【讨论】:

      【解决方案4】:

      在代码中创建 5000 个线程并不是一件好事,它可能会大大降低服务器速度,甚至可能导致服务器崩溃。

      您需要的是这里的负载平衡。

      如果您在 .net 平台和 quequ 用户请求上,请尝试考虑基于 MSMQ 的解决方案,然后必须有一些 dispather 会在服务器之间分发用户请求。

      【讨论】:

      • 我认为这里不需要负载平衡。问题不在于他的机器太忙,而在于单线程设计中无法控制的网络延迟。
      【解决方案5】:

      我会避免为每个用户创建一个线程。这种方法的可扩展性不是很高。我假设 API 没有异步下载的机制。如果是这样,那么这可能就是要走的路。

      生产者-消费者模式在这里可能很好用。这个想法是创建固定大小的线程池,这些线程池使用共享队列中的工作项。在您的情况下,最好避免使用ThreadPool,因为它主要是为短期任务而设计的。您不希望您的长期任务耗尽它,因为它在 .NET BCL 中用于许多不同的事情。

      如果您使用的是 .NET 4.0,则可以利用 BlockingCollection。作为Reactive Extensions 下载的一部分,还提供了一个反向移植。这是您的代码的样子。

      注意:您必须自己强化代码以使其更健壮、正常关闭等。

      public class Importer
      {
        private BlockingCollection<Person> m_Queue = new BlockingCollection<Person>();
      
        public Importer(int poolSize)
        {
          for (int i = 0; i < poolSize; i++)
          {
            var thread = new Thread(Download);
            thread.IsBackground = true;
            thread.Start();
          }
        }
      
        public void Add(Person person)
        {
          m_Queue.Add(person);
        }
      
        private void Download()
        {
          while (true)
          {
            Person person = m_Queue.Take();
            // Add your code for downloading this person's data here.
          }
        }
      }
      

      【讨论】:

        猜你喜欢
        • 2014-12-29
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多