【问题标题】:How would you implement this "WorkerChain" functionality in .NET?您将如何在 .NET 中实现这个“WorkerChain”功能?
【发布时间】:2010-04-24 16:39:44
【问题描述】:

编辑:我有点太晚了(?)我在第一次更新这个问题时发布的所有代码对于大多数读者来说太多 .实际上,我已经继续前进了,written a blog post about this topic 供任何愿意阅读它的人使用。

与此同时,我将原始问题留在原处,简要介绍一下我想解决的问题。

我还要注意,到目前为止,我发布的代码(在我的博客上)已经很好地经受住了测试。但我仍然对人们愿意就它的干净/稳健/性能*提供的任何和所有反馈感兴趣。

*我喜欢doesn't really mean what we think这个词,但我们开发人员一直都在使用它。


原始问题

抱歉,问题标题含糊不清——不知道如何简洁地概括我在下面提出的问题。 (如果有编辑权限的人能想到一个更具描述性的标题,请随意更改。)

我需要的行为是这样的。我正在设想一个在其构造函数中接受单个委托任务的工作类(为简单起见,我将使其不可变——实例化后不能再添加任务)。我将此任务称为T。该类应该有一个简单的方法,比如GetToWork,它会表现出这种行为:

  1. 如果工作人员当前没有运行T,那么它将立即开始运行。
  2. 如果工作人员正在当前运行T,那么一旦完成,它将立即再次启动T
  3. GetToWork 可以在 worker 运行时调用任意次数 T;简单的规则是,在T 的任何执行期间,如果GetToWork 被调用至少一次T 将在完成后再次运行(然后如果GetToWork 被调用而@ 987654334@ 正在运行那个时间,它会再次重复,等等)。

现在,使用布尔开关非常简单。但是这个类需要线程安全,我的意思是,上面的步骤 1 和 2 需要包含原子操作(至少我认为是这样)。

增加了一层复杂性。我需要一个“工人链”课程,它将由许多连接在一起的工人组成。一旦第一个工作人员完成,它实际上会在 after 工作人员上调用GetToWork;同时,如果它自己的GetToWork 已被调用,它也会重新启动自己。从逻辑上讲,在 上调用 GetToWork 与在链中 的第一个工人上调用 GetToWork 基本相同(我完全打算不公开该链的工人可访问)。

想象这个假设的“工人链”会如何表现的一种方法是将其与接力赛中的团队进行比较。假设有四个跑步者,W1W4,让链被称为C。如果我打电话给C.StartWork(),应该是这样的:

  1. 如果W1在他的起点(即什么都不做),他将开始向W2跑。
  2. 如果W1已经跑向W2(即执行他的任务),那么一旦他到达W2,他就会向W2发信号开始,立即返回到他的起点,因为StartWork已经被调用,再次开始向W2跑。
  3. W1到达W2的起点时,他会立即回到自己的起点。
    1. 如果W2 只是坐在那里,他会立即开始跑向W3
    2. 如果W2 已经跑向W3,那么W2 将在到达W3 并返回起点后再次运行。

上面的内容可能有点令人费解并且写得不好。但希望你能得到基本的想法。显然,这些工作线程将在自己的线程上运行。

另外,我猜这个功能可能已经存在于某个地方?如果是这样,一定告诉我!

【问题讨论】:

  • 如果您尝试过,请发布代码。如果您还没有尝试...好吧..尝试然后发布代码。 ;-)
  • @Sky:很公平。我正在处理这个问题,当我有你知道的东西时,我会发布我所拥有的东西。 (现在它基本上写了一半,所以我发布了这个问题以寻求帮助。)
  • 我的理由是对于基本概念,例如“我如何将类名添加到 html 元素”这个问题足以表明用户需要一些帮助来学习某些东西,并且提供指导的工作很少。在这种情况下,您已经为一个不太重要的构造提供了详细的需求文档,并且能够回答这个问题需要大量的努力,虽然我和其他人不介意提供不太重要的指导,但这不是“请为我的网站编写代码。我绝不是在暗示这是你的意图,只是说......
  • 对于这样的事情,您尝试完成的示例将大大有助于为问题提供上下文,并且老实说,将证明您付出了很多努力将证明另一方付出的努力是合理的。请不要误会;我对这个问题很感兴趣,如果可以的话,我会回答它。
  • @Sky Sanders:我完全理解你来自哪里。代码正在开发中。

标签: .net multithreading synchronization


【解决方案1】:

使用信号量。每个worker都是一个线程,代码如下(伪代码):

WHILE(TRUE)
    WAIT_FOR_SEMAPHORE(WORKER_ID) //The semaphore for the current worker
    RESET_SEMAPHORE(WORKER_ID)
    /* DO WORK */
    POST_SEMAPHORE(NEXT_WORKER_ID) //The semaphore for the next worker
END

