【问题标题】:F# CancellationTokenSource.Cancel() does not cancel the underlying workF# CancellationTokenSource.Cancel() 不取消底层工作
【发布时间】:2020-04-10 11:28:59
【问题描述】:

我有一些可能运行时间很长的函数,有时可能会挂断。所以,我想如果我把它包装到async 工作流中,那么我应该可以取消它。这是一个不起作用的 FSI 示例(但编译后的代码也会发生相同的行为):

open System.Threading

let mutable counter = 0

/// Emulates an external C# sync function that hung up.
/// Please, don't change it to some F# async stuff because
/// that won't fix that C# method.
let run() =
    while true
        do
            printfn "counter = %A" counter
            Thread.Sleep 1000
            counter <- counter + 1


let onRunModel() =
    let c = new CancellationTokenSource()
    let m = async { do run() }
    Async.Start (m, c.Token)
    c


let tryCancel() =
    printfn "Starting..."
    let c = onRunModel()
    printfn "Waiting..."
    Thread.Sleep 5000
    printfn "Cancelling..."
    c.Cancel()
    printfn "Waiting again..."
    Thread.Sleep 5000
    printfn "Completed."


#time
tryCancel()
#time

如果你在 FSI 中运行它,你会看到类似的东西:

Starting...
Waiting...
counter = 0
counter = 1
counter = 2
counter = 3
counter = 4
Cancelling...
Waiting again...
counter = 5
counter = 6
counter = 7
counter = 8
counter = 9
Completed.
Real: 00:00:10.004, CPU: 00:00:00.062, GC gen0: 0, gen1: 0, gen2: 0
counter = 10
counter = 11
counter = 12
counter = 13
counter = 14
counter = 15
counter = 16

这意味着在调用c.Cancel() 之后它根本不会停止。

我做错了什么以及如何使这样的事情起作用?

以下是一些附加信息:

  1. 当代码挂起时,它会在一些外部同步 C# 库中执行此操作, 我无法控制。所以检查取消令牌 我控制的代码没用。这就是为什么函数run() 上面就是这样建模的。
  2. 我不需要任何关于完成和/或进度的沟通。它已经通过一些消息传递系统完成并且超出了范围 的问题。
  3. 基本上,只要我“决定”这样做,我只需要终止后台工作。

【问题讨论】:

  • 杀死是什么意思? CanellationToken 只是一个通知。如果该方法实际上并没有注册自己或检查取消它实际上没有做任何事情。
  • 您正在使用 Tread.Sleep。试试 Async.Sleep。不确定,但我怀疑这会尊重取消。
  • 没错。我正在尝试寻找一种可行的替代方法。例如。 t = new Thread(fun () -&gt; run()) ... 然后调用 t.Abort() 确实会终止线程,但它会在 FSI 中发出一声巨响 - 整个 FSI 会话被终止。
  • Thread.Sleep 是故意的。请参阅问题末尾的评论 #1。
  • 我认为你根本不应该使用线程的东西。 Async 和 Task 不是线程的东西,尽管它们使用线程来完成工作。

标签: asynchronous f#


【解决方案1】:

您正在将控制权交给一个代码段,尽管该代码段包含在 async 块中,但无法检查取消。如果您是直接在async 中构建循环,还是将其替换为递归async 循环,它将按预期工作:

let run0 () =   // does not cancel
    let counter = ref 0
    while true do
        printfn "(0) counter = %A" !counter
        Thread.Sleep 1000
        incr counter
let m = async { run0 () }

let run1 () =   // cancels
    let counter = ref 0
    async{
        while true do
            printfn "(1) counter = %A" !counter
            Thread.Sleep 1000
            incr counter }

let run2 =      // cancels too
    let rec aux counter = async {
        printfn "(2) counter = %A" counter
        Thread.Sleep 1000
        return! aux (counter + 1) }
    aux 0

printfn "Starting..."
let cts = new CancellationTokenSource()
Async.Start(m, cts.Token)
Async.Start(run1(), cts.Token)
Async.Start(run2, cts.Token)
printfn "Waiting..."
Thread.Sleep 5000
printfn "Cancelling..."
cts.Cancel()
printfn "Waiting again..."
Thread.Sleep 5000
printfn "Completed."

但请注意:F# 中的嵌套async 调用会自动检查是否取消,这就是为什么do! Async.Sleep 更可取的原因。如果您要走递归路线,请务必通过return! 启用尾递归。进一步阅读:Scott W. 在 Asynchronous programming 上的博客和 Tomas Petricek 的 Async in C# and F# Asynchronous gotchas in C#

