【问题标题】:How can I automatically infer schemas of CSV files on S3 as I load them?如何在加载 S3 时自动推断 CSV 文件的架构?
【发布时间】:2021-05-12 07:39:14
【问题描述】:

上下文

目前我使用 Snowflake 作为数据仓库,使用 AWS 的 S3 作为数据湖。 S3 上的大多数文件都采用Parquet 格式。对于这些,我正在使用 Snowflake 的一个新的受限功能(记录在 here),它自动检测 S3 上 parquet 文件中的模式,我可以使用它来生成具有正确列名和推断数据类型的 CREATE TABLE 语句.此功能目前仅适用于 Apache Parquet、Avro 和 ORC 文件。我想找到一种方法来实现相同的预期目标,但适用于 CSV 文件。

我尝试过的事情

这是我目前推断 Parquet 文件架构的方式:

select generate_column_description(array_agg(object_construct(*)), 'table') as columns 
from table (infer_schema(location=>'${LOCATION}', file_format=>'${FILE_FORMAT}'))

但是,如果我尝试将 FILE_FORMAT 指定为 csv,则该方法将失败。

我考虑过的其他方法:

  1. 将 S3 上的所有文件传输到 parquet(这涉及更多代码和基础设施设置,因此不是我的首选,尤其是我希望在 s3 上保留一些文件的自然类型)
  2. 有一个脚本(例如在 Python 中使用 Pandas 之类的库)可以推断 S3 中文件的架构(这也涉及更多代码,并且在 Snowflake 中处理 parquet 文件而不是非 parquet 文件的意义上会很奇怪由 aws 上的某些脚本处理)。
  3. 使用 Snowflake UDF 推断架构。还没有完全考虑我的选择。

期望的行为

当一个新的 csv 文件登陆 S3(在预先存在的 STAGE 上)时,我想推断架构,并能够使用推断的数据类型生成 CREATE TABLE 语句。最好,我想在 Snowflake 中这样做,因为现有的上述模式推理解决方案在那里存在。如果需要,很高兴添加更多信息。

【问题讨论】:

  • 与 Parquet 文件格式不同,CSV 文件没有元数据来说明列包含的内容。您将拥有的 CSV 文件是否提供了有关列数据类型的信息?
  • 嗨@AndrewMorton。并不真地。对于我控制它们如何写入 S3 的文件,我可以在 S3 上写入元数据文件。但对于其他服务写入 S3 的文件,这将更具挑战性。我也在考虑是否可以通过 Glue 或类似的方式对 S3 上的数据进行编目。但如果有更直接的方法,不会是我的首选。
  • 此页面的“相关”部分(在右侧的列中)有什么有用的吗?
  • 你的 csv 文件第一行有列名吗?如果不是,您打算如何调用推断列?
  • @NickW CSV 在第一行有列名,所以我想弄清楚数据类型比列名更具挑战性

标签: amazon-s3 snowflake-cloud-data-platform


【解决方案1】:

更新:我修改了在无类型(所有字符串类型列)表中推断数据类型的 SP,它现在直接针对 Snowflake 阶段工作。项目代码在这里:https://github.com/GregPavlik/InferSchema

我写了一个存储过程来帮助解决这个问题;但是,它的唯一目标是推断无类型列的数据类型。它的工作原理如下:

  1. 将 CSV 加载到表中,所有列都定义为 varchars。
  2. 通过对新表的查询调用 SP(要点是只获取您想要的列并限制行数以保持合理的类型推断时间)。
  3. SP 调用中还包含旧位置和新位置的数据库、架构和表 - old 带有所有 varchar,new 带有推断类型。

然后,SP 将推断数据类型并创建两个 SQL 语句。一条语句将使用推断的数据类型创建新表。一条语句将使用适当的包装器(例如 try_multi_timestamp(),一个扩展 try_to_timestamp() 以尝试各种常见格式的 UDF)从无类型(所有 varchar)表复制到新表。