非零信号量意味着有人向当前线程发出信号来完成工作。在其入口行中获得非零信号量后,它会重置信号量(标记为没有人发出信号),执行工作(同时可以再次发布信号量)并为下一个工作人员发布信号量。故事在下一个工人中重演。

【讨论】:

  • 我有很多关于使用信号量的知识。谢谢你的建议——我现在正在调查他们。
【解决方案2】:

一个幼稚的实现,您可能会从中获得一些好处。

注意:

据我了解,标量类型,r.e.控制执行的布尔标志,具有原子分配,使它们成为您在这种情况下需要/想要的线程安全。

涉及信号量和其他策略的可能性要复杂得多,但如果简单的话......

using System;
using System.Threading;

namespace FlaggedWorkerChain
{
    internal class Program
    {
        private static void Main(string[] args)
        {
            FlaggedChainedWorker innerWorker = new FlaggedChainedWorker("inner", () => Thread.Sleep(1000), null);
            FlaggedChainedWorker outerWorker = new FlaggedChainedWorker("outer", () => Thread.Sleep(500), innerWorker);

            Thread t = new Thread(outerWorker.GetToWork);
            t.Start();

            // flag outer to do work again
            outerWorker.GetToWork();

            Console.WriteLine("press the any key");
            Console.ReadKey();
        }
    }

    public sealed class FlaggedChainedWorker
    {
        private readonly string _id;
        private readonly FlaggedChainedWorker _innerWorker;
        private readonly Action _work;
        private bool _busy;
        private bool _flagged;

        public FlaggedChainedWorker(string id, Action work, FlaggedChainedWorker innerWorker)
        {
            _id = id;
            _work = work;
            _innerWorker = innerWorker;
        }

        public void GetToWork()
        {
            if (_busy)
            {
                _flagged = true;
                return;
            }

            do
            {
                _flagged = false;
                _busy = true;
                Console.WriteLine(String.Format("{0} begin", _id));

                _work.Invoke();

                if (_innerWorker != null)
                {
                    _innerWorker.GetToWork();
                }
                Console.WriteLine(String.Format("{0} end", _id));

                _busy = false;
            } while (_flagged);
        }
    }
}

【讨论】:

  • 两个布尔标志的基本思想很简单,看起来很合理。我喜欢。但是,出于我的目的,此实现存在一个问题:给定的FlaggedChainedWorker 不会报告是免费的(即!_busy),直到链中的所有 工人——它自己_innerWorker,和那个工人的_innerWorker,等等——都完成了。第一个工作人员需要准备好在第二个工作时再次运行。
  • 我认为GetToWork 只需要启动 工人工作,而不是在分配的任务上调用Invoke(这可能是一个长期运行的任务)。所以我认为 BeginInvoke 是要走的路。不过,这是一种复杂的场景,所以我可能会遗漏一些东西。
  • @Dan - 我看到的含义是嵌套任务之间没有真正的关系,例如开始外部,外部开始内部,这需要两倍的时间,同时外部已被标记并执行两次,但只导致内部执行一次。并且执行次数之间的这种差异会随着工作人员的生命周期而增长。这是否描述了您的预期场景?这是一个不寻常的工作流程,但如上所述很容易实现。让我知道。或者,如果经过反思,您决定确实希望对外部 DoWork 的调用应该是原子的,那么... ;-)
  • @Dan - 关于调用与开始调用,实现是使用工作人员外部的线程管理,您在 cmets 中描述的似乎是某种线程池实现。这是我对所读内容的理解。
  • 执行次数的差异确实会增加。我想我所描述的是类似于线程池,但有几个重要的区别。首先,从不与自身重叠的意义上说,每个单独的工作项都需要是原子的(我认为原子是正确的词吗?)。如果我有一个有四个可用线程的线程池,并且我将相同的方法排队运行四次,它将同时在四个地方运行。其次,链应该代表一个固定的方法序列——A 后跟 B 后跟 C——而不是可变队列。
【解决方案3】:

在我看来,您过于复杂了。我以前写过这些“管道”类;你所需要的只是一个工人队列,每个工人都有一个等待句柄,在操作完成后会收到信号。

public class Pipeline : IDisposable
{
    private readonly IEnumerable<Stage> stages;

    public Pipeline(IEnumerable<Action> actions)
    {
        if (actions == null)
            throw new ArgumentNullException("actions");
        stages = actions.Select(a => new Stage(a)).ToList();
    }

    public Pipeline(params Action[] actions)
        : this(actions as IEnumerable<Action>)
    {
    }

    public void Dispose()
    {
        foreach (Stage stage in stages)
            stage.Dispose();
    }

    public void Start()
    {
        foreach (Stage currentStage in stages)
            currentStage.Execute();
    }

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
        }

        public void Execute()
        {
            readyEvent.WaitOne();
            action();
            readyEvent.Set();
        }
    }
}

