【发布时间】:2012-11-02 23:17:26
【问题描述】:
我有一个简单的(只是一个测试)状态机,它接受以下输入字符串 abc 和 ac。状态机设置如下:
s1 --> 'a' --> s2s2 --> 'b' --> s3s3 --> 'c' --> s4s2 --> s4 (Epsilon transition)
s1是开始状态
s4是接受状态
我想使用 TPL 并行执行s1->s2->s3->s4 和s1->s2->s3->s4(彼此独立)。
如果我传入“abc”作为机器接受的输入,即
> Thread 1 - Consumed: a, from State: 1 to State: 2> Thread 2 - Consumed: b, from State: 2 to State: 3> Thread 3 - Epsilon transition from State: 2 to State: 3> Thread 4 - Consumed: c, from State: 3 to State: 4> Thread 4 - Accepted in state 4
Time taken = 19
Input 'abc' is valid
Press any key to exit
但是,如果我传入“ac”,我会得到:
> Thread 1 - Consumed: a, from State: 1 to State: 2> Thread 2 - Epsilon transition from State: 2 to State: 3> Thread 3 - Consumed: c, from State: 3 to State: 4> Thread 3 - Accepted in state 4> Thread 4 - Consumed: c, from State: 3 to State: 4> Thread 4 - Accepted in state 4
Time taken = 39
Input 'ac' is not valid (Reason: RejectedAmbiguous)
Press any key to exit
由于某种原因,状态机两次接受相同的输入(在状态 4 中接受),这是不可能的,因为并行执行的两条线都接受不同的输入。
由于代码太多,我不会发布所有代码,但我会发布主要部分,以便您了解我做错了什么。
public enum eResult
{
Accepted = 0,
RejectedAmbiguous,
RejectedNoResults,
RejectedNoInitialState
}
public eResult Execute()
{
var startState = States.FirstOrDefault(s => s.Initial);
if (startState == null) return eResult.RejectedNoInitialState;
tasks.Clear();
CancellationTokenSource cts = new CancellationTokenSource();
Task t = new Task(() =>
{
foreach(Transition tr in getTransitions(startState))
{
var tr = trans[n];
var actor = new Actor(tr.FromState, this.input);
Task<Actor> task = Task<Actor>.Factory.StartNew(obj =>
{
return doTransitionFunction(tr, cts).Invoke((Actor)obj);
}, actor, cts.Token);
buildContinuationTask(Transitions[tr], task, cts);
tasks.Add(task);
}
}, cts.Token);
t.RunSynchronously();
try
{
Task.WaitAll(tasks.ToArray());
}
catch (AggregateException ae)
{
foreach (Exception e in ae.Flatten().InnerExceptions)
{
Console.WriteLine(e.Message);
}
}
eResult result = eResult.Accepted;
if (!results.Any()) result = eResult.RejectedNoResults;
else if (results.Where(r => r.State.Accepted).Count() > 1) result = eResult.RejectedAmbiguous;
return result;
}
IEnumerable<Transition> getTransitions(AtomicState state)
{
return Transitions.Keys.Where(k => k.FromState == state);
}
bool isAccept(Actor parcel)
{
return (parcel.State.Accepted && parcel.Cursor.EOF());
}
Func<object, Actor> doTransitionFunction(Transition transition, CancellationTokenSource cts)
{
return new Func<object, Actor>(obj =>
{
var ts = (Actor)obj;
var cur = ts.Cursor.Peek();
if (transition.Epsilon || transition.Input.Invoke() == cur)
{
if (!transition.Epsilon) ts.Cursor.MoveNext();
ts.State = Transitions[transition];
OnTransitioned(this, new TransitionedEventArgs(transition.FromState, ts.State, cur, transition.Epsilon, Task.CurrentId));
if (isAccept(ts))
{
OnAccepted(this, new AcceptedEventArgs(ts.State, Task.CurrentId));
results.Add(ts);
cts.Cancel();
}
}
return ts;
});
}
void buildContinuationTask(AtomicState s, Task<Actor> antecedentTask, CancellationTokenSource cts)
{
var trans = getTransitions(s).ToArray();
for (int n = 0; n < trans.Count(); n++)
{
Transition tr = trans[n];
Task<Actor> continuation = antecedentTask.ContinueWith<Actor>(antecdent =>
{
if (!cts.IsCancellationRequested)
return doTransitionFunction(tr, cts).Invoke((Actor)antecdent.Result.Clone());
else
return (Actor)antecdent.Result.Clone();
}, cts.Token, TaskContinuationOptions.OnlyOnRanToCompletion, TaskScheduler.Current);
buildContinuationTask(Transitions[tr], continuation, cts);
tasks.Add(continuation);
}
}
如果这是不可能的,请纠正我,但我想要发生的是:
第一个并行任务接受abc 作为输入:
s1 是Task<Actor>
s2 是 s1 的延续
s3 是 s2 的延续
s4 是 s3 的延续
第二个并行任务接受ac:
s1 是Task<Actor>
s2 是 s1 的延续
s3 是 s2 的延续(这是 epsilon 移动)
s4 是 s3 的延续
这两个任务都有自己的 Actor 对象副本,该对象将从主要的先行任务传递到后续任务。
我知道我快到了,我只需要解开这最后一个谜。
【问题讨论】:
-
只是好奇,您为什么不使用 TPL DataFlow?它将简化很多。
-
我从未听说过 TPL DataFlow。感谢您引起我的注意。我会阅读它,看看它是否能帮助我解决我的问题。为此 +1。
-
您是否愿意接受 TPL DataFlow 解决方案来满足您的需求,或者您想严格遵守您现在拥有的基于
Task的解决方案? -
我都会接受。如果我可以使用基于
Task的解决方案使其工作,我会接受,但如果我可以使用 TPL DataFlow 来做同样的事情,那也很棒。它会让我很容易看到它是如何使用两种不同的方法工作的。 -
我刚刚阅读了 TPL DataFlow,虽然它看起来非常强大,但我不确定它如何在遇到具有多个转换的状态时跟踪多个 Actor 状态对象?
标签: c# asynchronous task-parallel-library state-machine