【讨论】:

  • 答案很好,但是...请查看问题中的其他信息#1:当代码挂起时,它会在一些外部同步 C# 库中执行此操作,我没有控制。 .因此,run() 被“设计”为模仿该行为。如果您可以重写您的代码,以便它可以使用原始的run() 同时能够取消计算,那就太好了。
【解决方案2】:

开发这段代码是为了解决我无法获得一些终止/超时调用的情况。他们只会挂起。也许你能得到一些可以帮助你解决问题的想法。

对您来说有趣的部分只是前两个函数。剩下的只是为了演示我是如何使用它们的。

module RobustTcp =

    open System
    open System.Text
    open System.Net.Sockets
    open Railway

    let private asyncSleep (sleepTime: int) (error: 'a) = async {
        do! Async.Sleep sleepTime
        return Some error
    }

    let private asyncWithTimeout asy (timeout: int) (error: 'a) =
        Async.Choice [ asy; asyncSleep timeout error ]

    let private connectTcpClient (host: string) (port: int) (tcpClient: TcpClient) = async {
        let asyncConnect = async {
            do! tcpClient.ConnectAsync(host, port) |> Async.AwaitTask
            return Some tcpClient.Connected }
        match! asyncWithTimeout asyncConnect 1_000 false with
        | Some isConnected -> return Ok isConnected
        | None -> return Error "unexpected logic error in connectTcpClient"
        }

    let private writeTcpClient (outBytes: byte[]) (tcpClient: TcpClient) = async {
        let asyncWrite = async {
            let stream = tcpClient.GetStream()
            do! stream.WriteAsync(outBytes, 0, outBytes.Length) |> Async.AwaitTask
            do! stream.FlushAsync() |> Async.AwaitTask
            return Some (Ok ()) }
        match! asyncWithTimeout asyncWrite 10_000 (Error "timeout writing") with
        | Some isWrite -> return isWrite
        | None -> return Error "unexpected logic error in writeTcpClient"
        }

    let private readTcpClient (tcpClient: TcpClient) = async {
        let asyncRead = async {
            let inBytes: byte[] = Array.zeroCreate 1024
            let stream = tcpClient.GetStream()
            let! byteCount = stream.ReadAsync(inBytes, 0, inBytes.Length) |> Async.AwaitTask
            let bytesToReturn = inBytes.[ 0 .. byteCount - 1 ]
            return Some (Ok bytesToReturn) }
        match! asyncWithTimeout asyncRead 2_000 (Error "timeout reading reply") with
        | Some isRead ->
            match isRead with
            | Ok s -> return Ok s
            | Error error -> return Error error
        | None -> return Error "unexpected logic error in readTcpClient"
        }

    let sendReceiveBytes (host: string) (port: int) (bytesToSend: byte[]) = async {
        try
            use tcpClient = new TcpClient()
            match! connectTcpClient host port tcpClient with
            | Ok isConnected ->
                match isConnected with
                | true ->
                    match! writeTcpClient bytesToSend tcpClient with
                    | Ok () ->
                        let! gotData = readTcpClient tcpClient
                        match gotData with
                        | Ok result -> return Ok result
                        | Error error -> return Error error
                    | Error error -> return Error error
                | false -> return Error "Not connected."
            | Error error -> return Error error
        with
        | :? AggregateException as ex ->
            (* TODO ? *)
            return Error ex.Message
        | ex ->
            (*
            printfn "Exception in getStatus : %s" ex.Message
            *)
            return Error ex.Message
    }

    let sendReceiveText (host: string) (port: int) (textToSend: string) (encoding: Encoding) =
        encoding.GetBytes textToSend
        |> sendReceiveBytes host port
        |> Async.map (Result.map encoding.GetString)

【讨论】:

  • 这里的想法是asyncSleep会超时,然后asyncWithTimeout也会超时,这样即使TCP的东西不会超时,也会有超时。
  • 也许有人会告诉我,TCP 的东西确实有我可以使用的超时。我的问题是,在某些情况下,这些超时不起作用,并且电话无论如何都会挂起。这就是为什么这是我能找到的唯一完全万无一失的方法,以始终保证不会出现任何问题,无论出现什么问题。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2016-10-02
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-02-27
相关资源
最近更新 更多