根据评论中的说明，我了解到您只是希望将这段逻辑转换为异步代码。下面演示了如何在不显式创建单独线程的情况下做到这一点（只是整个处理过程是通过 Task.Run 启动的）。
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
/// <summary>
/// Collects items in a background loop and periodically writes a snapshot of
/// all collected items to a temporary file, overlapping the save with the
/// retrieval of the next item.
/// </summary>
public class Worker
{
    /// <summary>A single collected item.</summary>
    class Item
    {
        public string Data { get; set; }
    }

    // Number of newly collected items that triggers another snapshot save.
    const int SAVE_AFTER = 2;

    string _fileName;
    List<Item> _items;
    int _savedItemsCount = 0;
    CancellationToken _token;
    Task _processTask;
    Task _pendingSaveTask = null;

    /// <summary>Produces the next item (simulated with a 500 ms delay).</summary>
    async Task<Item> GetNextItemAsync()
    {
        await Task.Delay(500); // delay for testing
        return new Item { Data = "Item from " + DateTime.Now.ToString() };
    }

    /// <summary>
    /// Overwrites the output file with one line per item, after the previously
    /// started save (if any) has finished.
    /// </summary>
    /// <param name="items">Snapshot of the items to write.</param>
    async Task SaveItemsAsync(Item[] items)
    {
        // This runs synchronously up to the first await, i.e. before the caller
        // reassigns _pendingSaveTask, so the field still holds the previous save.
        var previousSave = _pendingSaveTask;
        if (previousSave != null)
        {
            await previousSave; // serialize saves; a failed previous save fails this one too
        }

        // string.Concat builds the text in one pass instead of the quadratic
        // repeated concatenation of Aggregate.
        var text = string.Concat(items.Select(item => item.Data + Environment.NewLine));

        using (var writer = new System.IO.StreamWriter(_fileName, append: false))
        {
            await writer.WriteAsync(text);
        }
    }

    /// <summary>
    /// Main loop: fetches items until cancellation is requested, starting a
    /// snapshot save whenever at least SAVE_AFTER unsaved items have accumulated.
    /// </summary>
    async Task ProcessAsync()
    {
        while (true)
        {
            _token.ThrowIfCancellationRequested();

            // start getting the next item
            var getNextItemTask = GetNextItemAsync();

            // save the snapshot if needed; the save is not awaited here so it
            // overlaps with fetching the next item
            if (_items.Count >= _savedItemsCount + SAVE_AFTER)
            {
                var snapshot = _items.ToArray();
                _savedItemsCount = snapshot.Length;
                _pendingSaveTask = SaveItemsAsync(snapshot);
            }

            // await the next item
            var item = await getNextItemTask;
            _items.Add(item);
        }
    }

    /// <summary>Starts the processing loop on the thread pool.</summary>
    /// <param name="token">Token used to request that the loop stops.</param>
    public void Start(CancellationToken token)
    {
        _token = token;
        _fileName = System.IO.Path.GetTempFileName();
        _items = new List<Item>();
        _processTask = Task.Run(new Func<Task>(ProcessAsync), _token);
    }

    /// <summary>
    /// Blocks until the processing loop has ended and the last started save has
    /// completed. Cancellation of the loop is swallowed; any other failure of
    /// the loop, or a failure of the last save, is rethrown.
    /// </summary>
    public void Stop()
    {
        if (_processTask == null)
        {
            return; // Start was never called; nothing to wait for
        }

        // Wait for the loop first: only once it has ended is _pendingSaveTask
        // guaranteed to be the final save. Reading it earlier could miss a save
        // the loop starts afterwards.
        try
        {
            _processTask.Wait(); // wait for the task to complete
        }
        catch (Exception ex)
        {
            // rethrow if anything but OperationCanceledException
            if (!(ex is OperationCanceledException))
            {
                var aggEx = ex as AggregateException;
                if (aggEx == null || !(aggEx.InnerException is OperationCanceledException))
                {
                    throw;
                }
            }
        }

        if (_pendingSaveTask != null)
        {
            _pendingSaveTask.Wait();
        }
    }
}
/// <summary>
/// Console entry point: runs a Worker for ten seconds, then cancels it and
/// waits for it to stop.
/// </summary>
class Program
{
    public static void Main()
    {
        var cancellation = new CancellationTokenSource();
        var backgroundWorker = new Worker();

        Console.WriteLine("Start process");
        backgroundWorker.Start(cancellation.Token);

        // Let the worker run for ten seconds before requesting cancellation.
        Thread.Sleep(TimeSpan.FromSeconds(10));

        Console.WriteLine("Stop process");
        cancellation.Cancel();
        backgroundWorker.Stop();

        Console.WriteLine("Press Enter to exit...");
        Console.ReadLine();
    }
}
}
请注意，如果新项目（GetNextItemAsync）到达的速度快于 SaveItemsAsync 保存上一个快照的速度，此实现可能会导致待处理的 SaveItemsAsync 调用链不断增长。如果这会成为问题，您可以将 SaveItemsAsync 任务限制为只有一个待处理实例，并使用 BlockingCollection 对新快照进行排队。
[UPDATE] 下面是一个稍作改进的版本：当更新比保存更快时，它会消除冗余的写入。它没有使用 BlockingCollection，而是在 SaveItemsAsync 中加入了一些额外的取消逻辑。这是一个控制台应用程序，您可以随意运行它来观察其行为。例如，可以尝试连续多次调用 _saveTask = SaveItemsAsync(snapshot)。
using System;
using System.Collections.Generic;
using System.IO;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
namespace ConsoleApplication
{
/// <summary>
/// Collects items in a background loop and periodically writes a snapshot of
/// all collected items to a temporary file. Starting a new save cancels the
/// previous one if it is still pending, so at most one save runs at a time.
/// </summary>
public class Worker
{
    /// <summary>A single collected item.</summary>
    class Item
    {
        public string Data { get; set; }
    }

