【问题标题】:How to properly access dbutils in Scala when using Databricks Connect使用 Databricks Connect 时如何在 Scala 中正确访问 dbutils
【发布时间】:2020-03-15 10:20:29
【问题描述】:

我正在使用 Databricks Connect 从 IntelliJ IDEA (Scala) 本地运行我的 Azure Databricks 群集中的代码。

一切正常。我可以在 IDE 中本地连接、调试和检查。

我创建了一个 Databricks 作业来运行我的自定义应用 JAR,但它失败并出现以下异常:

19/08/17 19:20:26 ERROR Uncaught throwable from user code: java.lang.NoClassDefFoundError: com/databricks/service/DBUtils$
at Main$.<init>(Main.scala:30)
at Main$.<clinit>(Main.scala)

我的 Main.scala 类的第 30 行是

val dbutils: DBUtils.type = com.databricks.service.DBUtils

就像this documentation page上描述的那样

该页面显示了一种访问 DBUtils 的方法,该方法在本地和集群中都有效。但是这个例子只显示了 Python,我使用的是 Scala。

在本地使用 databricks-connect 和在运行 JAR 的 Databricks 作业中访问它的正确方法是什么?

更新

似乎有两种使用 DBUtils 的方法。

1) DbUtils 类描述了here。 引用文档,该库允许您构建和编译项目,但不能运行它。这不允许您在集群上运行本地代码。

2) Databricks Connect 描述了here。这允许您在 Databricks 集群中运行本地 Spark 代码。

问题是这两种方法有不同的设置和包名。似乎没有办法在本地使用 Databricks Connect(这在集群中不可用),但是通过 sbt/maven 添加了使用 DbUtils 类的 jar 应用程序,以便集群可以访问它。

【问题讨论】:

  • 能否分享您在 IDE 中尝试的代码?

标签: scala databricks azure-databricks databricks-connect dbutils


【解决方案1】:

要访问 dbutils.fs 和 dbutils.secrets Databricks 实用程序,您可以使用 DBUtils 模块。

示例: 在 Scala 编程中访问 DBUtils 如下所示:

val dbutils = com.databricks.service.DBUtils
println(dbutils.fs.ls("dbfs:/"))
println(dbutils.secrets.listScopes())

参考:Databricks - Accessing DBUtils.

希望这会有所帮助。

【讨论】:

  • 正如我在回答中提到的那样,这正是我正在做的事情。在 Databricks 集群中部署 Jar 时,这不起作用。
【解决方案2】:

我不知道为什么the docs you mentioned 不起作用。也许您正在使用不同的依赖项?

These docs 有一个示例应用程序,您可以download。这是一个测试非常少的项目,因此它不会创建作业或尝试在集群上运行它们——但这只是一个开始。另外,请注意它使用的是旧的0.0.1 版本的dbutils-api

因此,要解决您当前的问题,请尝试从其他位置导入 dbutils,而不是使用 com.databricks.service.DBUtils

import com.databricks.dbutils_v1.DBUtilsHolder.dbutils

或者,如果您愿意:

import com.databricks.dbutils_v1.{DBUtilsV1, DBUtilsHolder}

type DBUtils = DBUtilsV1
val dbutils: DBUtils = DBUtilsHolder.dbutils

另外,请确保您在 SBT 中具有以下依赖项(如果 0.0.3 不起作用,请尝试使用版本 - 最新的是 0.0.4):

libraryDependencies += "com.databricks" % "dbutils-api_2.11" % "0.0.3"

This question and answer 为我指明了正确的方向。答案包含指向使用dbutils 的工作Github 存储库的链接:waimak。我希望这个 repo 可以帮助您解决有关 Databricks 配置和依赖项的更多问题。

祝你好运!


更新

我明白了,所以我们有两个相似但不相同的 API,并且没有在本地版本和后端版本之间切换的好方法(尽管 Databricks Connect 承诺无论如何它应该可以工作)。请让我提出一个解决方法。

Scala 编写适配器很方便。这是一段代码 sn-p 应该作为一个桥梁——这里定义了 DBUtils 对象,它为 API 的两个版本提供了足够的 API 抽象:com.databricks.service.DBUtils 上的 Databricks Connect 和后端 @ 987654338@ API。我们可以通过加载并随后通过反射使用com.databricks.service.DBUtils 来实现这一点——我们没有硬编码的导入。

package com.example.my.proxy.adapter

import org.apache.hadoop.fs.FileSystem
import org.apache.spark.sql.catalyst.DefinedByConstructorParams

