【问题标题】:Fsharpx Async.AwaitObservable does not call cancellation continuationFsharpx Async.AwaitObservable 不调用取消继续
【发布时间】:2015-01-31 09:40:16
【问题描述】:

我正在尝试在使用 Async.StartWithContinuations 启动的异步工作流中使用 Fsharpx'Async.AwaitObservable。出于某种原因,如果用于启动此工作流的取消令牌在等待可观察对象时被取消(但不是在工作流的其他部分),则永远不会调用取消继续。但是,如果我把它放在use! __ = Async.OnCancel (interruption) 中,那么会调用中断函数。有人可以澄清为什么会发生这种情况,最好的方法是什么,并确保始终调用其中一个延续函数?

open System
open System.Reactive.Linq
open FSharp.Control.Observable
open System.Threading

[<EntryPoint>]
let main _ =
    let cancellationCapability = new CancellationTokenSource()

    let tick = Observable.Interval(TimeSpan.FromSeconds 1.0)
    let test = async {
        let! __ = Async.AwaitObservable tick
        printfn "Got a thing." }

    Async.StartWithContinuations(test,
        (fun () -> printfn "Finished"),
        (fun exn -> printfn "Error!"),
        (fun exn -> printfn "Canceled!"),
        cancellationCapability.Token)

    Thread.Sleep 100
    printfn "Cancelling..."
    cancellationCapability.Cancel()

    Console.ReadLine() |> ignore
    0 // return an integer exit code

【问题讨论】:

  • 在我看来,问题在于 FSharpx 对 AwaitObservable 的定义仅在取消发生之前可观察序列具有下一个值(或错误)时才调用延续函数之一。它还需要使用取消令牌注册一个回调,该回调将调用取消继续并将生成的 CancellationTokenRegistration 释放到序列中的下一个元素上。正在尝试找到一种方法来实现这一点。

标签: asynchronous f# system.reactive f#-async


【解决方案1】:

在我看来,如何实现 AwaitObservable 也是一个问题。祝你好运。

也就是说,您可以在客户端代码上使用的一种解决方法是将 AwaitObservable 包装在任务中:

async {
    let! ct = Async.CancellationToken
    let! __ = 
        Async.StartAsTask(Async.AwaitObservable tick, cancellationToken = ct)
        |> Async.AwaitTask
    printfn "Got a thing." 
}

不理想,但有效。

【讨论】:

  • 谢谢:现在可以解决问题了!我可以将它隐藏在某个实用程序库中,直到知道他们在做什么的人可以找到适当的修复,因为我在这里有点超出我的深度。
【解决方案2】:

似乎 GitHub 上的 Fsharpx 版本已经包含一个修复(不是我实现的)。但是,NuGet (1.8.41) 上的当前版本尚未更新以包含此修复程序。查看更改here

编辑 1: GitHub 上的代码在带有重播语义的 Observables 方面也存在一些问题。我现在已经解决了这个问题,但希望有一个更清洁的解决方案。在考虑有没有办法让它变得更简单之后,我会提交 PR。

/// Creates an asynchronous workflow that will be resumed when the 
/// specified observables produces a value. The workflow will return 
/// the value produced by the observable.
static member AwaitObservable(observable : IObservable<'T1>) =
    let removeObj : IDisposable option ref = ref None
    let removeLock = new obj()
    let setRemover r = 
        lock removeLock (fun () -> removeObj := Some r)
    let remove() =
        lock removeLock (fun () ->
            match !removeObj with
            | Some d -> removeObj := None
                        d.Dispose()
            | None   -> ())
    synchronize (fun f ->
    let workflow =
        Async.FromContinuations((fun (cont,econt,ccont) ->
            let rec finish cont value =
                remove()
                f (fun () -> cont value)
            setRemover <|
                observable.Subscribe
                    ({ new IObserver<_> with
                        member x.OnNext(v) = finish cont v
                        member x.OnError(e) = finish econt e
                        member x.OnCompleted() =
                            let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
                            finish ccont (new System.OperationCanceledException(msg)) })
            () ))
    async {
        let! cToken = Async.CancellationToken
        let token : CancellationToken = cToken
        #if NET40
        use registration = token.Register(fun () -> remove())
        #else
        use registration = token.Register((fun _ -> remove()), null)
        #endif
        return! workflow
    })

    static member AwaitObservable(observable : IObservable<'T1>) =
        let synchronize f = 
            let ctx = System.Threading.SynchronizationContext.Current 
            f (fun g ->
                let nctx = System.Threading.SynchronizationContext.Current 
                if ctx <> null && ctx <> nctx then ctx.Post((fun _ -> g()), null)
                else g() )

        let continued = ref false
        let continuedLock = new obj()
        let removeObj : IDisposable option ref = ref None
        let removeLock = new obj()
        let setRemover r = 
            lock removeLock (fun () ->  removeObj := Some r)
        let remove() =
            lock removeLock (fun () ->
                match !removeObj with
                | Some d -> 
                    removeObj := None
                    d.Dispose()
                | None   -> ())
        synchronize (fun f ->
        let workflow =
            Async.FromContinuations((fun (cont,econt,ccont) ->
                let rec finish cont value =
                    remove()
                    f (fun () -> lock continuedLock (fun () ->
                        if not !continued then
                            cont value
                            continued := true))
                let observer = 
                    observable.Subscribe
                        ({ new IObserver<_> with
                            member __.OnNext(v) = finish cont v
                            member __.OnError(e) = finish econt e
                            member __.OnCompleted() =
                                let msg = "Cancelling the workflow, because the Observable awaited using AwaitObservable has completed."
                                finish ccont (new System.OperationCanceledException(msg)) })
                lock continuedLock (fun () -> if not !continued then setRemover observer else observer.Dispose())
                () ))
        async {
            let! cToken = Async.CancellationToken
            let token : CancellationToken = cToken
            use __ = token.Register((fun _ -> remove()), null)
            return! workflow
        })

编辑 2: 对可观察到的热点问题进行了更整洁的修复...

let AwaitObservable(observable : IObservable<'T>) = async {
    let! token = Async.CancellationToken // capture the current cancellation token
    return! Async.FromContinuations(fun (cont, econt, ccont) ->
        // start a new mailbox processor which will await the result
        Agent.Start((fun (mailbox : Agent<Choice<'T, exn, OperationCanceledException>>) ->
            async {
                // register a callback with the cancellation token which posts a cancellation message
                #if NET40
                use __ = token.Register((fun _ ->
                    mailbox.Post (Choice3Of3 (new OperationCanceledException("The opeartion was cancelled.")))))
                #else
                use __ = token.Register((fun _ ->
                    mailbox.Post (Choice3Of3 (new OperationCanceledException("The opeartion was cancelled.")))), null)
                #endif

                // subscribe to the observable: if an error occurs post an error message and post the result otherwise
                use __ = 
                    observable.FirstAsync()
                        .Catch(fun exn -> mailbox.Post(Choice2Of3 exn) ; Observable.Empty())
                        .Subscribe(fun result -> mailbox.Post(Choice1Of3 result))

                // wait for the first of these messages and call the appropriate continuation function
                let! message = mailbox.Receive()
                match message with
                | Choice1Of3 reply -> cont reply
                | Choice2Of3 exn -> econt exn
                | Choice3Of3 exn -> ccont exn })) |> ignore) }

【讨论】:

  • 不错。所以会调用延续,但不是,因为订阅在 nuget 版本中没有正确处理?
  • 不,我相信 NuGet 版本没有使用取消令牌注册回调
猜你喜欢
  • 1970-01-01
  • 2012-02-18
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-10-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多