【问题标题】:Processing intermittent IO jobs with TPL and/or BackgroundWorker使用 TPL 和/或 BackgroundWorker 处理间歇性 IO 作业
【发布时间】:2015-08-07 22:10:38
【问题描述】:

我有一个与一些串行和 USB 设备交互的 C# 应用程序。我正在尝试找到一种以并行方式控制这些设备的好方法,以便一次控制许多设备。我将通过用户输入和脚本命令向设备发起命令。

我目前正在尝试找出一种干净的方式来“收集”并与我的 UI 线程并行运行命令。我有多种表单,每个设备一个,每个都有一些可以发出命令的控件。

我目前看到的方式是,用户将单击一个按钮,该按钮将在表单中触发一个事件。另一个类让我们称之为CommandManager 将挂钩所有这些事件;每个事件都会传递必要的信息以形成发送到设备的命令。

当事件由CommandManager 处理时,它会形成命令并将其添加到名为DeviceCommandWorker 的子类BlockingCollection<Command> 中,该DeviceCommandWorker 在应用程序打开时启动。它所做的只是遍历包含Task.Factory.StartNew() 调用的代码块。

StartNew 块中TakeBlockingCollection 上被调用,等待输入命令。一旦命令出现在集合中,Take 就会返回,Task 会以愉快的方式消失。 BackgroundWorker 循环返回并重复该过程,直到它被取消。

// Event handler running on the UI Thread
public void ProcessCommand(DeviceCommand command)
{
    // I assume I cannot add to Commands (BlockingCollection) from here?
    DeviceCommandWorker.Instance.Commands.Add(command);
}

// ....
// BackgroundWorker started upon Application startup
// ...

public class DeviceCommandWorker : BackgroundWorker
{
    public static DeviceCommandWorker Instance { get { return lazyInstance.Value; } }
    private static readonly Lazy<DeviceCommandWorker> lazyInstance = new Lazy<DeviceCommandWorker>(() => new DeviceCommandWorker());

    public BlockingCollection<DeviceCommand> Commands { get; set; } 

    private DeviceCommandWorker()
    {
        WorkerSupportsCancellation = true;
        Commands = new BlockingCollection<DeviceCommand>();
    }

    protected override void OnDoWork(DoWorkEventArgs e)
    {
        while (!CancellationPending)
        {
            var commandTask = Task.Factory.StartNew(() =>
            {
                // If the BackGroundWorker is cancelled, how to esacpe this blocking call?
                DeviceCommand command = commandQueue.Take(); 

                DeviceCommandResult result;
                command.Process(out result);

                if(result == DeviceCommandResult.Error)
                    ; // How would I inform the UI of this result?
            });
        }
        e.Cancel = true;
    }
}

我的问题已在上面的代码中说明,但我会重申它们。

我的第一个问题是,我认为我不能从正在运行的BackGroundWorker 之外添加到BlockingCollection? (通过阅读有关 TPL 的信息,添加时不应该有一个我可以锁定的同步对象吗?)

假设我可以添加到集合中,当 Take 方法被阻塞时,有没有办法逃脱它,特别是如果 BackgroundWorker 被取消,它不会永远被阻塞吗? (可能在取消之前我可以发送一个“自定义命令”来简单地创建一个什么都不做的任务,然后我就可以退出 while 循环)

最后,我如何将命令执行的成功或错误报告回 UI 线程?我看到了这个SO answer,这是正确的方向吗?

【问题讨论】:

  • 使用TPL Dataflow
  • 我查看了 TPL 数据流。限制使我无法在任何合理的时间内拿到包裹。但是当我得到它时,我会尝试以这种方式重新设计一个解决方案,并确定我更喜欢哪个。 // 你能评论一下上面的解决方案吗?

标签: c# multithreading task-parallel-library


【解决方案1】:

所以经过一些修改后,看起来我上面的想法最初对我来说效果很好。我不确定当需要做一些真正的工作时它会如何表现,但到目前为止我很高兴。

