【Question title】:How to use PostgreSQL in a multi-threaded Python program
【Posted】:2015-08-24 12:31:20
【Question】:

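I'm using psycopg2 (2.6) to connect to a PostgreSQL database from a multi-threaded Python program.

When the queue size in the program grows, the select queries fail with the error "no results to fetch", while inserting records into the database works fine.

Sample code: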

import threading


class Decoders(threading.Thread):
    def __init__(self, queue):
        threading.Thread.__init__(self)
        self.queue = queue

    def run(self):
        self.decode()

    def decode(self):
        queue = self.queue
        db = Database()
        while queue.qsize() > 0:    
            # calling db methods, just an example
            temp = queue.get()
            db.select_records()
            db.insert_record(temp)

and:

Decoders(queue).start()
Decoders(queue).start()

Note: multiprocessing doesn't have this problem.

Edit:

When I start only one thread, the program runs without any problems.

The Database class:

from psycopg2 import connect


class Database:
    db = object
    cursor = object

    def __init__(self):
        self.db = connect(host=conf_hostname,
                          database=conf_dbname,
                          user=conf_dbuser,
                          password=conf_dbpass,
                          port=conf_dbport)
        self.db.autocommit = True
        self.cursor = self.db.cursor()

    def select_records(self):
        # "simple select": the real query is quoted in the comments below;
        # SIMPLE_SELECT is only a placeholder name for it
        self.cursor.execute(SIMPLE_SELECT)
        return self.cursor.fetchall()

    def insert_record(self, temp):
        # insert query (omitted in the question)
        pass

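【Comments】:

  • What does the Database class look like?
  • @Kevin, I'll update the question now
  • My psychic powers tell me that the select query you are using (which you haven't shown us!) is not returning any rows.
  • @Kevin, this is exactly the select query: "SELECT * FROM " + conf_dbpref + "interfaces " "WHERE exporters_id = %d " "AND if_index = %d " % (exporter_id, if_index) - I just wanted to simplify the code
  • It works with multiprocessing or with a single thread!!!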
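As a side note on the query quoted above: it builds the SQL with Python `%` formatting. psycopg2 can send the values separately instead, which avoids quoting problems and SQL injection; its placeholder is always `%s`, even for integers (`%d` is not supported). The table prefix is an identifier and cannot be passed as a parameter, so this sketch assumes `conf_dbpref` comes from trusted configuration (psycopg2 2.7+ also offers the `psycopg2.sql` module for composing identifiers). `build_interfaces_query` is a hypothetical helper name.

```python
def build_interfaces_query(conf_dbpref, exporter_id, if_index):
    """Return (sql, params) suitable for cursor.execute(sql, params).

    Hypothetical helper: the table prefix is concatenated because identifiers
    cannot be query parameters; it is assumed to come from trusted config.
    """
    sql = (
        "SELECT * FROM " + conf_dbpref + "interfaces "
        "WHERE exporters_id = %s "
        "AND if_index = %s"
    )
    # The values travel separately; psycopg2 adapts and quotes them itself.
    params = (exporter_id, if_index)
    return sql, params


# Usage with an open psycopg2 cursor (not executed here):
#   sql, params = build_interfaces_query(conf_dbpref, exporter_id, if_index)
#   cursor.execute(sql, params)
#   rows = cursor.fetchall()
```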

Tags: python multithreading psycopg2 python-multithreading


【Solution 1】:

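Are you creating one connection per thread? If you have multiple threads, each thread needs its own connection (or a pool with a locking mechanism around the connections), otherwise you will run into all sorts of strange problems.

That is also why you don't see the problem with multiprocessing: each process creates its own connection.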
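A minimal sketch of the one-connection-per-thread approach, assuming Python 3 (the module is `Queue` in Python 2). `worker`, `run_workers` and the `connect`/`handle` callables are hypothetical names: `connect` could be `functools.partial(psycopg2.connect, host=conf_hostname, database=conf_dbname, user=conf_dbuser, password=conf_dbpass, port=conf_dbport)`, and `handle` would run the select/insert on the thread's own connection. The loop also avoids the `while queue.qsize() > 0: queue.get()` pattern, where two threads can race for the last item and one of them blocks in `get()`.

```python
import queue
import threading


def worker(work_queue, connect, handle):
    """Drain work_queue using a connection owned by this thread only."""
    conn = connect()  # one connection per thread, never shared
    try:
        while True:
            try:
                # get_nowait() returns an item or raises queue.Empty at once;
                # "qsize() > 0 then get()" can block if another thread takes
                # the last item between the check and the get().
                item = work_queue.get_nowait()
            except queue.Empty:
                break
            handle(conn, item)
    finally:
        conn.close()


def run_workers(items, connect, handle, n_threads=2):
    """Start n_threads workers over a shared queue and wait for all of them."""
    work_queue = queue.Queue()
    for item in items:
        work_queue.put(item)
    threads = [threading.Thread(target=worker, args=(work_queue, connect, handle))
               for _ in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
```

All items are queued before the threads start, so "queue empty" reliably means "work done" here; a producer that keeps adding items would need a sentinel value instead.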

【Comments】:

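  • I think my code above creates one connection per thread. Is that right or not?
  • Remove these two lines from the Database class and see if it helps: db = object and cursor = object
  • Another thing to note: since there is no locking around the code, the select may run before the insert has finished. Hard to say without more details.
  • @MichaelRobellard: The way the OP's code is structured, at least one select must run before any insert can run. Whichever thread wins the race does a select before it has a chance to insert anything.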
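On whether each thread really gets its own connection: the class attributes `db = object` and `cursor = object` are never used, because `self.db = ...` and `self.cursor = ...` in `__init__` create instance attributes that shadow them. Removing the two lines is tidy but should not change behavior, and each `Database()` created in `decode()` still holds its own connection. A pure-Python sketch, with strings standing in for real connections (a hypothetical simplification of the question's class):

```python
class Database:
    db = object      # class attributes, shared through the class itself
    cursor = object

    def __init__(self, conn):
        # Assigning through self creates instance attributes that shadow the
        # class attributes, so each instance keeps its own connection/cursor.
        self.db = conn
        self.cursor = conn + " cursor"


a = Database("connection A")
b = Database("connection B")
print(a.db, "|", b.db)         # connection A | connection B
print(Database.db is object)   # True: the class attribute is untouched
```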

【Solution 2】:

With psycopg2 you can create multiple cursors on a single connection and use each cursor from its own thread.
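A minimal sketch of that pattern, following psycopg2's documented thread-safety rules: a connection may be shared between threads, but each cursor should be used by one thread only. Commands sent through one connection are serialized and run in the same session (and the same transaction unless autocommit is on), so this saves connections rather than adding parallelism on the database side. `query_in_threads` is a hypothetical helper; `conn` would come from `psycopg2.connect(**params)`.

```python
import threading


def query_in_threads(conn, sql, n_threads=4):
    """Run `sql` from several threads over ONE shared connection.

    Each thread creates, uses and closes its own cursor.
    """
    results = [None] * n_threads

    def task(i):
        cur = conn.cursor()  # this cursor is used by this thread only
        try:
            cur.execute(sql)
            results[i] = cur.fetchall()
        finally:
            cur.close()

    threads = [threading.Thread(target=task, args=(i,)) for i in range(n_threads)]
    for t in threads:
        t.start()
    for t in threads:
        t.join()
    return results
```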

Please use the following code:

#!/usr/bin/env python
import threading
import time
from multiprocessing import Process

import psycopg2
from psycopg2.pool import ThreadedConnectionPool

# Local helper module (not part of psycopg2) that returns the keyword
# arguments for psycopg2.connect(), e.g. read from a database.ini file
from config import config

# Add an ORDER BY on a unique column so that the LIMIT/OFFSET slices are stable
SELECT_QUERY = 'Select something from some_table limit %s offset %s'

INSERT_QUERY = "Insert INTO sometable (col1, col2, col3) values "

TOTAL_RECORDS = 805000  # total number of records to process
PROCESS_COUNT = 10
THREADS_PER_PROCESS = 10


class PsqlMultiThreadExample(object):
    # Per process: one connection for reading and one for writing;
    # every thread opens its own cursors on these two connections
    _select_conn_count = 1
    _insert_conn_count = 1
    _conn_pool = None

    def postgres_connection(self):
        """ Connect to the PostgreSQL database server """
        conn = None
        try:
            # read connection parameters
            params = config()

            # connect to the PostgreSQL server
            print('Connecting to the PostgreSQL database...')
            conn = psycopg2.connect(**params)

            # create a cursor
            cur = conn.cursor()

            # execute a statement
            print('PostgreSQL database version:')
            cur.execute('SELECT version()')

            # display the PostgreSQL database server version
            db_version = cur.fetchone()
            print(db_version)

            # close the communication with the PostgreSQL
            cur.close()
        except (Exception, psycopg2.DatabaseError) as error:
            print(error)
        finally:
            if conn is not None:
                conn.close()
                print('Database connection closed.')

    def check_connection(self):
        """ Check the database connection through the pool of the current process """
        pool = PsqlMultiThreadExample._conn_pool
        conn = pool.getconn()
        try:
            cur = conn.cursor()
            print('PostgreSQL database version:')
            cur.execute('SELECT version()')
            print(cur.fetchone())
            cur.close()
        except (Exception, psycopg2.DatabaseError) as error:
            print(error)
        finally:
            # hand the connection back to the pool instead of closing it
            pool.putconn(conn)

    def create_connection_pool(self):
        """ Create the thread-safe connection pool of the current process

        psycopg2 connections must not be shared between processes, so each
        child process calls this itself after it has started.
        """
        max_conn = (PsqlMultiThreadExample._insert_conn_count
                    + PsqlMultiThreadExample._select_conn_count)
        min_conn = max_conn // 2  # integer division: the pool expects ints
        PsqlMultiThreadExample._conn_pool = ThreadedConnectionPool(min_conn, max_conn, **config())

    def read_data(self):
        """
        Split the records into PROCESS_COUNT ranges and handle each range in
        its own process. All processes are started first and joined
        afterwards, so they run in parallel.
        """
        partition_value = TOTAL_RECORDS // PROCESS_COUNT
        processes = []
        for pid in range(PROCESS_COUNT):
            start_index = pid * partition_value
            # the last range also takes the remainder of the division
            if pid == PROCESS_COUNT - 1:
                end_index = TOTAL_RECORDS
            else:
                end_index = start_index + partition_value
            # only plain values are passed: connections cannot cross process boundaries
            ps = Process(target=self.process_data, args=(pid, start_index, end_index))
            ps.daemon = True
            ps.start()
            processes.append(ps)
        for ps in processes:
            ps.join()

    def process_data(self, pid, start_index, end_index):
        """
        Runs in a child process: split the range into THREADS_PER_PROCESS
        slices, handled by threads that share one read and one write connection.
        """
        _start = time.time()
        print("Started processing records from %s to %s" % (start_index, end_index))
        self.create_connection_pool()
        pool = PsqlMultiThreadExample._conn_pool
        select_conn = pool.getconn()
        insert_conn = pool.getconn()
        insert_conn.autocommit = True
        lock = threading.Lock()  # one lock shared by all threads of this process
        partition_value = (end_index - start_index) // THREADS_PER_PROCESS
        workers = []
        try:
            for tid in range(THREADS_PER_PROCESS):
                t_start = start_index + tid * partition_value
                if tid == THREADS_PER_PROCESS - 1:
                    t_end = end_index
                else:
                    t_end = t_start + partition_value
                # a separate pair of cursors per thread: cursors are not thread-safe
                worker = threading.Thread(target=self.process_thread, args=(
                    pid, tid, t_start, t_end, select_conn.cursor(), insert_conn.cursor(), lock))
                worker.daemon = True
                worker.start()
                workers.append(worker)
            for worker in workers:
                worker.join()
        finally:
            pool.putconn(select_conn)
            pool.putconn(insert_conn)
            pool.closeall()
        print("Process %s took %s seconds" % (pid, time.time() - _start))

    def process_thread(self, pid, tid, start_index, end_index, sel_cur, ins_cur, lock):
        """
        Runs in a thread: read one slice of rows and insert them into the
        target table (this is where any per-row processing would go)
        """
        limit = end_index - start_index
        sel_cur.execute(SELECT_QUERY, (limit, start_index))
        rows = sel_cur.fetchall()
        sel_cur.close()

        # mogrify() returns each row as a quoted SQL fragment (bytes on Python 3;
        # decoding assumes a UTF-8 client encoding)
        records = [ins_cur.mogrify("(%s,%s,%s)", (row[0], row[1], row[2])).decode('utf-8')
                   for row in rows]

        self.write_data(records, ins_cur, lock)
        ins_cur.close()

    def write_data(self, records, ins_cur, lock):
        """
        Insert all records of one slice with a single multi-row INSERT
        """
        if not records:
            return
        # psycopg2 already serializes commands on a shared connection;
        # the lock only makes this explicit
        with lock:
            ins_cur.execute(INSERT_QUERY + ",".join(records))


if __name__ == '__main__':
    _start = time.time()
    cmp_clener = PsqlMultiThreadExample()
    # no pool in the parent: each child process creates its own in process_data()
    cmp_clener.read_data()
    print('Total Processing time %s seconds' % (time.time() - _start))
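Connections taken from a `ThreadedConnectionPool` with `getconn()` have to be handed back with `putconn()`; a connection that is closed or forgotten instead keeps counting against `maxconn`, so the pool eventually raises `PoolError`. A small context manager guarantees the return even when the body raises. `pooled_connection` is a hypothetical helper that assumes any object with psycopg2's `getconn()`/`putconn()` methods:

```python
from contextlib import contextmanager


@contextmanager
def pooled_connection(pool):
    """Borrow a connection from a pool and always give it back.

    `pool` is assumed to be a psycopg2.pool.ThreadedConnectionPool (anything
    exposing getconn()/putconn() works the same way).
    """
    conn = pool.getconn()
    try:
        yield conn
    finally:
        # putconn() hands the connection back so another thread can reuse it
        pool.putconn(conn)
```

Typical use: `with pooled_connection(pool) as conn:` followed by `with conn.cursor() as cur: cur.execute('SELECT version()')`.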

【Comments】:
