【问题标题】:Lazy single-threaded worker懒惰的单线程工作者
【发布时间】:2013-11-30 12:16:53
【问题描述】:

我有一个线程以小块读取数据并将其放入ImmutableList<T>。我需要将此列表转储到文件中。最初我在每次更新时都转储列表,但随着时间列表的增长,文件大小现在接近 200Mb,因此写入文件需要花费太多时间来保持同步。 我已经使文件写入异步,我目前拥有的代码是:

public ImmutableList<T> Items { get; private set; }

public void Insert(IEnumerable<T> items)
{
    lock (_syncObj)
    {
        Items = Items.AddRange(items).Sort(_sortOrder);
        QueueSaving();
    }
}

void SavingThread()
{
    for (; ; )
    {
        var snapshot = Items;
        SaveItems(snapshot);

        lock (_syncObj)
        {
            if (snapshot == Items)
                Monitor.Wait(_syncObj);
        }
    }
}

void QueueSaving()
{
    lock (_syncObj)
        Monitor.Pulse(_syncObj);
}

即如果自上次写入以来有很多更新,那么 writer 只会保存最后一个版本。 显然,我现在必须有一个专门用于保存的线程,并且在没有更新时休眠。

避免使用专用编写器线程的最简洁的代码是什么? IE。运行 QueueSaving 直到文件编写器赶上列表更改并完成,然后在下一次更改时启动它?

【问题讨论】:

  • 你每次都写整个文件吗?如果是这样,如果写的时间太长,你可以只写自上次写以来列表中的新项目。
  • 谢谢,我也考虑过使用分片的类似方法。数据被序列化为 json 数组,因此有一个右括号,我不能只是简单地附加记录,因为它会使 json 格式无效。我想让文件写入逻辑尽可能简单,并找到使用异步写入器的解决方案。
  • @KonstantinSpirin,它可以帮助您重新思考您的数据模型和工作流程。目前您的文件是 200MB,但如果增长到 2GB 怎么办?
  • 当然不是这样,写过时的数据几乎是不可避免的。您的 Monitor.Pulse() 在忙于写入时会被完全忽略。也赶不上。您只会巧合地在文件数据和实际列表之间进行匹配。如果你改变列表的速度比它写的快,那么这个值就很低了。
  • 您假设 SavingThread 进入了锁。它没有,SaveItems 正忙于保存以前的列表并且没有锁定。所以 QueueSaving 获取锁没有问题,Pulse 完全没有效果,修改后的列表永远不会被保存。这是一个错误。

标签: .net multithreading task-parallel-library


【解决方案1】:

据我从 cmets 了解到,您只是希望将逻辑转换为异步代码。以下是在没有显式单独线程的情况下如何完成此操作(除了在整个过程中使用Task.Run)。

