【问题标题】:SQLAlchemy - create table from yaml or a dictionary?SQLAlchemy - 从 yaml 或字典创建表?
【发布时间】:2018-07-26 21:50:09
【问题描述】:

有没有办法从 yaml 文件中指定的字典创建动态表?我在 yaml 文件中定义了很多 ETL 配置,所以我很好奇我是否也可以向其中添加表创建方面,这样我就不必在单独的目录中修改单独的 .sql 文件。

database:
  table: 'schema.fact_stuff'
  create_columns: [
    {}
  ] #not sure how this section should be

我在 stackoverflow 上找到了一个解决方案,该解决方案将一些列表压缩在一起,这是类似的,但我更愿意明确定义每一列。

{'column_name': 'id', 'column_type': Integer, 'primary_key': False, 'nullable': True}

我最终让它与这个一起工作:

from sqlalchemy.types import (Integer, NUMERIC, TEXT, BOOLEAN, TIMESTAMP, DATE)

sql_types = {'integer': Integer,
        'numeric': NUMERIC,
        'text': TEXT,
        'date': DATE,
        'timestamp': TIMESTAMP(timezone=False),
        'timestamptz': TIMESTAMP(timezone=True)}

exclude_list = ['original_name']
table_dict = [{k: v for k, v in d.items() if k not in exclude_list} for d in c[variable]['load']['columns']]
for column in table_dict:
    for key, val in column.copy().items():
        if key == 'type_':
            column[key] = sql_types[val]
        elif key == 'default':
            column[key] = dt.datetime.utcnow

metadata = sa.MetaData(schema=c[variable]['load']['schema'])
metadata.reflect(bind=engine, autoload=True)
fact = sa.Table(c[variable]['load']['table'], metadata, extend_existing=True,
        *(sa.Column(**kwargs) for kwargs in table_dict))
fact.create_all(engine, checkfirst=True)

但后来我转而让 pandas 确定 dtypes,而不是在 yaml 文件中定义它们。这将使用 jinja2 模板创建 sql,然后我遍历所有数据源以创建 DDL。

def pandas_to_postgres(df):
    dtype_dict = {
      'i': 'integer',
      'O': 'text',
      'f': 'real',
      'b': 'boolean',
      'datetime64[ns]': 'timestamp',
      'datetime64[ns, UTC]': 'timestampz',
    }
    column_list = []
    column_dict = {}
    for k, v in df.dtypes.items():
        column_dict['name'] = k
        column_dict['dtype'] = dtype_dict.get(v.kind, 'text')
        column_list.append(column_dict.copy())
    return column_list


def generate_create_table(df, schema, table, table_type, columns, constraint, unique_columns):
    """ Returns a dictionary of coefs from training """
    query = Template(
        template
    ).render(
        schema_name=schema,
        table_name=table,
        table_type=table_type,
        columns=columns,
        constraint=constraint,
        constraint_columns=unique_columns
    )
    print(query)

【问题讨论】:

    标签: python sqlalchemy yaml


    【解决方案1】:

    今天发布的 SQLAthanor (v.0.3.0) 完全支持这一点。使用 SQLAthanor,您可以使用以下代码以编程方式生成 SQLAlchemy Table 对象(假设 metadata 包含您的 SQLAlchemy MetaData 对象):

    from sqlathanor import Table
    
    my_table = Table.from_yaml('yaml_file.yaml', 
                               'my_table_name', 
                               metadata, 
                               primary_key = 'id')
    

    ETA:请注意,您也可以使用Table.from_json()Table.from_dict()Table.from_csv() 创建Table 对象。

    以下是有关其工作原理的文档(一般):https://sqlathanor.readthedocs.io/en/latest/using.html#generating-sqlalchemy-tables-from-serialized-data

    这里是特定Table.from_yaml()方法的文档链接:https://sqlathanor.readthedocs.io/en/latest/api.html#sqlathanor.schema.Table.from_yaml

    (我建议查看方法文档 - 它涉及以编程方式从序列化数据构造 Table 对象的一些“陷阱”)


    预计到达时间:

    基本上,程序化Table 生成的工作方式是 SQLAthanor:

    1. 首先将序列化字符串(或文件)转换为 Python dict。对于 YAML,默认的反序列化器是 PyYAML。对于 JSON,默认的反序列化器是 simplejson(两个默认反序列化器都可以使用 deserialize_function 参数覆盖)。

    2. 一旦生成了 Python dict,SQLAthanor 就会读取该 dict 中的每个键以确定列名。它读取每个键的值,并根据值的数据类型尝试“猜测” SQLAlchemy 数据类型。

    3. 1234563 /li>

    如果您需要更精确地控制每个Column,您可以:

    • 使用type_mapping 参数覆盖其SQLAlchemy 数据类型(type_mapping 接收dict,其中顶级键对应于列名,每个值都是应用于Column 的数据类型)
    • 使用column_kwargs 参数将额外的关键字参数传递给Column 构造函数(column_kwargs 接收dict,其中顶级键对应于列名,每个值都是带有关键字参数的dict将提供给该列的构造函数。

    默认情况下,Table.from_<format>()支持嵌套数据结构。默认情况下,skip_nested 设置为True,这意味着反序列化的dict 中包含嵌套对象(可迭代对象或dict)的键将被跳过(即不会接收对应的Column)。如果你的Table需要存储嵌套数据,可以设置skip_nestedFalse,激活default_to_strTrue。这会将嵌套数据(可迭代对象或dict 对象)转换为字符串,从而将它们保存在Text 列中(除非被type_mapping 覆盖)。


    Table.from_dict() 示例

    以下是可以提供给Table.from_dict()dict 示例:

    sample_dict = {
        'id': 123,
        'some_column_name': 'Some Column Value',
        'created_datetime': datetime.utcnow()
    }
    
    my_table = Table.from_dict(sample_dict, 
                               'my_table', 
                               metadata, 
                               primary_key = 'id')
    

    当提供给Table.from_dict() 时,此dict 将生成一个Table 对象,其数据库表名为my_table,其中包含三列:

    • id 将具有类型 Integer 设置为表的主键
    • some_column_name 类型为 Text
    • created_datetime 类型为 DateTime

    Table.from_yaml() 示例

    以下是相同的示例,但使用 YAML 字符串/文档代替,可以提供给 Table.from_yaml()

    sample_yaml = """
        id: 123
        some_column_name: Test Value
        created_timestamp: 2018-01-01T01:23:45.67890
    """
    
    my_table = Table.from_yaml(sample_yaml, 
                               'my_table', 
                               metadata, 
                               primary_key = 'id')
    

    当提供给Table.from_yaml() 时,这将首先将sample_yaml 反序列化为dict,就像在前面的示例中一样,然后生成一个数据库表名为my_tableTable 对象,其中包含三列:

    • id 将具有类型 Integer 设置为表的主键
    • some_column_name 类型为 Text
    • created_datetime 类型为 DateTime

    希望这会有所帮助!

    【讨论】:

    • 哇很好,是的,我刚刚编辑了我的问题,并包括了一些我走的路。您是否有将用作输入的 yaml 文件或 dict 的示例?我目前正在使用带有 df.columns 和 df.types 的 pandas 并将其传递给 jinja 模板,因为手动更新 yaml 文件很繁琐。
    • 别担心!我用更详细的解释更新了我的答案,包括将Table.from_dict()Table.from_yaml() 与Python dict 和相应的YAML 文档一起使用的示例。希望它有助于澄清!
    • 完美,这对我来说很清楚。谢谢一堆。试一试
    • 哇!你的图书馆看起来很酷!您是否有一些带有表定义的 YAML/JSON 文件示例(包括约束 [primary、unique、not null 等])?
    • 另外,要明确一件事:JSON/YAML 文件本身没有定义约束:这些是在 Python 调用 Table.from_json() 或类似方法中定义的。我有一个feature request 用于序列化以利用 JSON Schema 和 OpenAPI,这可能会扩展一些“约束设置”功能,但目前还不支持。
    猜你喜欢
    • 2017-12-09
    • 1970-01-01
    • 1970-01-01
    • 2020-12-14
    • 2020-01-24
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-02-13
    相关资源
    最近更新 更多