【问题标题】:Forcing certain code to always run on the same thread强制某些代码始终在同一个线程上运行
【发布时间】:2020-08-15 05:50:31
【问题描述】:

我们有一个旧的第 3 方系统(我们称之为 Junksoft® 95),我们通过 PowerShell 进行交互(它公开了一个 COM 对象),我正在将它包装在一个 REST API(ASP.NET 框架4.8 和 WebAPI 2)。我使用System.Management.Automation nuget 包创建了一个PowerShell,在其中我将Junksoft 的COM API 实例化为dynamic 对象,然后我使用它:

//I'm omitting some exception handling and maintenance code for brevity
powerShell = System.Management.Automation.PowerShell.Create();
powerShell.AddScript("Add-Type -Path C:\Path\To\Junksoft\Scripting.dll");
powerShell.AddScript("New-Object Com.Junksoft.Scripting.ScriptingObject");
dynamic junksoftAPI = powerShell.Invoke()[0];

//Now we issue commands to junksoftAPI like this:
junksoftAPI.Login(user,pass);
int age = junksoftAPI.GetAgeByCustomerId(custId);
List<string> names = junksoftAPI.GetNames();

当我在同一个线程上运行所有这些时(例如,在控制台应用程序中),这工作正常。但是,由于某种原因,当我将junksoftAPI 放入System.Web.Caching.Cache 并从我的网络应用程序中的不同控制器中使用它时,这通常不起作用。我说 ususally 因为当 ASP.NET 碰巧将传入调用传递给创建 junksoftAPI 的线程时,这实际上是有效的。如果没有,Junksoft 95 会给我一个错误。

我有什么办法可以确保与junksoftAPI 的所有交互都发生在同一个线程上?

注意,我不想将整个 Web 应用程序变成单线程应用程序!控制器和其他地方的逻辑应该在不同的线程上正常发生。应该只是 Junksoft 特定线程上发生的 Junksoft 交互,如下所示:

[HttpGet]
public IHttpActionResult GetAge(...)
{
    //finding customer ID in database...

    ...

    int custAge = await Task.Run(() => {
        //this should happen on the Junksoft-specific thread and not the next available thread
        var cache = new System.Web.Caching.Cache();
        var junksoftAPI = cache.Get(...); //This has previously been added to cache on the Junksoft-specific thread
        return junksoftAPI.GetAgeByCustomerId(custId);
    });

    //prepare a response using custAge...
}

【问题讨论】:

  • 这种行为听起来很像我们为 GUI 类强制执行的规则。也许他们的解决方案也有效? stackoverflow.com/a/14703806/3346583
  • 对于 COM,您可能需要抽水。因此,您需要一个单独的 STA 线程来创建它并编组对其的所有调用。
  • 你说的是junksoft,但这对于System.Printing.PrintQueue之类的东西也很有用

标签: c# asp.net multithreading powershell asp.net-web-api2


【解决方案1】:

您可以创建自己的单例工作线程来实现这一点。这是您可以将其插入 Web 应用程序的代码。

public class JunkSoftRunner
{
    private static JunkSoftRunner _instance;

    //singleton pattern to restrict all the actions to be executed on a single thread only.
    public static JunkSoftRunner Instance => _instance ?? (_instance = new JunkSoftRunner());

    private readonly SemaphoreSlim _semaphore;
    private readonly AutoResetEvent _newTaskRunSignal;

    private TaskCompletionSource<object> _taskCompletionSource;
    private Func<object> _func;

    private JunkSoftRunner()
    {
        _semaphore = new SemaphoreSlim(1, 1);
        _newTaskRunSignal = new AutoResetEvent(false);
        var contextThread = new Thread(ThreadLooper)
        {
            Priority = ThreadPriority.Highest
        };
        contextThread.Start();
    }

    private void ThreadLooper()
    {
        while (true)
        {
            //wait till the next task signal is received.
            _newTaskRunSignal.WaitOne();

            //next task execution signal is received.
            try
            {
                //try execute the task and get the result
                var result = _func.Invoke();

                //task executed successfully, set the result
                _taskCompletionSource.SetResult(result);
            }
            catch (Exception ex)
            {
                //task execution threw an exception, set the exception and continue with the looper
                _taskCompletionSource.SetException(ex);
            }

        }
    }

    public async Task<TResult> Run<TResult>(Func<TResult> func, CancellationToken cancellationToken = default(CancellationToken))
    {
        //allows only one thread to run at a time.
        await _semaphore.WaitAsync(cancellationToken);

        //thread has acquired the semaphore and entered
        try
        {
            //create new task completion source to wait for func to get executed on the context thread
            _taskCompletionSource = new TaskCompletionSource<object>();

            //set the function to be executed by the context thread
            _func = () => func();

            //signal the waiting context thread that it is time to execute the task
            _newTaskRunSignal.Set();

            //wait and return the result till the task execution is finished on the context/looper thread.
            return (TResult)await _taskCompletionSource.Task;
        }
        finally
        {
            //release the semaphore to allow other threads to acquire it.
            _semaphore.Release();
        }
    }
}

用于测试的控制台主要方法:

public class Program
{
    //testing the junk soft runner
    public static void Main()
    {
        //get the singleton instance
        var softRunner = JunkSoftRunner.Instance;

        //simulate web request on different threads
        for (var i = 0; i < 10; i++)
        {
            var taskIndex = i;
            //launch a web request on a new thread.
            Task.Run(async () =>
            {
                Console.WriteLine($"Task{taskIndex} (ThreadID:'{Thread.CurrentThread.ManagedThreadId})' Launched");
                return await softRunner.Run(() =>
                {
                    Console.WriteLine($"->Task{taskIndex} Completed On '{Thread.CurrentThread.ManagedThreadId}' thread.");
                    return taskIndex;
                });
            });
        }
    }   
}

输出:

请注意,尽管该函数是从不同的线程启动的,但某些代码部分始终总是在 ID 为“5”的同一上下文线程上执行。

但请注意,尽管所有 Web 请求都在独立线程上执行,但它们最终会等待一些任务在单例工作线程上执行。这最终会在您的 Web 应用程序中造成瓶颈。无论如何,这是您的设计限制。

【讨论】:

  • 是否有理由将contextThread 的线程优先级设置为最高?
  • @jbb 你可以设置任何你喜欢的优先级。我将它设置为最高,因为线程负责为所有其他线程执行部分代码,因此其他线程不必等待
  • 通过将 _semaphore.WaitAsync 移动到 try 块中,如果取消令牌,您可能会在未获取信号量的情况下释放信号量。这会导致不必要的并行。
  • @TheodorZoulias 不错的收获!我很困惑!可能这就是我第一次把它放在 try 块之外的原因
  • 很好的解决方案@zafar!我实现它略有不同,并删除了对异步调用的支持(我希望 WebAPI 调用始终以阻塞方式调用它)。
【解决方案2】:

您可以使用BlockingCollection 类从专用 STA 线程向 Junksoft API 发出命令:

public class JunksoftSTA : IDisposable
{
    private readonly BlockingCollection<Action<Lazy<dynamic>>> _pump;
    private readonly Thread _thread;

    public JunksoftSTA()
    {
        _pump = new BlockingCollection<Action<Lazy<dynamic>>>();
        _thread = new Thread(() =>
        {
            var lazyApi = new Lazy<dynamic>(() =>
            {
                var powerShell = System.Management.Automation.PowerShell.Create();
                powerShell.AddScript("Add-Type -Path C:\Path\To\Junksoft.dll");
                powerShell.AddScript("New-Object Com.Junksoft.ScriptingObject");
                dynamic junksoftAPI = powerShell.Invoke()[0];
                return junksoftAPI;
            });
            foreach (var action in _pump.GetConsumingEnumerable())
            {
                action(lazyApi);
            }
        });
        _thread.SetApartmentState(ApartmentState.STA);
        _thread.IsBackground = true;
        _thread.Start();
    }

    public Task<T> CallAsync<T>(Func<dynamic, T> function)
    {
        var tcs = new TaskCompletionSource<T>(
            TaskCreationOptions.RunContinuationsAsynchronously);
        _pump.Add(lazyApi =>
        {
            try
            {
                var result = function(lazyApi.Value);
                tcs.SetResult(result);
            }
            catch (Exception ex)
            {
                tcs.SetException(ex);
            }
        });
        return tcs.Task;
    }

    public Task CallAsync(Action<dynamic> action)
    {
        return CallAsync<object>(api => { action(api); return null; });
    }

    public void Dispose() => _pump.CompleteAdding();

    public void Join() => _thread.Join();
}

使用Lazy 类的目的是为了在动态对象的构造过程中通过将其传播给调用者来显示可能的异常。

...异常被缓存。也就是说,如果工厂方法在线程第一次尝试访问Lazy&lt;T&gt; 对象的Value 属性时抛出异常,那么每次后续尝试都会抛出相同的异常。

使用示例:

// A static field stored somewhere
public static readonly JunksoftSTA JunksoftStatic = new JunksoftSTA();

await JunksoftStatic.CallAsync(api => { api.Login("x", "y"); });
int age = await JunksoftStatic.CallAsync(api => api.GetAgeByCustomerId(custId));

如果您发现单个 STA 线程不足以及时处理所有请求,您可以添加更多 STA 线程,它们都运行相同的代码(private readonly Thread[] _threads; 等)。 BlockingCollection 类是线程安全的,可以被任意数量的线程同时使用。

【讨论】:

  • CallAsync 并不是纯异步的,因为_pump.Add 会同步阻塞调用线程
  • @zafar BlockingCollection.Add 仅在 BlockingCollection 初始化时指定了有限容量时才会阻塞,直到有空间可用于存储提供的项目。在我的示例中,我没有指定有界容量,尽管我可能应该指定,以防止在极端使用场景中缓冲区不受控制地增长。
  • 这个答案和zafar的答案都很好,答案解决了问题!我选择了 zafar 的答案,因为我觉得它更容易理解,但这个解决方案可能更适合你,这取决于你的经验!
  • @jbb 感谢 zafar 提供最容易理解的解决方案! :-)
【解决方案3】:

如果你没有说那是一个 3rd 方工具,我会认为它是一个 GUI 类。出于实际原因,让多个线程写入它们是一个非常糟糕的主意。 .NET 从2.0 onward 强制执行严格的“只有创建线程才能写入”规则。

一般的 WebServers,尤其是 ASP.Net 使用一个相当大的线程池。我们说的是每个核心有 10 到 100 个线程。这意味着很难将任何请求确定到特定线程。你不妨不试。

同样,查看 GUI 类可能是您最好的选择。您基本上可以创建一个线程,其唯一目的是模仿 GUI 的事件队列。普通 Windows 窗体应用程序的主/UI 线程负责创建每个 GUI 类实例。它通过轮询/处理事件队列来保持活动状态。它仅在通过事件队列收到取消命令时结束。调度只是将订单放入该队列,因此我们可以避免跨线程问题。

【讨论】:

    猜你喜欢
    • 2012-05-03
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2023-03-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多