using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    public class Worker
    {
        class Item 
        {
            public string Data { get; set; }
        }

        const int SAVE_AFTER = 2;

        string _fileName;
        List<Item> _items;
        int _savedItemsCount = 0;

        CancellationToken _token;
        Task _processTask;

        Task _pendingSaveTask = null;

        // get next item
        async Task<Item> GetNextItemAsync()
        {
            await Task.Delay(500); // delay for testing
            return new Item { Data = "Item from " + DateTime.Now.ToString() };
        }

        // write
        async Task SaveItemsAsync(Item[] items)
        {
            if (_pendingSaveTask != null)
                await _pendingSaveTask; // await the previous save

            var text = items.Aggregate(String.Empty, (a, b) => a + b.Data + Environment.NewLine);

            using (var writer = new System.IO.StreamWriter(_fileName, append: false))
            {
                await writer.WriteAsync(text);
            }
        }

        // main process
        async Task ProcessAsync()
        {
            while (true)
            {
                _token.ThrowIfCancellationRequested();

                // start getting the next item
                var getNextItemTask = GetNextItemAsync();

                // save the snapshot if needed
                if (_items.Count >= _savedItemsCount + SAVE_AFTER)
                {
                    var snapshot = _items.ToArray();
                    _savedItemsCount = snapshot.Length;
                    _pendingSaveTask = SaveItemsAsync(snapshot);
                }

                // await the next item
                var item = await getNextItemTask;
                _items.Add(item);
            }
        }

        // start
        public void Start(CancellationToken token)
        {
            _token = token;
            _fileName = System.IO.Path.GetTempFileName();
            _items = new List<Item>();

            _processTask = Task.Run(new Func<Task>(ProcessAsync), _token);
        }

        // stop
        public void Stop()
        {
            if (_pendingSaveTask != null)
                _pendingSaveTask.Wait();

            try
            {
                _processTask.Wait(); // wait for the task to complete
            }
            catch (Exception ex)
            {
                // rethrow if anything but OperationCanceledException
                if (!(ex is OperationCanceledException))
                {
                    var aggEx = ex as AggregateException;
                    if (aggEx == null || !(aggEx.InnerException is OperationCanceledException))
                        throw;
                }
            }
        }
    }

    class Program
    {
        public static void Main()
        {
            var cts = new CancellationTokenSource();
            var worker = new Worker();

            Console.WriteLine("Start process");
            worker.Start(cts.Token);

            Thread.Sleep(10000);

            Console.WriteLine("Stop process");
            cts.Cancel();
            worker.Stop();

            Console.WriteLine("Press Enter to exit...");
            Console.ReadLine();
        }
    }
}

请注意,如果新项目 (GetNextItemAsync) 的到达速度快于 SaveItemsAsync 完成保存最后一个快照的速度,则此实现可能会导致待处理的 SaveItemsAsync 调用链不断增长。如果这是一个问题,您可以通过将SaveItemsAsync 任务限制为仅一个待处理实例并使用BlockingCollection 对新快照进行排队来处理它。

[UPDATE] 这是一个稍微改进的版本,如果更新比保存更快,它会消除冗余写入。它不使用BlockingCollection,而是向SaveItemsAsync 添加了一些额外的取消逻辑。这是一个控制台应用程序,请随意尝试看看发生了什么。尝试连续拨打几次_saveTask = SaveItemsAsync(snapshot)

using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;

namespace ConsoleApplication
{
    public class Worker
    {
        class Item 
        {
            public string Data { get; set; }
        }

        const int SAVE_AFTER = 2;

        string _fileName;
        List<Item> _items;
        int _savedItemsCount = 0;

        CancellationToken _token;
        Task _processTask;

        Task _saveTask;
        CancellationTokenSource _saveTaskCts;