    // Number of newly collected items that triggers another snapshot save.
    const int SAVE_AFTER = 2;

    string _fileName;
    List<Item> _items;
    int _savedItemsCount = 0;
    CancellationToken _token;
    Task _processTask;
    Task _saveTask;
    CancellationTokenSource _saveTaskCts;

    /// <summary>Produces the next item (simulated with a 500 ms delay).</summary>
    async Task<Item> GetNextItemAsync()
    {
        Console.WriteLine("Enter GetNextItemAsync, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);
        await Task.Delay(500); // delay for testing
        return new Item { Data = "Item from " + DateTime.Now.ToString() };
    }

    /// <summary>
    /// Writes the snapshot to the output file, first cancelling and awaiting
    /// the previous save if one is still pending.
    /// </summary>
    /// <param name="items">Snapshot of the items to write.</param>
    async Task SaveItemsAsync(Item[] items)
    {
        // avoid multiple pending SaveItemsAsync tasks
        Console.WriteLine("Enter SaveItemsAsync, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);

        // Everything up to the first await runs synchronously in the caller,
        // so the fields still describe the previous save at this point.
        var oldSaveTaskCts = _saveTaskCts;
        var oldSaveTask = _saveTask;

        // Linked to _token so that stopping the worker cancels this save too.
        var thisSaveTaskCts = CancellationTokenSource.CreateLinkedTokenSource(_token);
        _saveTaskCts = thisSaveTaskCts;
        _saveTask = null; // the caller assigns the task returned by this method

        // cancel the previous pending SaveItemsAsync, if any
        if (oldSaveTaskCts != null)
        {
            oldSaveTaskCts.Cancel();
            if (oldSaveTask != null)
            {
                try
                {
                    await oldSaveTask.WaitObservingCancellationAsync();
                }
                finally
                {
                    // The previous save has finished and nothing else references
                    // its source; disposing releases its registration on _token.
                    oldSaveTaskCts.Dispose();
                }
            }
        }

        // another SaveItemsAsync call should lead to cancelling this one
        thisSaveTaskCts.Token.ThrowIfCancellationRequested();

        // execute the save logic on a pool thread,
        // Task.Run automatically unwraps the nested Task<Task>
        await Task.Run(async () =>
        {
            // do the CPU-bound work: create textual representation of data
            // (single-pass concat instead of quadratic repeated concatenation)
            var text = string.Concat(items.Select(item => item.Data + Environment.NewLine));

            // write asynchronously
            Console.WriteLine("Write, thread: " + System.Threading.Thread.CurrentThread.ManagedThreadId);

            // StreamWriter doesn't support cancellation, so do it in two stages with MemoryStream
            using (var memoryStream = new MemoryStream())
            {
                // write to a memory stream first
                using (var writer = new StreamWriter(
                    memoryStream,
                    encoding: System.Text.Encoding.UTF8,
                    bufferSize: Environment.SystemPageSize,
                    leaveOpen: true))
                {
                    await writer.WriteAsync(text);
                }

                thisSaveTaskCts.Token.ThrowIfCancellationRequested();

                // copy the memory stream to the file; useAsync requests real
                // asynchronous file I/O for CopyToAsync, the other arguments
                // match the defaults of the shorter constructor
                using (var fileStream = new FileStream(
                    _fileName,
                    FileMode.Create,
                    FileAccess.Write,
                    FileShare.Read,
                    bufferSize: 4096,
                    useAsync: true))
                {
                    // copy with possible cancellation
                    memoryStream.Seek(0, SeekOrigin.Begin);
                    await memoryStream.CopyToAsync(fileStream, Environment.SystemPageSize, thisSaveTaskCts.Token);
                }
            }
        }, thisSaveTaskCts.Token);
    }

    /// <summary>
    /// Main loop: fetches items until cancellation is requested, starting a
    /// snapshot save whenever at least SAVE_AFTER unsaved items have accumulated.
    /// </summary>
    async Task ProcessAsync()
    {
        while (true)
        {
            // handle cancellation
            if (_token.IsCancellationRequested)
            {
                // await the pending save if any, before throwing
                if (_saveTask != null)
                {
                    await _saveTask.WaitObservingCancellationAsync();
                }
                _token.ThrowIfCancellationRequested();
            }

            // handle last save errors if any: awaiting a faulted save rethrows
            // its exception and so ends this loop with that failure
            if (_saveTask != null && _saveTask.IsFaulted)
            {
                await _saveTask.WaitObservingCancellationAsync();
            }

            // start getting the next item
            var getNextItemTask = GetNextItemAsync();

            // save the snapshot if needed
            if (_items.Count >= _savedItemsCount + SAVE_AFTER)
            {
                var snapshot = _items.ToArray();
                _savedItemsCount = snapshot.Length;
                _saveTask = SaveItemsAsync(snapshot);
            }

            // await the next item
            var item = await getNextItemTask;
            _items.Add(item);
        }
    }

    /// <summary>Starts the processing loop on the thread pool.</summary>
    /// <param name="token">Token used to request that the loop stops.</param>
    public void Start(CancellationToken token)
    {
        _token = token;
        _fileName = System.IO.Path.GetTempFileName();
        _items = new List<Item>();
        _processTask = Task.Run(new Func<Task>(ProcessAsync), _token);
    }

    /// <summary>
    /// Blocks until the processing loop has ended. Cancellation is swallowed;
    /// any other failure is rethrown.
    /// </summary>
    public void Stop()
    {
        if (_processTask == null)
        {
            return; // Start was never called; nothing to wait for
        }

        _processTask.WaitObservingCancellation();
    }
}
// Main
/// <summary>
/// Console entry point: starts a Worker, lets it run for ten seconds, then
/// cancels it and waits for it to finish.
/// </summary>
class Program
{
    public static void Main()
    {
        var stopSource = new CancellationTokenSource();
        var itemWorker = new Worker();

        Console.WriteLine("Start process");
        itemWorker.Start(stopSource.Token);

        // Give the worker ten seconds of run time before asking it to stop.
        var runTime = TimeSpan.FromMilliseconds(10000);
        Thread.Sleep(runTime);

        Console.WriteLine("Stop process");
        stopSource.Cancel();
        itemWorker.Stop();

        Console.WriteLine("Press Enter to exit...");
        Console.ReadLine();
    }
}
// Useful extensions
/// <summary>
/// Helpers for waiting on tasks while treating cancellation as a normal outcome.
/// </summary>
public static class Extras
{
    /// <summary>
    /// Tells whether <paramref name="ex"/> represents cancellation: either it is
    /// an OperationCanceledException itself, or it is an AggregateException
    /// whose InnerException is one.
    /// </summary>
    public static bool IsOperationCanceledException(this Exception ex)
    {
        var aggregate = ex as AggregateException;
        return ex is OperationCanceledException
            || (aggregate != null && aggregate.InnerException is OperationCanceledException);
    }

    /// <summary>
    /// Awaits <paramref name="task"/>, swallowing cancellation and rethrowing
    /// any other exception.
    /// </summary>
    public static async Task WaitObservingCancellationAsync(this Task task)
    {
        try
        {
            await task;
        }
        catch (Exception error)
        {
            if (error.IsOperationCanceledException())
            {
                return; // cancellation is an expected way for the task to end
            }
            throw;
        }
    }

    /// <summary>
    /// Blocks until <paramref name="task"/> completes, swallowing cancellation
    /// and rethrowing any other exception.
    /// </summary>
    public static void WaitObservingCancellation(this Task task)
    {
        try
        {
            task.Wait();
        }
        catch (Exception error)
        {
            if (error.IsOperationCanceledException())
            {
                return; // cancellation is an expected way for the task to end
            }
            throw;
        }
    }
}
}