我打算扩展它,使它根本不需要无类型(全 varchar)表,但还没有解决它。既然它出现在这里,我可能会回过头来用这种能力更新 SP。您可以指定直接从阶段读取的查询,但您必须使用 $1、$2... 和列名的别名(否则 DDL 将尝试创建像 $1 这样的列名)。如果查询直接针对某个阶段运行,对于旧的数据库、模式和表,您可以放入任何内容,因为这仅用于从 select 语句生成插入。

-- This shows how to use on the Snowflake TPCH sample, but could be any query.
-- Keep the row count down to reduce the time it take to infer the types.
call infer_data_types('select * from SNOWFLAKE_SAMPLE_DATA.TPCH_SF1.LINEITEM limit 10000', 
                      'SNOWFLAKE_SAMPLE_DATA', 'TPCH_SF1', 'LINEITEM',
                      'TEST', 'PUBLIC', 'LINEITEM');

create or replace procedure INFER_DATA_TYPES(SOURCE_QUERY string,
                                             DATABASE_OLD string,
                                             SCHEMA_OLD string,
                                             TABLE_OLD string,
                                             DATABASE_NEW string,
                                             SCHEMA_NEW string,
                                             TABLE_NEW string)
returns string
language javascript
as
$$

/****************************************************************************************************
*                                                                                                   *
*  DataType Classes                                        
*                                                                                                   *
****************************************************************************************************/

class Query{
    constructor(statement){
        this.statement = statement;
    }
}


class DataType {
    constructor(db, schema, table, column, sourceQuery) {
        this.db = db;
        this.schema = schema;
        this.table = table;
        this.sourceQuery = sourceQuery
        this.column = column;
        this.insert = '"@~COLUMN~@"';
        this.totalCount = 0;
        this.notNullCount = 0;
        this.typeCount = 0;
        this.blankCount = 0;
        this.minTypeOf  = 0.95;
        this.minNotNull = 1.00;
    }
    setSQL(sqlTemplate){
        this.sql = sqlTemplate;
        this.sql = this.sql.replace(/@~DB~@/g,     this.db);
        this.sql = this.sql.replace(/@~SCHEMA~@/g, this.schema);
        this.sql = this.sql.replace(/@~TABLE~@/g,  this.table);
        this.sql = this.sql.replace(/@~COLUMN~@/g, this.column);
    }
    getCounts(){
        var rs;
        rs = GetResultSet(this.sql);
        rs.next();
        this.totalCount   = rs.getColumnValue("TOTAL_COUNT");
        this.notNullCount = rs.getColumnValue("NON_NULL_COUNT");
        this.typeCount    = rs.getColumnValue("TO_TYPE_COUNT");
        this.blankCount   = rs.getColumnValue("BLANK");
    }
    isCorrectType(){
        return (this.typeCount / (this.notNullCount - this.blankCount) >= this.minTypeOf);
    }
    isNotNull(){
        return (this.notNullCount / this.totalCount >= this.minNotNull);
    }
}

class TimestampType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "timestamp";
        this.insert = 'try_multi_timestamp(trim("@~COLUMN~@"))';
        this.sourceQuery = SOURCE_QUERY;
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

class IntegerType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "number(38,0)";
        this.insert = 'try_to_number(trim("@~COLUMN~@"), 38, 0)';
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

class DoubleType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "double";
        this.insert = 'try_to_double(trim("@~COLUMN~@"))';
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

class BooleanType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "boolean";
        this.insert = 'try_to_boolean(trim("@~COLUMN~@"))';
        this.setSQL(GetCheckTypeSQL(this.insert, this.sourceQuery));
        this.getCounts();
    }
}

 // Catch all is STRING data type
class StringType extends DataType{
    constructor(db, schema, table, column, sourceQuery){
        super(db, schema, table, column, sourceQuery)
        this.syntax = "string";
        this.totalCount   = 1;
        this.notNullCount = 0;
        this.typeCount    = 1;
        this.minTypeOf    = 0;
        this.minNotNull   = 1;
    }
}

/****************************************************************************************************
*                                                                                                   *
*  Main function                                                                                    *
*                                                                                                   *
****************************************************************************************************/

var pass    = 0;
var column;
var typeOf;
var ins = '';