        // get next item
        async Task<Item> GetNextItemAsync()
        {
            Console.WriteLine("Enter GetNextItemAsync, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);

            await Task.Delay(500); // delay for testing
            return new Item { Data = "Item from " + DateTime.Now.ToString() };
        }

        // save items
        async Task SaveItemsAsync(Item[] items)
        {
            // avoid multiple pending SaveItemsAsync tasks
            Console.WriteLine("Enter SaveItemsAsync, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);

            var oldSaveTaskCts = _saveTaskCts;
            var oldSaveTask = _saveTask;

            var thisSaveTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_token);

            _saveTaskCts = thisSaveTaskCts;
            _saveTask = null;

            // cancel the previous pending SaveItemsAsync, if any
            if (oldSaveTaskCts != null) 
            {
                oldSaveTaskCts.Cancel();
                if (oldSaveTask != null)
                    await oldSaveTask.WaitObservingCancellationAsync();
            }

            // another SaveItemsAsync call should lead to cancelling this one
            thisSaveTaskCts.Token.ThrowIfCancellationRequested();

            // execute the save logic on a pool thread, 
            // Task.Run automatically unwraps the nested Task<Task>
            await Task.Run(async () => 
            {
                // do the CPU-bound work: create textual representation of data
                var text = items.Aggregate(String.Empty, (agg, item) => agg + item.Data + Environment.NewLine);

                // write asynchronously
                Console.WriteLine("Write, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);

                // StreamWriter doesn't support cancellation, so do it in two stages with MemoryStream
                using (var memoryStream = new MemoryStream())
                {
                    // write to a memory stream first
                    using (var writer = new StreamWriter(
                        memoryStream,
                        encoding: System.Text.Encoding.UTF8,
                        bufferSize: Environment.SystemPageSize,
                        leaveOpen: true))
                    {
                        await writer.WriteAsync(text);
                    }

                    thisSaveTaskCts.Token.ThrowIfCancellationRequested();

                    // copy the memory stream to the file
                    using (var fileStream = new FileStream(_fileName, FileMode.Create, FileAccess.Write))
                    {
                        // copy with possible cancellation
                        memoryStream.Seek(0, SeekOrigin.Begin);
                        await memoryStream.CopyToAsync(fileStream, Environment.SystemPageSize, thisSaveTaskCts.Token);
                    }
                }
            }, thisSaveTaskCts.Token);
        }

        // main process
        async Task ProcessAsync()
        {
            while (true)
            {
                // handle cancellation
                if (_token.IsCancellationRequested)
                {
                    // await the pending save if any, before throwing
                    if (_saveTask != null)
                        await _saveTask.WaitObservingCancellationAsync();
                    _token.ThrowIfCancellationRequested();
                }

                // handle last save errors if any
                if (_saveTask != null && _saveTask.IsFaulted)
                    await _saveTask.WaitObservingCancellationAsync();

                // start getting the next item
                var getNextItemTask = GetNextItemAsync();

                // save the snapshot if needed
                if (_items.Count >= _savedItemsCount + SAVE_AFTER)
                {
                    var snapshot = _items.ToArray();
                    _savedItemsCount = snapshot.Length;
                    _saveTask = SaveItemsAsync(snapshot);
                }

                // await the next item
                var item = await getNextItemTask;
                _items.Add(item);
            }
        }

        // start
        public void Start(CancellationToken token)
        {
            _token = token;
            _fileName = System.IO.Path.GetTempFileName();
            _items = new List<Item>();

            _processTask = Task.Run(new Func<Task>(ProcessAsync), _token);
        }

        // stop
        public void Stop()
        {
            _processTask.WaitObservingCancellation();
        }
    }

    // Main
    class Program
    {
        public static void Main()
        {
            var cts = new CancellationTokenSource();
            var worker = new Worker();

            Console.WriteLine("Start process");
            worker.Start(cts.Token);

            Thread.Sleep(10000);

            Console.WriteLine("Stop process");
            cts.Cancel();
            worker.Stop();

            Console.WriteLine("Press Enter to exit...");
            Console.ReadLine();
        }
    }

    // Useful extensions
    public static class Extras
    {
        // check if exception is OperationCanceledException
        public static bool IsOperationCanceledException(this Exception ex)
        {
            if (ex is OperationCanceledException)
                return true;

            var aggEx = ex as AggregateException;
            return aggEx != null && aggEx.InnerException is OperationCanceledException;
        }

        public static async Task WaitObservingCancellationAsync(this Task task)
        {
            try
            {
                await task; // await the task to complete
            }
            catch (Exception ex)
            {
                // rethrow if anything but OperationCanceledException
                if (!ex.IsOperationCanceledException())
                    throw;
            }
        }

        // a helper to wait for the task to complete and observe exceptions
        public static void WaitObservingCancellation(this Task task)
        {
            try
            {
                task.Wait(); // wait for the task to complete
            }
            catch (Exception ex)
            {
                // rethrow if anything but OperationCanceledException
                if (!ex.IsOperationCanceledException())
                    throw;
            }
        }
    }
}

【讨论】:

  • List.ToArray 需要锁:stackoverflow.com/questions/14946044/…
  • 我们真的需要有一系列待处理的SaveItemsAsync 任务吗?似乎如果更新的速度比保存任务完成的速度快,那么您的代码实际上会尝试从多个线程写入文件,并且充其量只会出现 IO 异常。
  • @KonstantinSpirin,我不熟悉您的应用程序的其余部分,但在此实现中,_items.ToArray() 需要锁如果_items 项目在外部访问Worker 对象,例如在Main.
  • 我还发布了一个更新版本,以处理更新速度快于保存速度的情况。
  • @KonstantinSpirin,现在保存是同步的 - 绝对不是。 SaveItemsAsync 是一个后台异步任务,与 GetNextItemAsync 并行工作。你试过代码了吗?
【解决方案2】:

当列表中未保存项目的计数超过阈值时,启动写入器Task。您可以将该逻辑放入现有锁下的Insert。这样一来,写线程/Task 仅在有工作要做时才存在。

我最近实现了一个类似的东西,我使用Timer 来按某个时间表启动持久性工作。

【讨论】:

  • 保证您在任何时候最多只能运行一个作家的最佳方式是什么?根据您使用的Timer,您可能已经获得了开箱即用的保证,但任务是在线程池上执行的,因此我最终可能会同时运行多个。
  • 是的,我会使用在锁下访问的 bool 实例变量。如果已设置,则任务正在运行,您只需退出。这也可以通过 Interlocked 完成,但在这种低吞吐量的情况下不要聪明。
  • 我的想法差不多,谢谢分享。
【解决方案3】:

这是我最后写的:

public ImmutableList<T> Items { get; private set; }

public void Insert(IEnumerable<T> items)
{
    lock (_syncObj)
    {
        Items = Items.AddRange(items).Sort(_sortOrder);
        StartSaving();
    }
}

Task _activeSavingTask;

void SavingThread()
{
    for (;;)
    {
        var snapshot = Items;
        SaveItems(snapshot);

        lock (_syncObj)
        {
            if (snapshot == Items)
            {
                _activeSavingTask = null;
                return;
            }
        }
    }
}

void StartSaving()
{
    lock (_syncObj)
        if (_activeSavingTask == null)
            _activeSavingTask = Task.Factory
                .StartNew(SavingThread, TaskCreationOptions.LongRunning);
}

【讨论】:

  • 在没有正确锁定的情况下从两个线程访问“Items”属性可能会导致问题
  • 它会导致什么样的问题? Items 是公开只读的,所有写入都在Insert 内部的关键部分完成,根据stackoverflow.com/questions/2192124/… 保证引用类型分配是原子的,所以我自己想不出任何潜在的问题。
  • 读\写都应该受到锁(或显式内存屏障)的保护。否则读取它的线程可能会从 CPU 缓存中获取陈旧值,或者指令可能会被重新排序并导致难以预测的结果
  • 同意。关于该主题的好文章:albahari.com/threading/part4.aspx#%5FNonBlockingSynch,尤其令人印象深刻的是“我们真的需要锁和障碍吗?”部分。
【解决方案4】:
class AsyncSaver<T> where T : class
{
    private readonly object _lock = new object();
    private readonly Func<T, Task> _save;
    private T _item;
    private bool _running;

    public AsyncSaver(Func<T, Task> save)
    {
        _save = save;
    }

    public void Enqueue(T item)
    {
        lock (_lock)
        {
            if (_running)
            {
                _item = item;
            }
            else
            {
                _running = true;
                Save(item);
            }
        }
    }

    private async void Save(T item)
    {
        await _save(item);

        lock (_lock)
        {
            if (_item != null)
            {
                var nextItem = _item;
                _item = null;
                Save(nextItem);
            }
            else
            {
                _running = false;
            }
        }
    }
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2015-04-02
    • 2013-02-22
    • 1970-01-01
    • 2022-12-25
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多