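Whenever you need to run a MERGE-style SQL statement, break it into two steps.
The first step is a LEFT JOIN against the target table to fetch the required values; the second step runs a batch insert_or_update on that result set. This avoids a large number of lookups and improves efficiency. The batch insert_or_update is multithreaded, so you can start more threads and the process will finish faster. If you do not need anything that elaborate, you can run it as inline code instead.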
'''
import threading
import pandas as pd
import datetime
import time
from merge_ins_upd_using_df import merge_ins_upd_using_df
from google.cloud import spanner
# Identifiers of the Cloud Spanner instance and database this script targets.
instance_id = 'spanner-instance'
database_id = 'database-id'
# Client plus a handle on the instance named above.
spanner_client = spanner.Client()
instance = spanner_client.instance(instance_id)
# Upper bound used when throttling how many worker threads run at once.
max_thread_cnt = 30
# NOTE(review): this semaphore is not referenced by the visible code -- confirm
# whether anything else relies on it before removing.
threadLimiter = threading.BoundedSemaphore(max_thread_cnt)
# Bookkeeping for the worker threads that are built and started further down.
thread_list = []
thread_count = thread_cnt_before = thread_counter = 0
# Step 1 of the merge: aggregate TABLE1 per (COL1, COL2) and LEFT JOIN the
# target table so each row already carries the combined TOTAL_CNT.
# The statement used to open a parenthesis it never closed and aggregated with
# SUM() without grouping the non-aggregated columns; both are corrected here.
sql_stmt = """ SELECT A.COL1, A.COL2, SUM(A.TOTAL_CNT + COALESCE(B.TOTAL_CNT,0)) AS TOTAL_CNT
FROM (SELECT COL1, COL2, SUM(TOTAL_CNT) AS TOTAL_CNT
FROM TABLE1 GROUP BY COL1, COL2) A
LEFT JOIN TABLE2 B on (A.COL1 = B.COL1 AND A.COL2 = B.COL2)
GROUP BY A.COL1, A.COL2 """
# Reuse the instance handle created during setup instead of building a second
# client and instance.
database = instance.database(database_id)
with database.snapshot() as snapshot:
    results = snapshot.execute_sql(sql_stmt)
    # Materialise the streamed rows while the snapshot is still open.
    rows = list(results)
# Passing the column names to the constructor (rather than assigning
# df.columns afterwards) also works when the query returns no rows.
df = pd.DataFrame(rows, columns=['COL1', 'COL2', 'TOTAL_CNT'])
# Rows handed to each worker. Set this count based on the number of
# columns/index updates so that one batch stays within the 20,000 mutations
# limit.
process_cnt = 10
rec_cnt = df.shape[0]
print('Total Rec Count: ' + str(rec_cnt))
total_rec_processed = 0
from_index = 0
to_index = 0
dest_table = 'TABLE2'
### Build the threads
# Step through the DataFrame in process_cnt-sized slices. Iterating over a
# range (instead of `while True`) creates no worker at all when the query
# returned zero rows, and the final slice is clamped to rec_cnt.
for from_index in range(0, rec_cnt, process_cnt):
    to_index = min(from_index + process_cnt, rec_cnt)
    thread_counter += 1  # 1-based batch id passed to the worker
    thread_count += 1
    df1 = df[from_index:to_index]
    t = threading.Thread(
        target=merge_ins_upd_using_df,
        args=(instance_id, database_id, df1, thread_counter, dest_table),
    )
    thread_list.append(t)
    # Number of records actually assigned to workers so far.
    total_rec_processed = to_index
    # print("Threads Added: " + str(thread_count) + "Proc Count:" + str(total_rec_processed ))
# Step 2 of the merge: run the prepared workers, at most max_thread_cnt at a
# time, wait for all of them, and report the elapsed wall-clock time.
begin = datetime.datetime.now()
print("Thread Kick-off has Started : " + str(begin))
print ("Thread Count before :" + str(threading.active_count()))
thread_cnt_before = threading.active_count()
# Starts threads
# Throttle on the workers started here rather than on the process-wide
# threading.active_count(): that counter also includes the main thread and any
# threads not started by this script, so it is not a reliable measure of how
# many workers are running.
in_flight = []
for thread in thread_list:
    in_flight = [t for t in in_flight if t.is_alive()]
    while len(in_flight) >= max_thread_cnt:
        time.sleep(.05)
        in_flight = [t for t in in_flight if t.is_alive()]
    thread.start()
    in_flight.append(thread)
print ("Thread Count after :" + str(threading.active_count()))
print("All Threads have been kicked off : " + str(datetime.datetime.now()))
# join() returns as soon as each worker finishes; polling active_count() until
# it drops back to the earlier value could wait forever if any unrelated thread
# is still alive. With an empty thread_list this loop is simply skipped.
for thread in thread_list:
    thread.join()
end = datetime.datetime.now()
diff = end-begin
print("Total time for completion in minutes : " + str(diff.total_seconds()/60))
####### function - merge_ins_upd_using_df
class merge_ins_upd_using_df:
    """Upsert one DataFrame slice into a Cloud Spanner table.

    All of the work happens in ``__init__``, so the class itself can be used
    as a ``threading.Thread`` target: constructing an instance performs the
    batch ``insert_or_update``.

    Args:
        cs_instance: Cloud Spanner instance ID.
        cs_database: Cloud Spanner database ID.
        df: pandas DataFrame whose column labels name the destination columns
            and whose rows are the values to write.
        thread_counter: Identifier of the calling batch; stored and used in
            the failure message.
        dest_table: Name of the table to write to.

    Attributes:
        elapsed: ``datetime.timedelta`` spent in the write attempt (set even
            when the write fails).

    Raises:
        Exception: Whatever the Spanner client raises is re-raised unchanged
            after the failing batch has been reported.
    """

    def __init__(self, cs_instance, cs_database, df, thread_counter, dest_table):
        self.cs_instance = cs_instance
        self.cs_database = cs_database
        self.thread_counter = thread_counter
        self.df = df
        self.dest_table = dest_table
        self.elapsed = None

        # Imported locally, as in the original, so the class does not depend
        # on module-level imports.
        from google.cloud import spanner
        import datetime

        begin = datetime.datetime.now()
        try:
            # An empty slice has nothing to write; skip the commit round-trip.
            if not df.empty:
                spanner_client = spanner.Client()
                instance = spanner_client.instance(cs_instance)
                database = instance.database(cs_database)
                # The batch is committed when the with-block exits.
                with database.batch() as batch:
                    batch.insert_or_update(
                        table=dest_table,
                        # Pass a plain list of names, not a pandas Index.
                        columns=list(df.columns),
                        values=df.values.tolist())
        except Exception as exc:
            # Report which batch failed, then let the error propagate so it
            # is not silently lost.
            print("Batch " + str(thread_counter) + " failed for table "
                  + str(dest_table) + ": " + repr(exc))
            raise
        finally:
            # Keep the timing instead of computing and discarding it.
            self.elapsed = datetime.datetime.now() - begin