【问题标题】:Monix task handle failure for list of futures期货列表的 Monix 任务处理失败
【发布时间】:2020-02-16 15:01:54
【问题描述】:

我该如何处理异步执行任务的失败? IE。至少打印堆栈跟踪并关闭。下面的代码似乎永远等待输入> 5

val things = Range(1, 40)
  implicit val scheduler = monix.execution.Scheduler.global
  def t(i:Int) = Task.eval {
      Try{
        Thread.sleep(1000)
        val result = i + 1
        if(result > 5){
          throw new Exception("asdf")
        }
        // i.e. write to file, that's why unit is returned
        println(result) // Effect
        "Result"
      }
    }
    val futures = things.map(e=> t(e))
  futures.foreach(_.runToFuture)

编辑

尝试:

futures.foreach(_.runToFuture.onComplete {
    case Success(value) =>
      println(value)
    case Failure(ex) =>
      System.err.println(ex)
      System.exit(1)
  })

不会停止计算。 如何记录堆栈跟踪并取消正在进行的计算并停止?

【问题讨论】:

    标签: scala task future monix


    【解决方案1】:

    更惯用的方法是使用Observable 而不是Task,因为它正在处理数据列表(我假设这是用例,因为它显示在问题中)。

     val obs = Observable
      .fromIterable(Range(1, 40))
      .mapEval(i =>
        if (i + 1 > 5) Task.raiseError(new Exception("Error")) // will stop the stream
        else Task.delay(println(i)) // Or write to file in your case
      )
      .completedL
      .runToFuture
    
    
    obs
      .recover {
        case NonFatal(e) => println("Error")
      }
    

    或者,您也可以使用Either 发出错误信号,这样可以提高类型安全性,因为您需要处理Either 结果。

    val obs = Observable
      .fromIterable(Range(1, 40))
      .mapEval(i =>
        if (i + 1 > 5) Task.pure(Left("Error"))
        else Task.delay(println(i)).map(_ => Right(())) // Or write to file in your case
      )
      .takeWhileInclusive(_.isRight) // will also emit the failing result
      .lastL
      .runToFuture
    
    
    obs.map {
      case Left(err) => println("There's an error")
      case _ => println("Completed successfully")
    }
    

    【讨论】:

      【解决方案2】:

      这个问题有两个部分:

      • 使任务可取消。
      • 当一项任务失败时取消同级。

      使任务可取消

      Monix 有BooleanCancelable,这将允许您在调用cancel 时将isCancelled 的结果设置为true

      cancel 还需要在Thread.sleep 运行时调用Thread.interrupt 将其唤醒。否则sleep 将运行它的过程。但是,这将在您的任务中抛出InterruptedException。这需要处理。

      取消兄弟姐妹

      CompositeCancelable。似乎CompositeCancellable 的用例是从父任务调用cancel。因此,一旦构建了CompositeCancellable(即所有任务都已构建):

      • 必须使每个任务都可以使用此引用,因此失败的任务可以对其调用cancel。 (注意这是一种循环引用,最好避免)
      • 当子任务失败并调用cancel 时,会通知父任务(或代码)。 (这样可以避免循环引用)

      另一种通知同级任务的方法是使用AtomicBoolean 并经常检查它(休眠 10 毫秒而不是 1000 毫秒)。当一个任务失败时,它会设置这个布尔值,以便其他任务可以停止执行。这当然不涉及Cancellable。 (而且这是一种 hack,最好使用 monix 调度程序)

      注意

      Task 中调用Thread.sleep 是个好主意吗?我认为这会阻止另一个任务使用该线程。我认为使用调度器添加延迟并组合这些子任务是最有效地利用线程池的方法。

      【讨论】:

      • 使用睡眠只是一个虚拟的例子。显然你是对的。
      • CompositeCancellable 的方法听起来最好。但是我怎样才能调用取消呢?在`case Failure(ex) =>` 中,我只能得到Exception,而不是Cancelable。您能否使用Or parent task 方法添加一个最小示例?
      • 我已经链接了documentation for CompositeCancellable,第二个例子应该有帮助。基本上,您通过+=Cancellables 添加到CompositeCancellable,然后在复合上调用cancel
      • 我还不明白如何将Task 映射到Cancellable 并保持引用可取消。 gist.github.com/geoHeil/580518df9b3c8b5a978e736af757ac21 更详细地概述了我的方法。
      • Task.cancellable 似乎构建了一个可取消的任务,但坦率地说,我还没有完全理解它。该页面上的示例非常嘈杂。我也想看看一个可行的例子。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-05-12
      • 1970-01-01
      • 2019-12-24
      • 2019-04-29
      • 1970-01-01
      • 2015-05-02
      相关资源
      最近更新 更多