var newTableDDL = '';
var insertDML   = '';

var columnRS = GetResultSet(GetTableColumnsSQL(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD));

while (columnRS.next()){
    pass++;
    if(pass > 1){
        newTableDDL += ",\n";
        insertDML   += ",\n";
    }
    column = columnRS.getColumnValue("COLUMN_NAME");
    typeOf = InferDataType(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD, column, SOURCE_QUERY);
    newTableDDL += '"' + typeOf.column + '" ' + typeOf.syntax;
    ins = typeOf.insert;
    insertDML   += ins.replace(/@~COLUMN~@/g, typeOf.column);
}

return GetOpeningComments()                                     +
       GetDDLPrefixSQL(DATABASE_NEW, SCHEMA_NEW, TABLE_NEW)     +
       newTableDDL                                              +
       GetDDLSuffixSQL()                                        +
       GetDividerSQL()                                          +
       GetInsertPrefixSQL(DATABASE_NEW, SCHEMA_NEW, TABLE_NEW)  +
       insertDML                                                +
       GetInsertSuffixSQL(DATABASE_OLD, SCHEMA_OLD, TABLE_OLD)  ;

/****************************************************************************************************
*                                                                                                   *
*  Helper functions                                                                                 *
*                                                                                                   *
****************************************************************************************************/

function InferDataType(db, schema, table, column, sourceQuery){

    var typeOf;

    typeOf = new IntegerType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new DoubleType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new BooleanType(db, schema, table, column, sourceQuery);        // May want to do a distinct and look for two values
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new TimestampType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    typeOf = new StringType(db, schema, table, column, sourceQuery);
    if (typeOf.isCorrectType()) return typeOf;

    return null;
}

/****************************************************************************************************
*                                                                                                   *
*  SQL Template Functions                                                                           *
*                                                                                                   *
****************************************************************************************************/

function GetCheckTypeSQL(insert, sourceQuery){

var sql = 
`
select      count(1)                              as TOTAL_COUNT,
            count("@~COLUMN~@")                   as NON_NULL_COUNT,
            count(${insert})                      as TO_TYPE_COUNT,
            sum(iff(trim("@~COLUMN~@")='', 1, 0)) as BLANK
--from        "@~DB~@"."@~SCHEMA~@"."@~TABLE~@";
from        (${sourceQuery})
`;

return sql;
}

function GetTableColumnsSQL(dbName, schemaName, tableName){

var sql = 
`
select  COLUMN_NAME 
from    ${dbName}.INFORMATION_SCHEMA.COLUMNS
where   TABLE_CATALOG = '${dbName}' and
        TABLE_SCHEMA  = '${schemaName}' and
        TABLE_NAME    = '${tableName}'
order by ORDINAL_POSITION;
`;
  
return sql;
}

function GetOpeningComments(){
return `
/**************************************************************************************************************
*                                                                                                             *
*   Copy and paste into a worksheet to create the typed table and insert into the new table from the old one. *
*                                                                                                             *
**************************************************************************************************************/
`;
}

function GetDDLPrefixSQL(db, schema, table){

var sql =
`
create or replace table "${db}"."${schema}"."${table}"
(
`;

    return sql;
}

function GetDDLSuffixSQL(){
    return "\n);";
}

function GetDividerSQL(){
return `
/**************************************************************************************************************
*                                                                                                             *
*   The SQL statement below this attempts to copy all rows from the string tabe to the typed table.           *
*                                                                                                             *
**************************************************************************************************************/
`;
}

function GetInsertPrefixSQL(db, schema, table){
var sql =
`\ninsert into "${db}"."${schema}"."${table}" select\n`;
return sql;
}

function GetInsertSuffixSQL(db, schema, table){
var sql =
`\nfrom "${db}"."${schema}"."${table}" ;`;
return sql;
}

//function GetInsertSuffixSQL(db, schema, table){
//var sql = '\nfrom "${db}"."${schema}"."${table}";';
//return sql;
//}


/****************************************************************************************************
*                                                                                                   *
*  SQL functions                                                                                    *
*                                                                                                   *
****************************************************************************************************/

function GetResultSet(sql){
    cmd1 = {sqlText: sql};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    rs = stmt.execute();
    return rs;
}

function ExecuteNonQuery(queryString) {
    var out = '';
    cmd1 = {sqlText: queryString};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    rs = stmt.execute();
}

function ExecuteSingleValueQuery(columnName, queryString) {
    var out;
    cmd1 = {sqlText: queryString};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    try{
        rs = stmt.execute();
        rs.next();
        return rs.getColumnValue(columnName);
    }
    catch(err) {
        if (err.message.substring(0, 18) == "ResultSet is empty"){
            throw "ERROR: No rows returned in query.";
        } else {
            throw "ERROR: " + err.message.replace(/\n/g, " ");
        } 
    }
    return out;
}

function ExecuteFirstValueQuery(queryString) {
    var out;
    cmd1 = {sqlText: queryString};
    stmt = snowflake.createStatement(cmd1);
    var rs;
    try{
        rs = stmt.execute();
        rs.next();
        return rs.getColumnValue(1);
    }
    catch(err) {
        if (err.message.substring(0, 18) == "ResultSet is empty"){
            throw "ERROR: No rows returned in query.";
        } else {
            throw "ERROR: " + err.message.replace(/\n/g, " ");
        } 
    }
    return out;
}

function getQuery(sql){
    var cmd = {sqlText: sql};
    var query = new Query(snowflake.createStatement(cmd));
    try {
        query.resultSet = query.statement.execute();
    } catch (err) {
        throw "ERROR: " + err.message.replace(/\n/g, " ");
    }
    return query;
}

$$;

【讨论】:

    【解决方案2】:

    您尝试过 STAGES 吗?

    创建 2 个阶段 ... 一个没有标题,另一个有标题 .. . .

    请参阅下面的示例。

    然后是一点 SQL,瞧你的 DDL。

    唯一的问题 - 您需要知道列数才能放置正确数量的 t.$。

    如果有人可以自动化...我们将有一个几乎自动的 CSV 的 DDL 生成器。

    显然,一旦您有了 SQL stmt,然​​后只需将创建或替换表添加到前面,您的表就会很好地使用 CSV 中的所有名称创建。

    :-)

        --  create or replace stage CSV_NO_HEADER
      URL = 's3://xxx-x-dev-landing/xxx/'
      STORAGE_INTEGRATION = "xxxLAKE_DEV_S3_INTEGRATION"
      FILE_FORMAT = ( TYPE = CSV  SKIP_HEADER = 1 FIELD_OPTIONALLY_ENCLOSED_BY = '"'  )
     
       --  create or replace stage CSV
      URL = 's3://xxx-xxxlake-dev-landing/xxx/'
      STORAGE_INTEGRATION = "xxxLAKE_DEV_S3_INTEGRATION"
      FILE_FORMAT = ( TYPE = CSV    FIELD_OPTIONALLY_ENCLOSED_BY = '"'  )
       
     
     
      select   concat('select t.$1 ',  t.$1, ',t.$2 ',  t.$2,',t.$3 ',  t.$3, ',t.$4 ',  t.$4,',t.$5 ',  t.$5,',t.$6 ',  t.$6,',t.$7 ',  t.$7,',t.$8 ',  t.$8,',t.$9 ',  t.$9,
     ',t.$10 ',  t.$10, ',t.$11 ',  t.$11,',t.$12 ',  t.$12 ,',t.$13 ',  t.$13, ',t.$14 ',  t.$14 ,',t.$15 ',  t.$15 ,',t.$16 ',  t.$16 ,',t.$17 ',  t.$17 ,' from @xxxx_NO_HEADER/SUB_TRANSACTION_20201204.csv t')  from
     --- CHANGE TABLE ---
     @xxx/SUB_TRANSACTION_20201204.csv t  limit 1;
    

    【讨论】:

      猜你喜欢
      • 2020-11-18
      • 2017-12-10
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-01-17
      • 1970-01-01
      • 2023-03-17
      • 2017-08-12
      相关资源
      最近更新 更多