【问题标题】:Triggering Databricks job from Airflow without starting new cluster在不启动新集群的情况下从 Airflow 触发 Databricks 作业
【发布时间】:2019-06-30 21:06:12
【问题描述】:

我正在使用气流来触发数据块上的作业。我有许多运行数据块作业的 DAG,我希望只使用一个集群而不是多个集群,因为据我了解,这将降低这些任务产生的成本。

使用DatabricksSubmitRunOperator有两种方法可以在数据块上运行作业。要么使用正在运行的集群,通过 id 调用它

'existing_cluster_id' : '1234-567890-word123',

或启动一个新集群

'new_cluster': {
    'spark_version': '2.1.0-db3-scala2.11',
    'num_workers': 2
  },

现在我想尽量避免为每个任务启动一个新集群,但是集群在停机期间关闭,因此它不再通过它的 id 可用,我会收到一个错误,所以我的唯一选项view 是一个新的集群。

1) 有没有办法让集群即使在关闭时也可以通过 id 调用?

2) 人们只是让集群保持活力吗?

3) 还是我完全错了,为每个任务启动集群不会产生更多成本?

4) 有什么我完全错过的吗?

【问题讨论】:

  • Step1- 点击集群并在 URL 中找到以下详细信息。第 2 步:从 URL 复制 ClusterName,如下定义。 eastus.azuredatabricks.net? o=WorkSpaceID#/setting/clusters//configuration notebook_task_params = { 'existing_cluster_id': "", 'notebook_task': { 'notebook_path': '/Users/username@domain.com/notebookName', } , }
  • 我不再为这个问题所困扰,但我仍然想知道你会怎么做。似乎缺少步骤,而且我在 AWS 中工作,如果这有什么不同的话,我不确定。
  • 我的情况是,我只是通过了这些步骤并在 azure 中正常工作。未在 AWS 中尝试过,我将在此处检查和更新。

标签: airflow databricks


【解决方案1】:

更新基于 @YannickSSE 的评论回复
我不使用数据块;您能否通过与您可能期望或可能不期望正在运行的集群相同的 id 启动一个新集群,并且在它正在运行的情况下让它成为空操作?也许不是,或者你可能不会问这个。 响应:不,当启动一个新集群时你不能提供一个 id。

您能否编写一个 python 或 bash 运算符来测试集群是否存在? (响应:这将是一个测试作业提交......不是最好的方法。)如果它找到它并成功,下游任务将使用现有集群 ID 触发您的作业,但如果它没有另一个下游任务可以使用 trigger_rule all_failed 执行相同的任务,但使用新集群。然后这两个任务DatabricksSubmitRunOperators 可以有一个下游任务trigger_ruleone_success。 (响应:或者使用分支运算符来确定执行的运算符。

这可能并不理想,因为我认为您的集群 ID 会不时更改,导致您必须跟上。 ......集群是该操作员的databricks钩子连接的一部分,并且可以更新吗?也许您想在需要它的任务中将其指定为 {{ var.value.<identifying>_cluster_id }} 并将其更新为气流变量。 (响应:集群 id 不在挂钩中,因此变量或 DAG 文件在发生更改时必须更新。

【讨论】:

  • 我只有两个选项,在给定 id 的正在运行的集群上运行它或启动一个新集群(我无法指定它的 id)。我可以进行一些分支并启动一个新集群,但我需要测试以查看当集群启动并且另一个 dag 尝试向该集群发送作业时会发生什么。遗憾的是,id 不是钩子 databricks 使用的一部分,但我必须将其作为参数提供给 databricks 运算符。但是我可以创建一个变量并更新这个。如果没有更好的答案,我会这样做。谢谢
  • @YannickSSE 感谢您澄清我的猜测。我在想,要么你需要使用一个变量,要么提供一个 Airflow 可以让操作员检查的服务(就像一个简单的 REST Web 请求,或者像SELECT cluster_id FROM active_clusters WHERE intended_use = 'This_DAG_id'; 这样的数据库查询),如果它返回一个值,将其推送到 xcom 并使用 {{ ti.xcom_pull('taskname_for_getting_which_cluster') }} 作为 cluster_id 模板字符串。不知道如何在没有作业的情况下启动集群,然后找出将其放入 xcom 并在服务中 POST/insert 的 ID。
【解决方案2】:

Databricks 最近似乎添加了一个选项,可以在作业中重用作业集群,在任务之间共享它。

https://databricks.com/blog/2022/02/04/saving-time-and-costs-with-cluster-reuse-in-databricks-jobs.html

到目前为止,每个任务都有自己的集群来容纳 不同类型的工作负载。虽然这种灵活性允许 细粒度的配置,也可以引入时间和成本 并行期间集群启动或未充分利用的开销 任务。

为了保持这种灵活性,但进一步改进 利用率,我们很高兴地宣布集群重用。通过分享工作 多任务集群客户可以减少工作时间 采取,通过消除开销和增加集群来降低成本 使用并行任务。

这似乎在新 API 中也可用。 https://docs.databricks.com/dev-tools/api/latest/jobs.html#operation/JobsCreate

job_clusters 对象数组 (JobCluster)

可以共享和重用的作业集群规范列表 这项工作的任务。不能在共享作业中声明库 簇。您必须在任务设置中声明依赖库。

为了适应您的用例,您可以使用您的作业启动一个新集群,在您的任务之间共享它,它会在结束时自动关闭。

如果我们想让作业无延迟地启动,我仍然不完全理解如何让作业集群始终保持热状态。我也认为不可能在作业之间共享这些集群。

目前,这些信息应该可以提供不错的线索。

【讨论】:

  • 您可以使用实例池来更快地启动作业
  • 实例池,尤其是带有热节点的实例池,可以减少启动时间。在我的例子中,启动延迟从 3 分钟缩短到 1 分钟。不幸的是,我有一个不到 20 秒的理想,但我无法实现。我认为这主要是围绕在热节点上安装库。
  • 是的,如果你需要库,那​​么你可以用它们创建一个 Docker 镜像,并与 databricks 运行时一起预加载到节点
【解决方案3】:

实际上,当您想通过气流执行笔记本时,您必须指定集群的特征。

databricks 会将您的笔记本视为一项新作业,并将其放在您创建的集群上。但是当执行完成后,创建的集群会被自动删除。

要验证这一点:当工作在气流上运行时 ==> 去查看日志 => 它为您提供了一个链接 => 链接将您转发到数据块:您点击查看集群,因此您将看到执行一个新创建的集群,例如 job-1310-run-980

【讨论】:

  • 此外,如果您想删除这些作业集群,您可以执行以下操作: 将令牌存储在 .netrc 文件中并在 curl 中使用它们 创建具有机器、登录名和密码属性的 .netrc 文件:` machine abc-d1e2345f-a6b2.cloud.databricks.com 登录令牌密码 dapi1234567890ab1cde2f3ab456c7d89efa ` ` curl --netrc -X POST dbc-a1b2345c-d6e7.cloud.databricks.com/api/2.0/clusters/delete --data '{ "cluster_id": "1234-567890-frays123" }'`
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 2019-03-16
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多