【发布时间】:2014-01-19 00:40:29
【问题描述】:
我正在为 .NET 移植 AKKA 框架(现在不要太认真,现在是周末对 Actor 部分的破解)
我在其中的“未来”支持方面遇到了一些问题。 在 Java/Scala Akka 中,Future 将与 Await 调用同步等待。 很像 .NET Task.Wait()
我的目标是为此支持真正的异步等待。 它现在可以工作,但是在我当前的解决方案中,继续在错误的线程上执行。
这是将消息传递给我的一个演员时的结果,其中包含一个未来的等待块。 如您所见,actor 始终在同一个线程上执行,而 await 块在随机线程池线程上执行。
actor thread: 6
await thread 10
actor thread: 6
await thread 12
actor thread: 6
actor thread: 6
await thread 13
...
actor 使用 DataFlow BufferBlock<Message> 获取消息
或者更确切地说,我在缓冲区块上使用 RX 来订阅消息。
它是这样配置的:
var messages = new BufferBlock<Message>()
{
BoundedCapacity = 100,
TaskScheduler = TaskScheduler.Default,
};
messages.AsObservable().Subscribe(this);
到目前为止一切顺利。
但是,当我等待未来的结果时。 像这样:
protected override void OnReceive(IMessage message)
{
....
var result = await Ask(logger, m);
// This is not executed on the same thread as the above code
result.Match()
.With<SomeMessage>(t => {
Console.WriteLine("await thread {0}",
System.Threading.Thread.CurrentThread.GetHashCode());
})
.Default(_ => Console.WriteLine("Unknown message"));
...
我知道这是异步等待的正常行为,但我确实必须确保只有一个线程可以访问我的演员。
我不希望 future 同步运行,我希望像往常一样运行异步,但我希望继续运行在与消息处理器/actor 相同的线程上。
我的未来支持代码如下所示:
public Task<IMessage> Ask(ActorRef actor, IMessage message)
{
TaskCompletionSource<IMessage> result =
new TaskCompletionSource<IMessage>();
var future = Context.ActorOf<FutureActor>(name : Guid.NewGuid().ToString());
// once this object gets a response,
// we set the result for the task completion source
var futureActorRef = new FutureActorRef(result);
future.Tell(new SetRespondTo(), futureActorRef);
actor.Tell(message, future);
return result.Task;
}
有什么想法可以强制继续在启动上述代码的同一线程上运行吗?
【问题讨论】:
-
该线程上的某些东西必须合作。你不能劫持一个线程。该线程必须以某种方式调用您的继续。也许使用自定义 SynchronizationContext。
-
不能以某种方式为活动线程安排继续吗?
-
线程池线程不获取特定的工作项,它们从队列中获取未指定的项。您将无法定位任何特定线程。不过,您可以使用使用您控制的线程的自定义 TaskScheduler。现在这变得很棘手。也许更容易删除结束在同一个线程上的要求?!
-
我想说的是 SetResult 不能保证被阻止。如果您在文档中没有找到该声明,则无法保证并且您不能依赖它。您也无法对此进行测试。
-
我在 MSDN 上发现了这一点:msdn.microsoft.com/en-us/library/… "ExecuteSynchronously 指定应同步执行延续任务。指定此选项后,延续将在导致先前任务的同一线程上运行过渡到它的最终状态。如果在创建延续时前件已经完成,则延续将在创建延续的线程上运行。只有非常短运行的延续应该同步执行“。它是可靠的。
标签: c# async-await akka system.reactive tpl-dataflow