【发布时间】:2021-10-22 02:40:05
【问题描述】:
我致力于从 Azure SQL Server 中提取超过 350 多个表的表计数信息。由于系统元数据表没有定期刷新,所以我不能依赖它。我写了下面的代码来帮助我实现同样的目标-
import pyodbc
from pyspark.sql.types import *
pyodbc.pooling = False
def get_table_count(query ,server, username, password, database):
conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
cursor = conn.cursor()
cursor.execute(query)
row = cursor.fetchone()
columns = StructType([StructField('tableCount', LongType(), True) , StructField('tableName', StringType(), True), StructField('databaseName', StringType(), True)])
data = [(row[0], row[1], row[2])]
df = spark.createDataFrame( data = data,schema = columns)
cursor.close()
del cursor
conn.close()
return df
import pyspark.sql.functions as F
dbList = [ SQLServerDB1 , SQLServerDB1 ]
SQLServerDB1_query = ""
SQLServerDB2_query = ""
for db in dbList:
print("Currently loading for "+db+" database")
serverName = db + "SQLServerName"
serverUser = db + "SQLServerUser"
serverPassword = db + "SQLServerPassword"
serverDB = db + "SQLServerDB"
tables=df.select('target_object').filter(F.col('source') == db).distinct().toPandas()['target_object']
for tablename in list(tables):
if tablename != list(tables)[-1]:
vars()["%s_query"%db] = f" Select count_big(*) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename} \n" + " union \n" + vars()["%s_query"%db]
else:
vars()["%s_query"%db] = vars()["%s_query"%db] + f" Select count_big(*) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename}"
vars()["%s_DF"%db] = get_table_count( vars()["%s_query"%db] , eval(serverName), eval(serverUser), eval(serverPassword), eval(serverDB) )
# exec(f'{db}_DF = get_table_count( vars()["%s_query"%db] , eval(serverName), eval(serverUser), eval(serverPassword), eval(serverDB) )')
# print(tablename + " Loaded")
低于错误 -
('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Parse error at line: 3, column: 1: Incorrect syntax near 'union'. (103010) (SQLExecDirectW)")
我尝试打印 SQL 语句,它在 SQL Server DB 中没有任何问题。 请建议我在哪里写错了代码。
【问题讨论】:
-
您可以使用两个查询来执行此操作。第一个查询检索数据库中所有感兴趣的表名。使用该信息,您可以构建另一个查询,其中包含当前查询的联合(对于每个表),同时还包括表名作为列。如果您搜索“count rows in all tables”,您将看到生成此 SQL 的技术以及一些快捷方式。简而言之,让引擎完成工作。
-
@SMor - 你能详细说明一下吗?我目前正在通过 pyspark 数据框识别表,然后调用一个函数来完成计数部分。只有我有点担心的部分是使用两个 for 循环并使用 Pandas 将表数据框转换为列表。
-
计算 SQL Server 表中的行数需要扫描一些未过滤的索引。如果没有窄索引可供选择,则将扫描整个表。
-
@DavidBrowne-Microsoft 有没有更好的方法来做到这一点?请注意,我只有两个数据库的读取权限。此外,有时我还会收到 [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid object name ***table****。当我在桌子上尝试 count(1) 时,它工作得无可挑剔。
-
通过迭代表名来创建查询字符串,例如 Select table1, count_big(1) from table1 union Select table2 , count_big(1) from table2 然后将数据库作为单个字符串而不是 fecthing 传递一个一个
标签: python sql-server apache-spark pyspark databricks