所以我重新编写了代码。我发现如果没有TakeTask.Factory.StartNew() 之外,我实际上只是在尽可能快地完成任务,每个等待从BlockingCollection 消耗。所以我将语句移到循环之外。我还确认要打破阻塞Take,我需要发送某种特殊命令,以便我可以停止后台工作人员。最后(未显示)我计划使用Control.Invoke 将任何失败返回给 UI 线程。

public Boolean StartCommandWorker()
{
    if (DeviceCommandWorker.Instance.IsBusy)
        return false;
    else
    {
        Console.Out.WriteLine("Called start command worker!");
        DeviceCommandWorker.Instance.RunWorkerAsync();
        return DeviceCommandWorker.Instance.IsBusy;
    }
}

public void StopCommandWorker()
{
    Console.Out.WriteLine("Called stop command worker!");
    ProcessCommand("QUIT");
    DeviceCommandWorker.Instance.CancelAsync();
}

public Boolean ProcessCommand(String command)
{
    DeviceCommandWorker.Instance.Commands.Add(command);
    Console.Out.WriteLine("Enqueued command \"" + command + "\" ThreadID: " + Thread.CurrentThread.ManagedThreadId);

    return true;
}

internal class DeviceCommandWorker : BackgroundWorker
{
    public static DeviceCommandWorker Instance { get { return lazyInstance.Value; } }
    private static readonly Lazy<DeviceCommandWorker> lazyInstance = new Lazy<DeviceCommandWorker>(() => new DeviceCommandWorker());

    public BlockingCollection<String> Commands { get; set; }

    private DeviceCommandWorker()
    {
        WorkerSupportsCancellation = true;
        Commands = new BlockingCollection<String>();
    }

    protected override void OnDoWork(DoWorkEventArgs e)
    {
        Console.Out.WriteLine("Background Worker Started ThreadID: " + Thread.CurrentThread.ManagedThreadId);

        while (!CancellationPending)
        {
            Console.Out.WriteLine("Waiting for command on ThreadID: " + Thread.CurrentThread.ManagedThreadId);
            String aString = Commands.Take();

            var commandTask = Task.Factory.StartNew(() =>
            {
                Console.Out.WriteLine("   Dequeued command \"" + aString + "\" ThreadID: " + Thread.CurrentThread.ManagedThreadId);
                if (aString.Equals("QUIT"))
                    Console.Out.WriteLine("   Quit worker called: " + aString);
            });
        }

        Console.Out.WriteLine("Background Worker: Stopped!");
        e.Cancel = true;
    }
}

这是一些示例输出。我制作了一个小的 UI 表单,我可以使用它来启动、停止和发送命令。它只是发送一个随机的 double 作为命令。

Called start command worker!
Background Worker Started ThreadID: 17
Waiting for command on ThreadID: 17
Enqueued command "Hello" ThreadID: 10
Waiting for command on ThreadID: 17
   Dequeued command "Hello" ThreadID: 16
Enqueued command "0.0745" ThreadID: 10
Waiting for command on ThreadID: 17
   Dequeued command "0.0745" ThreadID: 12
Enqueued command "0.7043" ThreadID: 10
   Dequeued command "0.7043" ThreadID: 16
Waiting for command on ThreadID: 17
Called stop command worker!
Enqueued command "QUIT" ThreadID: 10
   Dequeued command "QUIT" ThreadID: 12
   Quit worker called: QUIT
Background Worker: Stopped!
Enqueued command "0.2646" ThreadID: 10
Enqueued command "0.1619" ThreadID: 10
Enqueued command "0.5570" ThreadID: 10
Enqueued command "0.4129" ThreadID: 10
Called start command worker!
Background Worker Started ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
Waiting for command on ThreadID: 12
   Dequeued command "0.2646" ThreadID: 16
   Dequeued command "0.1619" ThreadID: 16
   Dequeued command "0.5570" ThreadID: 16
   Dequeued command "0.4129" ThreadID: 16
Enqueued command "0.8368" ThreadID: 10
   Dequeued command "0.8368" ThreadID: 17
Waiting for command on ThreadID: 12
Called stop command worker!
Enqueued command "QUIT" ThreadID: 10
   Dequeued command "QUIT" ThreadID: 16
   Quit worker called: QUIT
Background Worker: Stopped!

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-09-12
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2012-04-10
    相关资源
    最近更新 更多