import scala.util.Try

import scala.language.implicitConversions
import scala.language.reflectiveCalls


trait DBUtilsApi {
    type FSUtils
    type FileInfo

    type SecretUtils
    type SecretMetadata
    type SecretScope

    val fs: FSUtils
    val secrets: SecretUtils
}

trait DBUtils extends DBUtilsApi {
    trait FSUtils {
        def dbfs: org.apache.hadoop.fs.FileSystem
        def ls(dir: String): Seq[FileInfo]
        def rm(dir: String, recurse: Boolean = false): Boolean
        def mkdirs(dir: String): Boolean
        def cp(from: String, to: String, recurse: Boolean = false): Boolean
        def mv(from: String, to: String, recurse: Boolean = false): Boolean
        def head(file: String, maxBytes: Int = 65536): String
        def put(file: String, contents: String, overwrite: Boolean = false): Boolean
    }

    case class FileInfo(path: String, name: String, size: Long)

    trait SecretUtils {
        def get(scope: String, key: String): String
        def getBytes(scope: String, key: String): Array[Byte]
        def list(scope: String): Seq[SecretMetadata]
        def listScopes(): Seq[SecretScope]
    }

    case class SecretMetadata(key: String) extends DefinedByConstructorParams
    case class SecretScope(name: String) extends DefinedByConstructorParams
}

object DBUtils extends DBUtils {

    import Adapters._

    override lazy val (fs, secrets): (FSUtils, SecretUtils) = Try[(FSUtils, SecretUtils)](
        (ReflectiveDBUtils.fs, ReflectiveDBUtils.secrets)    // try to use the Databricks Connect API
    ).getOrElse(
        (BackendDBUtils.fs, BackendDBUtils.secrets)    // if it's not available, use com.databricks.dbutils_v1.DBUtilsHolder
    )

    private object Adapters {
        // The apparent code copying here is for performance -- the ones for `ReflectiveDBUtils` use reflection, while
        // the `BackendDBUtils` call the functions directly.

        implicit class FSUtilsFromBackend(underlying: BackendDBUtils.FSUtils) extends FSUtils {
            override def dbfs: FileSystem = underlying.dbfs
            override def ls(dir: String): Seq[FileInfo] = underlying.ls(dir).map(fi => FileInfo(fi.path, fi.name, fi.size))
            override def rm(dir: String, recurse: Boolean = false): Boolean = underlying.rm(dir, recurse)
            override def mkdirs(dir: String): Boolean = underlying.mkdirs(dir)
            override def cp(from: String, to: String, recurse: Boolean = false): Boolean = underlying.cp(from, to, recurse)
            override def mv(from: String, to: String, recurse: Boolean = false): Boolean = underlying.mv(from, to, recurse)
            override def head(file: String, maxBytes: Int = 65536): String = underlying.head(file, maxBytes)
            override def put(file: String, contents: String, overwrite: Boolean = false): Boolean = underlying.put(file, contents, overwrite)
        }

        implicit class FSUtilsFromReflective(underlying: ReflectiveDBUtils.FSUtils) extends FSUtils {
            override def dbfs: FileSystem = underlying.dbfs
            override def ls(dir: String): Seq[FileInfo] = underlying.ls(dir).map(fi => FileInfo(fi.path, fi.name, fi.size))
            override def rm(dir: String, recurse: Boolean = false): Boolean = underlying.rm(dir, recurse)
            override def mkdirs(dir: String): Boolean = underlying.mkdirs(dir)
            override def cp(from: String, to: String, recurse: Boolean = false): Boolean = underlying.cp(from, to, recurse)
            override def mv(from: String, to: String, recurse: Boolean = false): Boolean = underlying.mv(from, to, recurse)
            override def head(file: String, maxBytes: Int = 65536): String = underlying.head(file, maxBytes)
            override def put(file: String, contents: String, overwrite: Boolean = false): Boolean = underlying.put(file, contents, overwrite)
        }

        implicit class SecretUtilsFromBackend(underlying: BackendDBUtils.SecretUtils) extends SecretUtils {
            override def get(scope: String, key: String): String = underlying.get(scope, key)
            override def getBytes(scope: String, key: String): Array[Byte] = underlying.getBytes(scope, key)
            override def list(scope: String): Seq[SecretMetadata] = underlying.list(scope).map(sm => SecretMetadata(sm.key))
            override def listScopes(): Seq[SecretScope] = underlying.listScopes().map(ss => SecretScope(ss.name))
        }

