【问题标题】:Empty results from concurrent psycopg2 postgres select queries来自并发 psycopg2 postgres 选择查询的空结果
【发布时间】:2019-07-31 15:28:28
【问题描述】:

我正在尝试使用自定义 pytorch 数据集的 getitem 方法从 postgres 数据库中检索我的标签和特征数据集。当我尝试使用随机索引进行采样时,我的查询不返回任何结果

我检查了我的查询是否直接在 psql cli 上工作。他们是这样。 我检查了我的数据库连接池是否存在问题。似乎没有。 我已恢复为顺序采样,它仍然可以正常工作,因此随机索引值似乎是查询的问题。

执行查询的 getitem 方法放在下面。这显示了顺序查询和洗牌查询。这两者都通过变量名称清楚地标记。

def __getitem__(self, idx):

        query = """SELECT ls.taxonomic_id, it.tensor
                    FROM genomics.tensors2 AS it
                    INNER JOIN genomics.labeled_sequences AS ls
                    ON ls.accession_number = it.accession_number
                    WHERE (%s) <= it.index 
                    AND CARDINALITY(tensor) = 89
                    LIMIT (%s) OFFSET (%s)"""

        shuffle_query = """BEGIN
                           SELECT ls.taxonomic_id, it.tensor
                           FROM genomics.tensors2 AS it
                           INNER JOIN genomics.labeled_sequences AS ls
                           ON ls.accession_number = it.accession_number
                           WHERE it.index BETWEEN (%s) AND (%s)
                           END"""


        batch_size = 500
        upper_bound = idx + batch_size

        query_data = (idx, batch_size, batch_size)
        shuffle_query_data = (idx, upper_bound)

        result = None
        results = None

        conn = self.conn_pool.getconn() 

        try:
            conn.set_session(readonly=True, autocommit=True)
            cursor = conn.cursor()
            cursor.execute(query, query_data)
            results = cursor.fetchall()
            self.conn_pool.putconn(conn)


            print(idx)
            print(results)        
        except Error as conn_pool_error:
            print('Multithreaded __getitem__ query error')
            print(conn_pool_error)

        label_list = []
        sequence_list = []

        for (i,result) in enumerate(results):
            if result is not None:
                (label, sequence) = self.create_batch_stack_element(result)

                label_list.append(label)
                sequence_list.append(sequence)

        label_stack = torch.stack(label_list).to('cuda')
        sequence_stack = torch.stack(sequence_list).to('cuda')

        return (label_stack, sequence_stack)


    def create_batch_stack_element(self, result):
        if result is not None:

            label = np.array(result[0], dtype=np.int64)
            sequence = np.array(result[1], dtype=np.int64)

            label = torch.from_numpy(label)
            sequence = torch.from_numpy(sequence)

            return (label, sequence)

        else:
            return None

我收到的错误来自我尝试在 for 循环之后堆叠张量列表。这失败了,因为列表是空的。由于列表是根据查询结果在循环中填充的。它指出查询是问题所在。

我想要一些关于我的源代码的帮助来解决这个问题,并可能解释为什么我的带有随机索引的并发查询会失败。

谢谢。任何帮助表示赞赏。

E:我相信我已经找到了问题的根源,它来自 pytorch RandomSampler 源代码。我相信它提供的索引超出了我的数据库键范围。这解释了为什么我没有从查询中得到结果。我将不得不编写自己的采样器类来将此值限制为我的数据集的长度。这是我的疏忽。

E2:随机采样现在可以与自定义采样器类一起使用,但会阻止多线程查询。

E3:我现在已经解决了整个问题。使用多个进程通过自定义随机采样器将数据加载到 GPU。当我有机会并接受它作为关闭线程的答案时,将发布适用的代码。

【问题讨论】:

    标签: python postgresql concurrency pytorch psycopg2


    【解决方案1】:

    这是从带有可索引键的 postgres 表中正确构造的 pytorch getitem。

    def __getitem__(self, idx: int) -> tuple:
    
        query = """SELECT ls.taxonomic_id, it.tensor
                   FROM genomics.tensors2 AS it
                   INNER JOIN genomics.labeled_sequences AS ls
                   ON ls.accession_number = it.accession_number
                   WHERE (%s) = it.index"""
    
        query_data = (idx,)
    
        result = None
    
        conn = self.conn_pool.getconn()
    
        try:
            conn.set_session(readonly=True, autocommit=True)
            cursor = conn.cursor()
            cursor.execute(query, query_data)
            result = cursor.fetchone()
    
            self.conn_pool.putconn(conn)
        except Error as conn_pool_error:
            print('Multithreaded __getitem__ query error')
            print(conn_pool_error)
    
        return result
    
    def collate(self, results: list) -> tuple:
    
        label_list = []
        sequence_list = []
    
        for result in results:
            if result is not None:
                print(result)
                result = self.create_batch_stack_element(result)
    
                if result is not None:
                    label_list.append(result[0])
                    sequence_list.append(result[1])
    
        label_stack = torch.stack(label_list)
        sequence_stack = torch.stack(sequence_list)
    
        return (label_stack, sequence_stack)
    
    
    
    def create_batch_stack_element(self, result: tuple) -> tuple:
    
        if result is not None:
    
            label = np.array(result[0], dtype=np.int64)
            sequence = np.array(result[1], dtype=np.int64)
    
            label = torch.from_numpy(label)
            sequence = torch.from_numpy(sequence)
    
            return (label, sequence)
    
        return None
    

    然后我调用了我的训练函数:

    for rank in range(num_processes):
        p = mp.Process(target=train, args=(dataloader,))
        p.start()
        processes.append(p)
    for p in processes:
        p.join() 
    

    【讨论】:

      猜你喜欢
      • 2018-07-24
      • 2016-09-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2014-02-04
      相关资源
      最近更新 更多