【问题标题】:Is there a way to improve the run time or optimize Python / Pyspark code?有没有办法改善运行时间或优化 Python / Pyspark 代码?
【发布时间】:2021-10-22 02:40:05
【问题描述】:

我致力于从 Azure SQL Server 中提取超过 350 多个表的表计数信息。由于系统元数据表没有定期刷新,所以我不能依赖它。我写了下面的代码来帮助我实现同样的目标-

import pyodbc
from pyspark.sql.types import *
pyodbc.pooling = False

def get_table_count(query ,server, username, password, database):

  conn = pyodbc.connect('DRIVER={ODBC Driver 17 for SQL Server};SERVER='+server+';DATABASE='+database+';UID='+username+';PWD='+ password)
  cursor = conn.cursor()  
  cursor.execute(query)
  row = cursor.fetchone()
  
  columns = StructType([StructField('tableCount', LongType(), True) , StructField('tableName', StringType(), True), StructField('databaseName', StringType(), True)])
  data = [(row[0], row[1], row[2])]  
  df = spark.createDataFrame( data = data,schema = columns)

  cursor.close()
  del cursor
  conn.close()
  
  return df
import pyspark.sql.functions  as F

dbList = [ SQLServerDB1 , SQLServerDB1 ]

SQLServerDB1_query = ""
SQLServerDB2_query = ""

for db in dbList:
  print("Currently loading for "+db+" database")
  serverName = db + "SQLServerName"
  serverUser = db + "SQLServerUser"
  serverPassword = db + "SQLServerPassword"
  serverDB = db + "SQLServerDB"  
  tables=df.select('target_object').filter(F.col('source') == db).distinct().toPandas()['target_object']

  for tablename in list(tables):
    if tablename != list(tables)[-1]:
      vars()["%s_query"%db] = f" Select count_big(*) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename} \n" + " union \n" + vars()["%s_query"%db]
    else:
      vars()["%s_query"%db] = vars()["%s_query"%db] + f" Select count_big(*) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename}"
    
    vars()["%s_DF"%db] = get_table_count( vars()["%s_query"%db] , eval(serverName),  eval(serverUser),  eval(serverPassword), eval(serverDB) )

#     exec(f'{db}_DF = get_table_count( vars()["%s_query"%db] , eval(serverName),  eval(serverUser),  eval(serverPassword), eval(serverDB) )')

    
#     print(tablename + " Loaded")

低于错误 -

('42000', "[42000] [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Parse error at line: 3, column: 1: Incorrect syntax near 'union'. (103010) (SQLExecDirectW)")

我尝试打印 SQL 语句,它在 SQL Server DB 中没有任何问题。 请建议我在哪里写错了代码。

【问题讨论】:

  • 您可以使用两个查询来执行此操作。第一个查询检索数据库中所有感兴趣的表名。使用该信息,您可以构建另一个查询,其中包含当前查询的联合(对于每个表),同时还包括表名作为列。如果您搜索“count rows in all tables”,您将看到生成此 SQL 的技术以及一些快捷方式。简而言之,让引擎完成工作。
  • @SMor - 你能详细说明一下吗?我目前正在通过 pyspark 数据框识别表,然后调用一个函数来完成计数部分。只有我有点担心的部分是使用两个 for 循环并使用 Pandas 将表数据框转换为列表。
  • 计算 SQL Server 表中的行数需要扫描一些未过滤的索引。如果没有窄索引可供选择,则将扫描整个表。
  • @DavidBrowne-Microsoft 有没有更好的方法来做到这一点?请注意,我只有两个数据库的读取权限。此外,有时我还会收到 [Microsoft][ODBC Driver 17 for SQL Server][SQL Server]Invalid object name ***table****。当我在桌子上尝试 count(1) 时,它工作得无可挑剔。
  • 通过迭代表名来创建查询字符串,例如 Select table1, count_big(1) from table1 union Select table2 , count_big(1) from table2 然后将数据库作为单个字符串而不是 fecthing 传递一个一个

标签: python sql-server apache-spark pyspark databricks


【解决方案1】:

尝试使用以下代码,它可以工作。谢谢大家的建议!

def get_table_count(query ,server, username, password, database):
  
  jdbc_url = f"jdbc:sqlserver://{server}:1433;databaseName={database}"
  
  df_read = spark.read \
            .format("jdbc") \
            .option("url",jdbc_url) \
            .option("query", query) \
            .option("user", username) \
            .option("password", password) \
            .option("driver", "com.microsoft.sqlserver.jdbc.SQLServerDriver") \
            .load()
  
  df_save = df_read.write.mode('overwrite').parquet('/tmp/' + f"{database}" + '.parquet')
  
  df = spark.read.parquet('/tmp/' + f"{database}" + '.parquet')
    
  return df
import pyspark.sql.functions  as F

dbList = [ SQLServerDB1 , SQLServerDB1 ]

SQLServerDB1_query = ""
SQLServerDB2_query = ""

for db in dbList:
  print("Currently loading for "+db+" database")
  serverName = db + "SQLServerName"
  serverUser = db + "SQLServerUser"
  serverPassword = db + "SQLServerPassword"
  serverDB = db + "SQLServerDB"  
  tables=df.select('target_object').filter(F.col('source') == db).distinct().toPandas()['target_object']

  for tablename in list(tables):
    if tablename != list(tables)[-1]:
      vars()["%s_query"%db] = f" Select count_big(1) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename} \n" + " union \n" + vars()["%s_query"%db]
    else:
      vars()["%s_query"%db] = vars()["%s_query"%db] + f" Select count_big(1) as tableCount, '{tablename}' as tableName, '{db}' as databaseName from " + f"{tablename}"    
    
  print(vars()["%s_query"%db])
    
  vars()["%s_DF"%db] = get_table_count( vars()["%s_query"%db] , eval(serverName),  eval(serverUser),  eval(serverPassword), eval(serverDB) )
  
  vars()["%s_DF"%db].createOrReplaceTempView(f"{db}_tablesCount")
  
  print(f"{db}"+ " Loaded")

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多