【问题标题】:Consumer Producer- Producer thread never executes assigned function消费者生产者 - 生产者线程从不执行分配的功能
【发布时间】:2018-11-21 06:54:14
【问题描述】:

我有 .NET Core Web API 解决方案。在每次调用中,我都需要执行一些数据库操作。
问题是一次打开和关闭多个数据库连接。所以为了避免它,我想实现要发送到数据库的对象队列,然后想要一个单独的线程来执行数据库操作。
我尝试了如下代码。但是在这里,消费者线程从不执行分配的功能。 Producer 没有单独的线程,我只是在为队列提供对象。
我应该做哪些修改?需要一些指导,因为我是线程方面的新手。

  public static class BlockingQueue
{
    public static Queue<WebServiceLogModel> queue;
    static BlockingQueue()
    {
        queue = new Queue<WebServiceLogModel>();

    }

    public static object Dequeue()
    {
        lock (queue)
        {
            while (queue.Count == 0)
            {
                Monitor.Wait(queue);
            }
            return queue.Dequeue();
        }
    }
    public static void Enqueue(WebServiceLogModel webServiceLog)
    {
        lock (queue)
        {
            queue.Enqueue(webServiceLog);
            Monitor.Pulse(queue);
        }
    }

    public static void ConsumerThread(IConfiguration configuration)
    {
        WebServiceLogModel webServiceLog = (WebServiceLogModel)Dequeue();
        webServiceLog.SaveWebServiceLog(configuration);
    }

   public static void ProducerThread(WebServiceLogModel webServiceLog)
    {
        Enqueue(webServiceLog);
         Thread.Sleep(100);
    }
}

我在 StartUp.cs 中创建并启动了线程:

    public Startup(IConfiguration configuration)
    {
        Thread t = new Thread(() => BlockingQueue.ConsumerThread(configuration));
        t.Start();
    }

在 Controller 中,我编写了代码来提供队列:

    [HttpGet]
    [Route("abc")]
    public IActionResult GetData()
    {
        BlockingQueue.ProducerThread(logModel);
        return StatusCode(HttpContext.Response.StatusCode = (int)HttpStatusCode.NotFound, ApplicationConstants.Message.NoBatchHistoryInfo);
    }

【问题讨论】:

  • "问题是一次打开和关闭多个数据库连接。所以要避免它"你为什么要这样做,你有并发问题吗?
  • 是的。我面临的并发问题。有什么建议吗?
  • 那我也去看看TPL,如果有帮助的话。
  • 两个建议,首先不要将您用作锁的对象公开。要确保在不让其他对象加入聚会的情况下不会使某些东西陷入僵局,这已经够难的了。其次,看一下 BlockingCollection,它为您实现了生产者消费者模式。
  • 我听说过阻止收集。我会检查它是否对我有帮助。谢谢。

标签: c# multithreading .net-core queue thread-synchronization


【解决方案1】:

首先,尽量避免static类和方法。在这种情况下使用模式单例(如果你真的需要这个)。 其次,尽量避免lockMonitor - 这些并发原语会显着降低您的性能。 在这种情况下,您可以使用 BlockingCollection 作为上面提到的“Adam G”,或者您可以开发自己的解决方案。

public class Service : IDisposable
{
    private readonly BlockingCollection<WebServiceLogModel> _packets =
        new BlockingCollection<WebServiceLogModel>();
    private Task _task;
    private volatile bool _active;
    private static readonly TimeSpan WaitTimeout = TimeSpan.FromSeconds(1);

    public Service()
    {
        _active = true;
        _task = ExecTaskInternal();
    }

    public void Enqueue(WebServiceLogModel model)
    {
        _packets.Add(model);
    }

    public void Dispose()
    {
        _active = false;
    }

    private async Task ExecTaskInternal()
    {
        while (_active)
        {
            if (_packets.TryTake(out WebServiceLogModel model))
            {
                // TODO: whatever you need
            }
            else
            {
                await Task.Delay(WaitTimeout);
            }
        }
    }
}

public class MyController : Controller
{
    [HttpGet]
    [Route("abc")]
    public IActionResult GetData([FromServices] Service service)
    {
        // receive model form somewhere
        WebServiceLogModel model = FetchModel();
        // enqueue model
        service.Enqueue(model);
        // TODO: return what you need
    }
}

在启动中:

public class Startup
{
    public void ConfigureServices(IServiceCollection services)
    {
        services.AddSingleton<Service>();
        // TODO: other init staffs
    }
}

您甚至可以向服务添加 Start/Stop 方法,而不是实现 IDisposable,并在方法 Configure(IApplicationBuilder app) 的启动类中启动您的服务。

【讨论】:

  • 嗨,伊戈尔。感谢您提供解决方案。我已经根据我的要求进行了少量修改。我以前没有使用过线程。所以被卡住了。顺便说一句,如果多个端点尝试同时添加 WebServiceLog,这会完美地工作吗?再次感谢您的帮助! :)
  • 根据文档:link 这个类是为并发访问而设计的。
【解决方案2】:

如果队列中有东西,我认为您的消费者线程只执行一次,然后立即返回。如果你想让一个线程在后台工作,它只启动一次,它不应该返回并且应该捕获所有异常。您来自BlockingQueue.ConsumerThread 的线程在Stratup 中被调用一次并返回。

另外请注意,这样做是不安全的。如果没有请求进入,ASP.NET 不保证后台线程正在运行。您的应用程序池可以回收(默认情况下,它会在 20 分钟不活动或每 27 小时回收一次),因此您的后台有可能某些队列项不会执行代码。

另外,虽然它不能解决所有问题,但我建议使用https://www.hangfire.io/ 在 ASP.NET 服务器中执行后台任务。它具有持久层,可以重试作业并具有简单的 API。在您的请求处理程序中,您可以将新作业推送到 Hangfire,然后只有 1 个作业处理器线程。

【讨论】:

  • 解决此类问题的有效方法是什么?我面临并发问题。所以想将对象存储在队列中,然后想要一个单独的线程来执行数据库的东西。
  • 理想和最可靠的解决方案是拥有单独的队列(RabbitMQ/ServiceBus 等)和不在 ASP.Net 中运行的单独服务(另一台机器/Windows 服务等)将处理该队列。原因:在应用程序域重新启动后幸存下来,因为如果您在内存中有大量任务并且应用程序域重新启动,那么您将失去一切。
  • 根据您的情况,使用 HangFire 是足够好的解决方案。它将为您处理持久性、线程管理、锁等。您只需将项目推送到它,然后它将调用您的作业处理程序。
  • 不幸的是,在纯 .net 概念的帮助下,我无法使用任何第三方解决方案来解决此问题。 :|
  • 实际上,我有点不确定并发数据库操作是否是问题的根本原因,因为数据库旨在处理大量查询。但是,如果您确实认为是这种情况,也许您可​​以调整数据库连接池设置以仅允许整个应用程序的 1 个连接 docs.microsoft.com/en-us/dotnet/framework/data/adonet/… ,但是您应该为每个数据库访问添加重试和错误处理实现,因为现在它可能会失败超时/没有可用的连接。
猜你喜欢
  • 1970-01-01
  • 2018-09-24
  • 2017-02-01
  • 2012-04-30
  • 1970-01-01
  • 1970-01-01
  • 2012-02-02
  • 1970-01-01
  • 2013-11-10
相关资源
最近更新 更多