【发布时间】:2021-09-28 00:00:08
【问题描述】:
我正在使用 TPL 数据流显示视频,同时首先通过 TCP 将数据传递到板。我正在使用 CancellationTokenSource 来取消 Block 活动。但问题是,当我重新运行“CreateVideoProcessingNetwork”函数时,我没有响应。 .Post() 命令返回 false。我应该如何重新创建或“重新运行”TPL 数据流?这是代码:
private void TPL_Click(object sender, EventArgs e)
{
CreateVideoProcessingNetwork();
}
public async void CreateVideoProcessingNetwork()
{
string video_path = @"C:\.....\video_640x360_360p.mp4";
_canceller = new CancellationTokenSource();
/****************** METHOD 1 - with yield *************/
/* Video Loading TPL Block */
//var video_loader = new TransformManyBlock<string, Bitmap>(load_video, new ExecutionDataflowBlockOptions { BoundedCapacity = 10 });
var send_recv_block = new TransformBlock<Bitmap, Bitmap>(async recv_bitmap =>
{
Console.WriteLine("Inside send_recv block");
var mem_stream = new MemoryStream();
recv_bitmap.Save(mem_stream, System.Drawing.Imaging.ImageFormat.Jpeg);
var recv_image_array = mem_stream.ToArray();
NetworkStream stream = client.GetStream();
byte[] transmit_buffer = new byte[4];
transmit_buffer[0] = (byte)(recv_image_array.Length & (0xFF));
transmit_buffer[1] = (byte)((recv_image_array.Length >> 8) & (0xFF));
transmit_buffer[2] = (byte)((recv_image_array.Length >> 16) & (0xFF));
transmit_buffer[3] = (byte)((recv_image_array.Length >> 24) & (0xFF));
// Sending first the 32bit length
await stream.WriteAsync(transmit_buffer, 0, 4);
// Sending data
await stream.WriteAsync(recv_image_array, 0, recv_image_array.Length);
// Receiving data
var recv_buffer = await Receive(stream);
Bitmap tx_image_array;
using (var ms = new MemoryStream(recv_image_array))
{
tx_image_array = new Bitmap(ms);
}
return tx_image_array;
},
new ExecutionDataflowBlockOptions
{
//BoundedCapacity = 10,
CancellationToken = cancellationSource.Token
});
/****************** METHOD 2 - with send async ***********/
var video_loader = new ActionBlock<string>(async path =>
{
Console.WriteLine("video_loader");
capture = new VideoCapture(path);
Mat matrix = new Mat();
capture.Read(matrix);
var mem_stream = new MemoryStream();
while (matrix.Rows != 0 && matrix.Width != 0)
{
Console.WriteLine("Inside Loop");
capture.Read(matrix);
if (matrix.Rows == 0 && matrix.Width == 0) break;
Bitmap bitmap = new Bitmap(matrix.Width, matrix.Rows);
bitmap = matrix.ToBitmap();
await send_recv_block.SendAsync(bitmap);
await Task.Delay(20);
if (_canceller.Token.IsCancellationRequested) break;
}
}, new ExecutionDataflowBlockOptions
{
//BoundedCapacity = 10 ,
CancellationToken = cancellationSource.Token
});
/* Video Loading TPL Block */
var display_video = new ActionBlock<Bitmap>(async received_image =>
{
Console.WriteLine("Inside Display Video");
PicturePlot2.Image = received_image;
await Task.Delay(25);
},
new ExecutionDataflowBlockOptions()
{
TaskScheduler = TaskScheduler.FromCurrentSynchronizationContext(),
//BoundedCapacity = 10,
CancellationToken = cancellationSource.Token
});
var linkOptions = new DataflowLinkOptions { PropagateCompletion = true };
/****************** METHOD 2 - with send async *************/
Console.WriteLine("to Link");
var send_recv_disposable = send_recv_block.LinkTo(display_video, linkOptions);
Console.WriteLine("Video path" + video_path);
//var apotelesma_post = video_loader.Post(video_path);
var apotelesma_post = await video_loader.SendAsync(video_path);
Console.WriteLine("Apotelesma Post "+ apotelesma_post);
video_loader.Complete();
try
{
await display_video.Completion;
}
catch (TaskCanceledException ex)
{
Console.WriteLine(ex.CancellationToken.IsCancellationRequested);
video_loader.Complete();
send_recv_block.Complete();
display_video.Complete();
MessageBox.Show("Video Ended");
}
}
private void Stop_Reset_Click(object sender, EventArgs e)
{
cancellationSource.Cancel();
_canceller.Cancel();
}
提前致谢
【问题讨论】:
-
您使用了多少个
CancellationTokenSources?这些块侦听未指定的cancellationSource,但该方法初始化_canceller。为什么不只使用_canceller?这段代码的编写方式,第二次调用该方法的块将使用已经取消的cancellationSource
标签: task-parallel-library cancellationtokensource