【Question Title】: Postgres COPY stream using pg8000 (error: could not determine data type of parameter $1)
【Posted】: 2021-11-28 19:45:59
【Question Description】:

I am trying to implement a COPY statement to push a pandas DataFrame into a CloudSQL Postgres database from an Airflow DAG. I have one constraint: I can only use the pg8000 driver. I am using https://github.com/tlocke/pg8000#copy-from-and-to-a-file as a reference (I found it in this thread: https://news.ycombinator.com/item?id=25402430).

Here is my code:

    def getconn() -> pg8000.native.Connection:
        conn: pg8000.native.Connection = connector.connect(
            PG_CONFIG["host"],
            "pg8000",
            user=PG_CONFIG["user"],
            password=PG_CONFIG["password"],
            db=PG_CONFIG["database"]
        )
        return conn
    engine = sqlalchemy.create_engine("postgresql+pg8000://",creator=getconn)
    engine.dialect.description_encoding = None

    stream_in = StringIO()
    csv_writer = csv.writer(stream_in)
    csv_writer.writerow([1, "electron"])
    csv_writer.writerow([2, "muon"])
    csv_writer.writerow([3, "tau"])
    stream_in.seek(0)

    conn = engine.connect()
    conn.execute("CREATE TABLE IF NOT EXISTS temp_table (user_id numeric, user_name text)")    
    conn.execute("COPY temp_table FROM STDIN WITH (FORMAT CSV)", stream=stream_in)

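I have tried everything I could think of (using the DELIMITER option, passing text instead of CSV, ...) but I keep getting the following error: "could not determine data type of parameter $1"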

[SQL: COPY winappsx.aa FROM STDIN WITH (FORMAT CSV)]
[parameters: {'stream': <_io.StringIO object at 0x7f86a58d7dc8>}]
(Background on this error at: http://sqlalche.me/e/13/4xp6)
Traceback (most recent call last):
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/base.py", line 1277, in _execute_context
    cursor, statement, parameters, context
  File "/opt/python3.6/lib/python3.6/site-packages/sqlalchemy/engine/default.py", line 593, in do_execute
    cursor.execute(statement, parameters)
  File "/opt/python3.6/lib/python3.6/site-packages/pg8000/dbapi.py", line 454, in execute
    statement, vals=vals, input_oids=self._input_oids, stream=stream
  File "/opt/python3.6/lib/python3.6/site-packages/pg8000/core.py", line 632, in execute_unnamed
    self.handle_messages(context)
  File "/opt/python3.6/lib/python3.6/site-packages/pg8000/core.py", line 769, in handle_messages
    raise self.error
pg8000.exceptions.DatabaseError: {'S': 'ERROR', 'V': 'ERROR', 'C': '42P18', 'M': 'could not determine data type of parameter $1', 'F': 'postgres.c', 'L': '1363', 'R': 'exec_parse_message'}

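I know the connection works, because the table is created correctly. The error occurs on the COPY statement.

I suspect the problem is in how I supply the stream argument, but I cannot find the right syntax. This may help: https://www.kite.com/python/docs/pg8000.Cursor.execute

Thanks for your help!

【Question Comments】:

    Tags: python postgresql airflow pg8000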


    【Solution 1】:

    A friend found the answer ;-)

    We created a connection that uses the pg8000 API directly instead of a regular SQLAlchemy connection. The approach comes from https://docs.sqlalchemy.org/en/13/core/connections.html#working-with-raw-dbapi-connections
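As a minimal sketch of that raw-connection pattern, the helper below wraps `engine.raw_connection()` in a context manager that commits on success and rolls back on error. The name `raw_cursor` is hypothetical and not part of SQLAlchemy or pg8000; it only assumes that `engine` exposes `raw_connection()` and that the returned object follows the usual DB-API interface.

```python
from contextlib import contextmanager


@contextmanager
def raw_cursor(engine):
    """Yield a DB-API cursor from the engine's underlying driver connection.

    `engine` is assumed to be a SQLAlchemy Engine (any object exposing
    raw_connection() works). Commits on success, rolls back on error.
    """
    raw_conn = engine.raw_connection()  # driver-level (here: pg8000) connection
    try:
        cursor = raw_conn.cursor()
        try:
            yield cursor
            raw_conn.commit()
        except Exception:
            raw_conn.rollback()
            raise
        finally:
            cursor.close()
    finally:
        raw_conn.close()  # hands the connection back to SQLAlchemy's pool
```

With this in place, `with raw_cursor(engine) as cursor:` gives a cursor whose `execute()` is the driver's own, so driver-specific keywords are not filtered through SQLAlchemy.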

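    Now that we have a pg8000 connection, I looked at this section of the pg8000 examples: https://github.com/tlocke/pg8000#copy-from-and-to-a-file-1. Create a cursor from the pg8000 connection, then call cursor.execute on it. The connPG8K.cursor.execute() call (line 120 in our script) goes through pg8000, so it can use that function's stream argument. SQLAlchemy's conn.execute has no stream argument; the traceback in the question shows stream being passed along as a statement parameter, which is probably why it fails.

    The code is as follows: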

        import csv
        from io import StringIO

        # Build an in-memory CSV file holding the rows to load
        stream_in = StringIO()
        csv_writer = csv.writer(stream_in)
        csv_writer.writerow([1, "electron"])
        csv_writer.writerow([2, "muon"])
        csv_writer.writerow([3, "tau"])
        csv_writer.writerow([4, "sean is the best"])
        stream_in.seek(0)

        # Get the underlying pg8000 connection from the SQLAlchemy engine
        # (engine is the one created in the question)
        connPG8K = engine.raw_connection()
        # Get a pg8000 cursor, whose execute() accepts a stream argument
        cursor = connPG8K.cursor()
        cursor.execute("CREATE TABLE IF NOT EXISTS temp_table (user_id numeric, user_name text)")
        # A comma is already the default delimiter for FORMAT csv
        cursor.execute("COPY temp_table FROM STDIN WITH (FORMAT csv)", stream=stream_in)
        connPG8K.commit()
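The same steps can be folded into two small helpers, sketched here under the assumption that `cursor` is a pg8000 DB-API cursor (whose `execute()` takes the `stream` keyword seen in the traceback above). The names `rows_to_csv_stream` and `copy_rows` are hypothetical, chosen only for illustration.

```python
import csv
from io import StringIO


def rows_to_csv_stream(rows):
    """Serialize an iterable of row sequences into an in-memory CSV stream."""
    stream = StringIO()
    csv.writer(stream).writerows(rows)
    stream.seek(0)  # rewind so the driver reads from the start
    return stream


def copy_rows(cursor, table, rows):
    """Load rows into `table` with COPY ... FROM STDIN.

    Assumes `cursor` is a pg8000 DB-API cursor, whose execute() accepts a
    `stream` keyword. `table` is interpolated into the SQL text, so it must
    come from trusted code, never from user input.
    """
    stream = rows_to_csv_stream(rows)
    cursor.execute(f"COPY {table} FROM STDIN WITH (FORMAT csv)", stream=stream)
```

For the pandas case in the question, the buffer could presumably be filled with `df.to_csv(stream, index=False, header=False)` instead of `csv.writer`, followed by the same `stream.seek(0)`.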
    