这是一个测试程序,您可以使用它来验证操作是否始终以正确的顺序执行,并且一次只能执行一个相同的操作:

class Program
{
    static void Main(string[] args)
    {
        Action firstAction = GetTestAction(1);
        Action secondAction = GetTestAction(2);
        Action thirdAction = GetTestAction(3);
        Pipeline pipeline = new Pipeline(firstAction, secondAction, thirdAction);
        for (int i = 0; i < 10; i++)
        {
            ThreadPool.QueueUserWorkItem(s => pipeline.Start());
        }
    }

    static Action GetTestAction(int index)
    {
        return () =>
        {
            Console.WriteLine("Action started: {0}", index);
            Thread.Sleep(100);
            Console.WriteLine("Action finished: {0}", index);
        };
    }
}

简短、简单、完全线程安全。

如果由于某种原因您需要从链中的特定步骤开始工作,那么您只需为Start 添加重载:

public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index + 1))
        currentStage.Execute();
}

编辑

基于 cmets,我认为内部 Stage 类的一些小改动应该足以获得你想要的那种行为。我们只需要在“ready”事件之外添加一个“queued”事件。

    class Stage : IDisposable
    {
        private readonly Action action;
        private readonly EventWaitHandle readyEvent;
        private readonly EventWaitHandle queuedEvent;

        public Stage(Action action)
        {
            this.action = action;
            this.readyEvent = new AutoResetEvent(true);
            this.queuedEvent = new AutoResetEvent(true);
        }

        public void Dispose()
        {
            readyEvent.Close();
        }

        private bool CanExecute()
        {
            if (readyEvent.WaitOne(0, true))
                return true;
            if (!queuedEvent.WaitOne(0, true))
                return false;
            readyEvent.WaitOne();
            queuedEvent.Set();
            return true;
        }

        public bool Execute()
        {
            if (!CanExecute())
                return false;
            action();
            readyEvent.Set();
            return true;
        }
    }

如果阶段无法执行(即已经排队),还可以更改管道的 Start 方法以中断:

public void Start(int index)
{
    foreach (Stage currentStage in stages.Skip(index + 1))
        if (!currentStage.Execute())
            break;
}

这里的概念很简单,再说一遍:

  • 阶段首先尝试立即获取就绪状态。如果成功,则开始运行。
  • 如果它无法获取就绪状态(即任务已经在运行),那么它会尝试获取排队状态。
    • 如果它获得排队状态,则等待就绪状态变为可用,然后释放排队状态。
    • 如果也无法获得排队状态,则放弃。

我再次阅读了您的问题和 cmets,我很确定这正是您想要做的,并且在安全性、吞吐量和节流之间进行了最佳权衡。

因为ThreadPool 有时可能需要一段时间才能响应,所以如果您想真正看到“跳过”发生,您应该将测试程序中的延迟提高到1000 而不是100

【讨论】:

  • 关于过度复杂化问题:绝对有可能!我现在正在查看此代码,并喜欢您的设置方式。我看到的一个问题是我希望您的Pipeline 类上的Start 方法立即返回。但是,显然,我可以通过将调用放在ThreadPool.QueueUserWorkItem inside Start 方法本身来实现这一点(或者,正如你所做的那样,将它留给调用者是否需要调用阻止与否)。
  • 实际上,我刚刚发现了另一个问题,我很想知道您会建议如何解决它。 (除此之外,我真的很喜欢你在这里拥有的东西,并且我更喜欢它而不是我的实现,因为它绝对更简单、更干净。)问题是我希望链中的一个方法只重复自己一次 如果它在运行时被多次调用。
  • 例如在这个Pipeline类中,如果我在第一个动作执行时调用ThreadPool.QueueUserWorkItem(s =&gt; pipeline.Start());三次,它会在完成后再执行三次(这很容易导致队列增长到一个大得离谱的大小,取决于方法调用的频率)。我希望它只执行 一个 时间。想法?
  • @Dan:这对我来说似乎是一种随意的要求。为什么一个阶段在忙碌时需要恰好排队一次,而不是零次?后者要容易得多。如果队列真的有可能像这样失控,我可能只会使用Semaphore 限制对管道的访问,这样只有有限数量的线程可以同时使用它。
  • 哈,零确实会容易得多。但事情是这样的:这是用于处理市场数据和运行定价算法。从这个角度考虑时,要求并不是真正的任意。假设定价模型当前正在运行,并且一千个市场报价同时出现。这意味着新数据已经到来,我们需要再次运行模型以根据最新数据生成最新的理论价格。我们不想简单地再次运行模型,因为那样我们的 theo 就会过时。另一方面,没有必要再运行一千次!
猜你喜欢
  • 2022-07-28
  • 2021-11-23
  • 2010-09-24
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2010-11-09
  • 2013-04-10
  • 2011-10-16
相关资源
最近更新 更多