        implicit class SecretUtilsFromReflective(underlying: ReflectiveDBUtils.SecretUtils) extends SecretUtils {
            override def get(scope: String, key: String): String = underlying.get(scope, key)
            override def getBytes(scope: String, key: String): Array[Byte] = underlying.getBytes(scope, key)
            override def list(scope: String): Seq[SecretMetadata] = underlying.list(scope).map(sm => SecretMetadata(sm.key))
            override def listScopes(): Seq[SecretScope] = underlying.listScopes().map(ss => SecretScope(ss.name))
        }
    }
}

object BackendDBUtils extends DBUtilsApi {
    import com.databricks.dbutils_v1

    private lazy val dbutils: DBUtils = dbutils_v1.DBUtilsHolder.dbutils
    override lazy val fs: FSUtils = dbutils.fs
    override lazy val secrets: SecretUtils = dbutils.secrets

    type DBUtils = dbutils_v1.DBUtilsV1
    type FSUtils = dbutils_v1.DbfsUtils
    type FileInfo = com.databricks.backend.daemon.dbutils.FileInfo

    type SecretUtils = dbutils_v1.SecretUtils
    type SecretMetadata = dbutils_v1.SecretMetadata
    type SecretScope = dbutils_v1.SecretScope
}

object ReflectiveDBUtils extends DBUtilsApi {
    // This throws a ClassNotFoundException when the Databricks Connection API isn't available -- it's much better than
    // the NoClassDefFoundError, which we would get if we had a hard-coded import of com.databricks.service.DBUtils .
    // As we're just using reflection, we're able to recover if it's not found.
    private lazy val dbutils: DBUtils =
        Class.forName("com.databricks.service.DBUtils$").getField("MODULE$").get().asInstanceOf[DBUtils]

    override lazy val fs: FSUtils = dbutils.fs
    override lazy val secrets: SecretUtils = dbutils.secrets

    type DBUtils = AnyRef {
        val fs: FSUtils
        val secrets: SecretUtils
    }

    type FSUtils = AnyRef {
        def dbfs: org.apache.hadoop.fs.FileSystem
        def ls(dir: String): Seq[FileInfo]
        def rm(dir: String, recurse: Boolean): Boolean
        def mkdirs(dir: String): Boolean
        def cp(from: String, to: String, recurse: Boolean): Boolean
        def mv(from: String, to: String, recurse: Boolean): Boolean
        def head(file: String, maxBytes: Int): String
        def put(file: String, contents: String, overwrite: Boolean): Boolean
    }

    type FileInfo = AnyRef {
        val path: String
        val name: String
        val size: Long
    }

    type SecretUtils = AnyRef {
        def get(scope: String, key: String): String
        def getBytes(scope: String, key: String): Array[Byte]
        def list(scope: String): Seq[SecretMetadata]
        def listScopes(): Seq[SecretScope]
    }

    type SecretMetadata = DefinedByConstructorParams { val key: String }

    type SecretScope = DefinedByConstructorParams { val name: String }
}

如果您将Main 中提到的val dbutils: DBUtils.type = com.databricks.service.DBUtils 替换为val dbutils: DBUtils.type = com.example.my.proxy.adapter.DBUtils,则无论是本地还是远程,所有内容都应该可以直接替换。

如果您有一些新的NoClassDefFoundErrors,请尝试将特定的依赖项添加到 JAR 作业中,或者尝试重新排列它们、更改版本或将依赖项标记为已提供。

这个适配器并不漂亮,它使用反射,但我希望它作为一种解决方法应该足够好。祝你好运:)

【讨论】:

  • 请看我的更新。您链接的那个答案没有使用 Databricks Connect。我想在本地运行时使用 Databricks Connect,然后在打包要部署到集群的 jar 时使用 dbutils 类。
  • 我很欣赏你的回答,但似乎有很多 hacky-reflection 代码只是为了让它工作。我不是 Scala 开发人员,但我仍然想知道是否有更好的方法。也许有一种方法可以为这个 DBUtils 设置一个接口,当从 IDE 运行时将使用 Databricks Connect 中的接口,而在实际构建 jar 以进行部署时使用另一个接口。
猜你喜欢
  • 1970-01-01
  • 2019-12-24
  • 2019-01-23
  • 2021-09-26
  • 1970-01-01
  • 1970-01-01
  • 2019-05-25
  • 1970-01-01
  • 2022-07-25
相关资源
最近更新 更多