【问题标题】:How do I copy a CSV file into Snowflake DB using Java如何使用 Java 将 CSV 文件复制到 Snowflake DB
【发布时间】:2021-09-20 23:02:21
【问题描述】:

我正在尝试编写一个工具来将 CSV 文件加载到多个数据库中。 在尝试在线搜索如何在 Snowflake 中使用 COPY 命令时,我找不到如何在 Java 中执行此操作的信息。这就是我使用 PostgreSQL 的方式

public void loadData(Message message) throws Exception {
    try (Connection connection = DriverManager.getConnection(message.getJdbcUrl(),
            message.getUser(), message.password)) {
        loadDataWithMode(loadRequest, (BaseConnection) connection);
    } catch (Throwable error){
        throw error;   
    }
}


public void loadDataWithMode(Message message, BaseConnection connection) throws Exception {
    CopyManager copyManager = new CopyManager(connection);
    String fields = message.getFields();
    final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
        try (InputStream input = fileService.load(loadRequest.getFilePath())) {
            try (InputStreamReader reader = new InputStreamReader(input, "UTF-8")) {
                copyManager.copyIn("COPY " + targetTableWithFields + " from STDIN 
            }
        }
 }

我不熟悉雪花怎么做,任何帮助将不胜感激。

【问题讨论】:

标签: java jdbc snowflake-cloud-data-platform snowflake-schema


【解决方案1】:
public void loadDataWithMode(Message message, Connection connection) throws Exception {
        String fields = message.getFields();
        final String targetTableWithFields = message.getTableName() + "(" + fields + ")";
            LOG.info("about to copy data into table: " + targetTableWithFields);
            try (Statement statement = connection.createStatement()) {

            final SnowflakeConnectionV1 snowflakeConnectionV1 = (SnowflakeConnectionV1) connection;
            final File tempFile = fileSystemService.asLocalFile(message.getFilePath());
            try (Statement stmt = snowflakeConnectionV1.createStatement(); InputStream inputStream = new FileInputStream(tempFile)) {
                final String createStage = buildCreateStageStatement();
                LOG.info("Executing sql:{}", createStage);
                stmt.execute(createStage);
                LOG.info("Create stage was successfully executed");
                snowflakeConnectionV1.uploadStream("COPYIN_STAGE", "", inputStream, tempFile.getName(), false);
                LOG.info("Upload stream was successfully executed");
                stmt.execute("USE WAREHOUSE "+ message.getExportConnectionDetails().getWarehouse());
                LOG.info("Warehouse was successfully set to: "+message.getExportConnectionDetails().getWarehouse());
                final boolean purgeData = !(message.getLoadMode() == LoadMode.INCREMENTAL);
                String sql = String.format("copy into %s(%s) from @COPYIN_STAGE/%s file_format = ( type='CSV', skip_header=1) purge=" + purgeData + "   ", message.getTableName(), fields, tempFile.getName());
                LOG.info("Executing sql:{}", sql);
                stmt.execute(sql);
            }
        connection.commit();
        LOG.info("data was successfully copied " + targetTableWithFields);
    }
}

private String buildCreateStageStatement() {
    return "CREATE OR REPLACE TEMPORARY STAGE COPYIN_STAGE " + "file_format = ( type ='CSV')";
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 2018-03-24
    • 2018-09-29
    • 2018-02-16
    • 1970-01-01
    • 1970-01-01
    • 2020-03-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多