【问题标题】:Scala futures - how to end on completion?Scala 期货 - 如何在完成时结束?
【发布时间】:2021-09-21 23:54:52
【问题描述】:

我从一位前同事那里继承了一些代码,他开始使用期货(在 Scala 中)来处理 Databricks 中的一些数据。

我将它分成在相似时间段内完成的块。但是没有输出,我知道他们没有使用 onSuccess 或 Await 或任何东西。

问题是,代码完成运行(不返回输出)但 Databricks 中的块一直执行到 thread.sleep() 部分。

我是 Scala 和期货的新手,我不确定如何在所有期货完成运行后退出笔记本(我应该在未来块之后使用 dbutils.notebook.exit() 吗?)

代码如下:

import scala.concurrent.{Future, blocking, Await}
import java.util.concurrent.Executors
import scala.concurrent.ExecutionContext
import com.databricks.WorkflowException

val numNotebooksInParallel = 15
  // If you create too many notebooks in parallel the driver may crash when you submit all of the jobs at once. 
  // This code limits the number of parallel notebooks.

implicit val ec = ExecutionContext.fromExecutor(Executors.newFixedThreadPool(numNotebooksInParallel))
  
  val ctx = dbutils.notebook.getContext()
  // The simplest interface we can have but doesn't
  // have protection for submitting to many notebooks in parallel at once

println("starting parallel jobs... hang tight")

Future {
  
        process("pro","bseg")

        process("prc","bkpf")

        process("prc","bseg")
        
        process("pr4","bkpf")

        process("pr4","bseg")

        println("done with future1")

      }
Future {
        
        process("pr5","bkpf")
        
        process("pr5","bseg")
        
        process("pri","bkpf") 
        
        process("pri","bseg")
        
        process("pr9","bkpf")

        println("done with future2")
  
      }
Future { 
        
        process("pr9","bseg")
        
        process("prl","bkpf") 
        
        process("prl","bseg")
        
        process("pro","bkpf")

        println("done with future3")

      }
  println("finished futures - yay! :)")

  Thread.sleep(5*60*60*1000)
  println("thread timed out after 5 hrs... hope it all finished.")

【问题讨论】:

    标签: scala future databricks


    【解决方案1】:

    通常会将期货保存为值:

    val futs = Seq(
      Future {
        process("pro","bseg")
        // and so on
      },
      // then the other futures
    )
    

    然后对期货进行操作:

    import scala.concurrent.Await
    import scala.concurrent.duration._
    
    Await.result(Future.sequence(futs), 5.hours)
    

    Future.sequence 将在第一个失败或全部成功后停止。如果您希望它们全部运行,即使其中一个失败,您可以执行类似的操作

    Await.result(
      futs.foldLeft(Future.unit) { (_, f) =>
        f.recover {
          case _ => ()
        }
      },
      5.hours
    )
    

    【讨论】:

    • 谢谢,我明天试试看效果如何!
    • 看起来这正是我所需要的 - 谢谢
    猜你喜欢
    • 2023-03-03
    • 2015-05-17
    • 1970-01-01
    • 2020-08-21
    • 2023-03-16
    • 2018-08-22
    • 2020-01-20
    • 2013-06-29
    • 2020-08-18
    相关资源
    最近更新 更多