【Question title】: Google Cloud Spanner MERGE SQL equivalent in Python using the Google APIs
【Posted】: 2020-08-17 19:42:21
【Question】:

How can I perform a MERGE like the one below in Google Cloud Spanner using the Google APIs?

MERGE INTO TABLE2 B
USING (SELECT COL1, COL2, SUM(TOTAL_CNT) AS TOTAL_CNT
         FROM TABLE1 GROUP BY COL1, COL2) A
   ON (B.COL1 = A.COL1 AND B.COL2 = A.COL2)
 WHEN MATCHED THEN
UPDATE SET B.TOTAL_CNT = B.TOTAL_CNT + A.TOTAL_CNT
 WHEN NOT MATCHED THEN
INSERT (COL1, COL2, TOTAL_CNT)
VALUES (A.COL1, A.COL2, A.TOTAL_CNT)

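【Comments】:

  • Could you turn this into an actual question, more like the questions and answers described in the tour?
  • @Scratte, I have rephrased the question. Let me know if it needs further refinement.
  • The more detail you can provide, the better. You could add an example of what you want to merge and show the result you are looking for :)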

Tags: google-cloud-spanner sql-merge


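【Solution 1】:

I would say you can use similar SQL clauses, such as UNION and INTERSECT, to achieve this; this post explains the approach in detail. I think the approximation using a join in your own answer is fine too.

【Discussion】: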

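    【Solution 2】:

    Whenever you need to perform a MERGE, break it into two steps. The first step is a left join against the target table to fetch the required values; the second step takes that result set and performs a batch insert_or_update. This saves a lot of lookups and improves efficiency. I made the batch insert_or_update multithreaded so that you can start more threads and the process finishes faster. If you don't need anything that fancy, you can run it as inline code.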
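To make the two steps concrete before the full Spanner script, here is a minimal in-memory sketch. It uses plain Python dicts in place of tables, and the helper names (`aggregate_source`, `left_join_totals`, `insert_or_update`) are hypothetical, chosen only for illustration. It assumes (COL1, COL2) is the key of the target table.

```python
from collections import defaultdict


def aggregate_source(rows):
    """Mimic SELECT COL1, COL2, SUM(TOTAL_CNT) ... GROUP BY COL1, COL2."""
    totals = defaultdict(int)
    for col1, col2, cnt in rows:
        totals[(col1, col2)] += cnt
    return dict(totals)


def left_join_totals(source_totals, target):
    """Step 1: left join against the target and add its existing count, if any."""
    return [
        (col1, col2, cnt + target.get((col1, col2), 0))
        for (col1, col2), cnt in source_totals.items()
    ]


def insert_or_update(target, rows):
    """Step 2: upsert every row, keyed on (COL1, COL2)."""
    for col1, col2, cnt in rows:
        target[(col1, col2)] = cnt


# TABLE1 as raw rows, TABLE2 as a dict keyed on (COL1, COL2).
table1 = [("a", "x", 1), ("a", "x", 2), ("b", "y", 5)]
table2 = {("a", "x"): 10, ("c", "z"): 7}

merged_rows = left_join_totals(aggregate_source(table1), table2)
insert_or_update(table2, merged_rows)
print(table2)
```

The matched key ("a", "x") receives the sum of the old and new counts, the unmatched key ("b", "y") is inserted, and ("c", "z") is left untouched, which is the behaviour the MERGE statement in the question describes.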

        import threading
        import pandas as pd
        import datetime
        import time
        from merge_ins_upd_using_df import merge_ins_upd_using_df 
        from google.cloud import spanner
    
        # Instantiate a client.
        spanner_client = spanner.Client()
    
        # Your Cloud Spanner instance ID.
        instance_id = 'spanner-instance'
    
        # Get a Cloud Spanner instance by ID.
        instance = spanner_client.instance(instance_id)
    
        # Your Cloud Spanner database ID.
        database_id = 'database-id'
    
        max_thread_cnt = 30
        threadLimiter = threading.BoundedSemaphore(max_thread_cnt)
        thread_list = []
        thread_count = 0
        thread_cnt_before = 0
        thread_counter = 0
    
             
        # A is already grouped by (COL1, COL2), so no outer aggregate is needed.
        # Assumes (COL1, COL2) is unique in TABLE2.
        sql_stmt = """SELECT A.COL1, A.COL2,
                             A.TOTAL_CNT + COALESCE(B.TOTAL_CNT, 0) AS TOTAL_CNT
                        FROM (SELECT COL1, COL2, SUM(TOTAL_CNT) AS TOTAL_CNT
                                FROM TABLE1 GROUP BY COL1, COL2) A
                        LEFT JOIN TABLE2 B ON (A.COL1 = B.COL1 AND A.COL2 = B.COL2)"""
                        
    
        # Reuse the instance created above to get a database handle.
        database = instance.database(database_id)

        # Read the joined rows; consume the result stream while the snapshot is open.
        with database.snapshot() as snapshot:
            results = snapshot.execute_sql(sql_stmt)
            rows = list(results)

        df = pd.DataFrame(rows, columns=['COL1', 'COL2', 'TOTAL_CNT'])
    
        # Rows per commit. Size this from the number of columns and index entries written
        # per row so that a commit stays under the mutation limit (20,000 per this answer).
        process_cnt = 10
        rec_cnt = df.shape[0]
        print('Total Rec Count: ' + str(rec_cnt))
        total_rec_processed = 0 
        from_index = 0 
        to_index = 0
        dest_table = 'TABLE2'
    
        ### Build the threads: one per slice of process_cnt rows

        for from_index in range(0, rec_cnt, process_cnt):
            to_index = min(from_index + process_cnt, rec_cnt)
            thread_counter += 1
            thread_count += 1
            df1 = df[from_index:to_index]
            t = threading.Thread(target=merge_ins_upd_using_df,
                                 args=(instance_id, database_id, df1, thread_counter, dest_table))
            thread_list.append(t)
            total_rec_processed += len(df1)
    
        begin = datetime.datetime.now()
    
        print("Thread Kick-off has Started : " + str(begin))
    
        print ("Thread Count before :" +  str(threading.active_count()))
    
        thread_cnt_before = threading.active_count()
    
        # Starts threads
        for thread in thread_list:
            while threading.active_count() >= max_thread_cnt:
                  time.sleep(.05)
            thread.start()
            
        print ("Thread Count after :" +  str(threading.active_count()))
    
        print("All Threads have been kicked off : " + str(datetime.datetime.now()))    
    
        # Wait for every worker thread to finish.
        for thread in thread_list:
            thread.join()
    
        end = datetime.datetime.now()
        diff = end-begin
        print("Total time for completion in minutes : " + str(diff.total_seconds()/60))
    
    
        #######  merge_ins_upd_using_df.py (the module imported above)
        import datetime

        from google.cloud import spanner


        def merge_ins_upd_using_df(cs_instance, cs_database, df, thread_counter, dest_table):
            """Upsert the rows of df into dest_table in a single commit."""
            begin = datetime.datetime.now()

            spanner_client = spanner.Client()
            instance = spanner_client.instance(cs_instance)
            database = instance.database(cs_database)

            # insert_or_update inserts new keys and overwrites the listed columns of existing ones.
            with database.batch() as batch:
                batch.insert_or_update(
                    table=dest_table,
                    columns=list(df.columns),
                    values=df.values.tolist())

            diff = datetime.datetime.now() - begin
            print("Thread " + str(thread_counter) + " finished in seconds : " + str(diff.total_seconds()))
            ### add logic to handle exceptions

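The script above sizes `process_cnt` by hand, manages threads manually, and leaves exception handling as a to-do. As a possible alternative, the sketch below derives a chunk size from a mutation limit and runs the chunks through `concurrent.futures.ThreadPoolExecutor`, collecting any failures. Everything here is an assumption for illustration: `rows_per_commit`, `chunked`, `run_in_pool` and `fake_write_chunk` are hypothetical names, the cost model (one mutation per column written per row, with a safety factor for index entries) is only a rough approximation, and the fake writer stands in for a real call such as the `insert_or_update` batch shown above.

```python
import threading
from concurrent.futures import ThreadPoolExecutor, as_completed


def rows_per_commit(num_columns, mutation_limit, safety_factor=0.5):
    """Rough chunk size, assuming each row costs one mutation per column written.

    safety_factor leaves headroom for index entries, which also count.
    """
    if num_columns <= 0:
        raise ValueError("num_columns must be positive")
    return max(1, int(mutation_limit * safety_factor) // num_columns)


def chunked(rows, size):
    """Yield consecutive slices of rows with at most size items each."""
    for start in range(0, len(rows), size):
        yield rows[start:start + size]


def run_in_pool(rows, write_chunk, chunk_size, max_workers=8):
    """Submit each chunk to write_chunk; return (rows_written, errors)."""
    written = 0
    errors = []
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        futures = {pool.submit(write_chunk, chunk): chunk
                   for chunk in chunked(rows, chunk_size)}
        for future in as_completed(futures):
            chunk = futures[future]
            try:
                future.result()  # re-raises any exception from the worker
                written += len(chunk)
            except Exception as exc:
                errors.append((chunk, exc))
    return written, errors


# Stand-in for the Spanner write: upsert into a dict guarded by a lock.
store = {}
store_lock = threading.Lock()


def fake_write_chunk(chunk):
    with store_lock:
        for col1, col2, cnt in chunk:
            store[(col1, col2)] = cnt


rows = [("k%d" % i, "x", i) for i in range(25)]
size = rows_per_commit(num_columns=3, mutation_limit=60)
written, errors = run_in_pool(rows, fake_write_chunk, size, max_workers=4)
print(size, written, len(errors))
```

With a real writer, the `errors` list gives one place to retry or log failed chunks, and the pool caps concurrency without polling `threading.active_count()`.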
    【Discussion】:
