【发布时间】:2023-03-31 21:25:01
【问题描述】:
我有一个处理各种表的 Azure Databricks 集群,然后作为最后一步,我将这些表推送到 Azure SQL Server 以供其他进程使用。我在数据块中有一个单元格,看起来像这样:
def generate_connection():
jdbcUsername = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlUserName")
jdbcPassword = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlPassword")
connectionProperties = {
"user" : jdbcUsername,
"password" : jdbcPassword,
"driver" : "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}
return connectionProperties
def generate_url():
jdbcHostname = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlHostName")
jdbcDatabase = dbutils.secrets.get(scope = "Azure-Key-Vault-Scope", key = "AzureSqlDatabase")
jdbcPort = 1433
return "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
def persist_table(table, sql_table, mode):
jdbcUrl = generate_url();
connectionProperties = generate_connection()
table.write.jdbc(jdbcUrl, sql_table, properties=connectionProperties, mode=mode)
persist_table(spark.table("Sales.OpenOrders"), "Sales.OpenOrders", "overwrite")
persist_table(spark.table("Sales.Orders"), "Sales.Orders", "overwrite")
这按预期工作。我遇到的问题是 Orders 表非常大,每天只有一小部分行可能更改,所以我想要做的是将覆盖模式更改为附加模式并将数据框从整个表只包含可能已更改的行。所有这一切我都知道如何轻松完成,但我想做的是针对 Azure SQL 数据库运行一个简单的 SQL 语句,以删除已经存在的行,以便将它们可能更改的行插入回来.
我想针对 Azure SQL 数据库运行一条 SQL 语句,例如
Delete From Sales.Orders Where CreateDate >= '01/01/2019'
【问题讨论】:
-
您应该能够使用 %sql 魔法在单元格中运行 SQL 语句。
-
不,我不想在集群上运行那个 sql 我想在远程 sql 服务器上运行它
标签: sql-server azure databricks azure-databricks