【问题标题】:How to load data from Cloud Storage into BigQuery using Java如何使用 Java 将数据从 Cloud Storage 加载到 BigQuery
【发布时间】:2014-08-07 04:55:17
【问题描述】:

我想将数据从 Google Cloud Storage 上传到 Big Query 中的表。有我的代码来创建工作:

public class LoadStorageToBigQuery {

// ///////////////////////
// USER GENERATED VALUES: you must fill in values specific to your
// application.
//
// Visit the Google API Console to create a Project and generate an
// OAuth 2.0 Client ID and Secret (http://code.google.com/apis/console).
// Then, add the Project ID below, and update the clientsecrets.json file
// with your client_id and client_secret
//
// ///////////////////////
private static final String PROJECT_ID = "gavd.com:compute";
private static final String CLIENTSECRETS_LOCATION = "/client_secrets.json";
 private static final String RESOURCE_PATH =
          ("E:/Work On/ads/Cloud/Dev/Source/BigQueryDemo02" + CLIENTSECRETS_LOCATION).replace(
              '/', File.separatorChar);
static GoogleClientSecrets clientSecrets = loadClientSecrets();

// Static variables for API scope, callback URI, and HTTP/JSON functions
private static final List<String> SCOPES = Arrays
        .asList("https://www.googleapis.com/auth/bigquery");
private static final String REDIRECT_URI = "urn:ietf:wg:oauth:2.0:oob";
private static final HttpTransport TRANSPORT = new NetHttpTransport();
private static final JsonFactory JSON_FACTORY = new JacksonFactory();

private static GoogleAuthorizationCodeFlow flow = null;

/**
 * @param args
 * @throws IOException
 * @throws InterruptedException
 */
public static void main(String[] args) throws IOException,
        InterruptedException {
    System.out.println(CLIENTSECRETS_LOCATION);
    // Create a new BigQuery client authorized via OAuth 2.0 protocol
    Bigquery bigquery = createAuthorizedClient();

    // Print out available datasets to the console
    listDatasets(bigquery, "publicdata");
    JobReference jobId = startQuery(bigquery, PROJECT_ID);
    System.out.println("Job ID = " + jobId); 
    JobReference jobRef = startQuery(bigquery, PROJECT_ID);
    checkQueryResults(bigquery, PROJECT_ID, jobRef);

}

/**
 * Creates an authorized BigQuery client service using the OAuth 2.0
 * protocol
 * 
 * This method first creates a BigQuery authorization URL, then prompts the
 * user to visit this URL in a web browser to authorize access. The
 * application will wait for the user to paste the resulting authorization
 * code at the command line prompt.
 * 
 * @return an authorized BigQuery client
 * @throws IOException
 */
public static Bigquery createAuthorizedClient() throws IOException {

    String authorizeUrl = new GoogleAuthorizationCodeRequestUrl(
            clientSecrets, REDIRECT_URI, SCOPES).setState("").build();

    System.out
            .println("Paste this URL into a web browser to authorize BigQuery Access:\n"
                    + authorizeUrl);

    System.out.println("... and type the code you received here: ");
    BufferedReader in = new BufferedReader(new InputStreamReader(System.in));
    String authorizationCode = in.readLine();

    // Exchange the auth code for an access token and refesh token
    Credential credential = exchangeCode(authorizationCode);

    return Bigquery.builder(TRANSPORT, JSON_FACTORY)
            .setHttpRequestInitializer(credential)
            .setApplicationName("Your User Agent Here").build();
}

/**
 * Display all BigQuery Datasets associated with a Project
 * 
 * @param bigquery
 *            an authorized BigQuery client
 * @param projectId
 *            a string containing the current project ID
 * @throws IOException
 */
public static void listDatasets(Bigquery bigquery, String projectId)
        throws IOException {
    Datasets.List datasetRequest = bigquery.datasets().list(projectId);
    DatasetList datasetList = datasetRequest.execute();
    if (datasetList.getDatasets() != null) {
        List<DatasetList.Datasets> datasets = datasetList.getDatasets();
        System.out.println("Available datasets\n----------------");
        System.out.println( " B = " + datasets.toString());
        for (DatasetList.Datasets dataset : datasets) {
            System.out.format("%s\n", dataset.getDatasetReference()
                    .getDatasetId());
        }
    }
}


public static JobReference startQuery(Bigquery bigquery, String projectId) throws IOException {

    Job job = new Job();
    JobConfiguration config = new JobConfiguration();
    JobConfigurationLoad loadConfig = new JobConfigurationLoad();
    config.setLoad(loadConfig);

    job.setConfiguration(config);

    // Set where you are importing from (i.e. the Google Cloud Storage paths).
    List<String> sources = new ArrayList<String>();
    sources.add("gs://gms_cloud_project/bigquery_data/06_13_2014/namesbystate.csv");
    loadConfig.setSourceUris(sources);
    //state:STRING,sex:STRING,year:INTEGER,name:STRING,occurrence:INTEGER
    // Describe the resulting table you are importing to:
    TableReference tableRef = new TableReference();
    tableRef.setDatasetId("gimasys_database");
    tableRef.setTableId("table_test");
    tableRef.setProjectId(projectId);
    loadConfig.setDestinationTable(tableRef);

    List<TableFieldSchema> fields = new ArrayList<TableFieldSchema>();
    TableFieldSchema fieldState = new TableFieldSchema();
    fieldState.setName("state");
    fieldState.setType("STRING");
    TableFieldSchema fieldSex = new TableFieldSchema();
    fieldSex.setName("sex");
    fieldSex.setType("STRING");
    TableFieldSchema fieldName = new TableFieldSchema();
    fieldName.setName("name");
    fieldName.setType("STRING");
    TableFieldSchema fieldYear = new TableFieldSchema();
    fieldYear.setName("year");
    fieldYear.setType("INTEGER");
    TableFieldSchema fieldOccur = new TableFieldSchema();
    fieldOccur.setName("occurrence");
    fieldOccur.setType("INTEGER");
    fields.add(fieldState);
    fields.add(fieldSex);
    fields.add(fieldName);
    fields.add(fieldYear);
    fields.add(fieldOccur);
    TableSchema schema = new TableSchema();
    schema.setFields(fields);
    loadConfig.setSchema(schema);

    // Also set custom delimiter or header rows to skip here....
    // [not shown].

    Insert insert = bigquery.jobs().insert(projectId, job);
    insert.setProjectId(projectId);
    JobReference jobRef =  insert.execute().getJobReference();

    // ... see rest of codelab for waiting for job to complete.
    return jobRef;
    //return jobId;
}

/**
 * Polls the status of a BigQuery job, returns Job reference if "Done"
 * 
 * @param bigquery
 *            an authorized BigQuery client
 * @param projectId
 *            a string containing the current project ID
 * @param jobId
 *            a reference to an inserted query Job
 * @return a reference to the completed Job
 * @throws IOException
 * @throws InterruptedException
 */
private static Job checkQueryResults(Bigquery bigquery, String projectId,
        JobReference jobId) throws IOException, InterruptedException {
    // Variables to keep track of total query time
    long startTime = System.currentTimeMillis();
    long elapsedTime;

    while (true) {
        Job pollJob = bigquery.jobs().get(projectId, jobId.getJobId())
                .execute();
        elapsedTime = System.currentTimeMillis() - startTime;
        System.out.format("Job status (%dms) %s: %s\n", elapsedTime,
                jobId.getJobId(), pollJob.getStatus().getState());
        if (pollJob.getStatus().getState().equals("DONE")) {
            return pollJob;
        }
        // Pause execution for one second before polling job status again,
        // to
        // reduce unnecessary calls to the BigQUery API and lower overall
        // application bandwidth.
        Thread.sleep(1000);
    }
}

/**
 * Helper to load client ID/Secret from file.
 * 
 * @return a GoogleClientSecrets object based on a clientsecrets.json
 */
private static GoogleClientSecrets loadClientSecrets() {
    try {
        System.out.println("A");
        System.out.println(CLIENTSECRETS_LOCATION);
        GoogleClientSecrets clientSecrets = GoogleClientSecrets.load(new JacksonFactory(),
                    new FileInputStream(new File(
                        RESOURCE_PATH)));
        return clientSecrets;
    } catch (Exception e) {
        System.out.println("Could not load file Client_Screts");
        e.printStackTrace();
    }
    return clientSecrets;
}

/**
 * Exchange the authorization code for OAuth 2.0 credentials.
 * 
 * @return an authorized Google Auth flow
 */
static Credential exchangeCode(String authorizationCode) throws IOException {
    GoogleAuthorizationCodeFlow flow = getFlow();
    GoogleTokenResponse response = flow.newTokenRequest(authorizationCode)
            .setRedirectUri(REDIRECT_URI).execute();
    return flow.createAndStoreCredential(response, null);
}

/**
 * Build an authorization flow and store it as a static class attribute.
 * 
 * @return a Google Auth flow object
 */
static GoogleAuthorizationCodeFlow getFlow() {
    if (flow == null) {
        HttpTransport httpTransport = new NetHttpTransport();
        JacksonFactory jsonFactory = new JacksonFactory();

        flow = new GoogleAuthorizationCodeFlow.Builder(httpTransport,
                jsonFactory, clientSecrets, SCOPES)
                .setAccessType("offline").setApprovalPrompt("force")
                .build();
    }
    return flow;
}

}

在创建表时状态通知“完成”,但在 console.gooogle.com 中检查我的项目时 目前,我们也没有看到任何错误或问题或异常,但它没有将任何数据上传到我的表(表大小 0B)。我试图创建没有任何数据的表,而不是将数据上传到该表,但它不是。

Job ID = {"jobId":"job_MqfuhuAU1Ms0GIOSbiePFGlc6TE","projectId":"ads.com:compute"}
Job status (451ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (2561ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (6812ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (8273ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (9695ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (11146ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (12466ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (13948ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (15392ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (16796ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: PENDING
Job status (18296ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: RUNNING
Job status (19755ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: RUNNING
Job status (21587ms) job_fOtciwR1pfytbkeMaQ9RvvH18qc: DONE

如果有任何帮助,我将不胜感激,

谢谢

【问题讨论】:

  • 你可以试试 fieldYear.setType("STRING") 和 fieldOccur.setType("STRING") 吗?这可能是数据一致性问题。
  • 好主意,我试过了,看起来还不错。但是,我仍然在思考年份和发生的类型。以下是我要插入 Bigquery 的示例数据,“LA F 1976 Jessica 321”。您如何看待将“整数”数据插入 Bigquery 的方式...:)
  • 那么问题来了!请注意,代码设置变量的顺序与该行不同。
  • 是的,我也在考虑这个问题?您如何看待将“整数”类型插入 BigQuery 的好方法。如果您能告诉我解决此案的方法,我将不胜感激:)
  • 要么将其作为字符串插入并在运行时强制转换(如果需要,将结果保存到新表中),或者在定义列时直接设置顺序(参见下面的答案)。

标签: java google-bigquery google-cloud-storage


【解决方案1】:

所以问题似乎是代码要求“字符串,字符串,字符串,整数,整数”,但提供的数据是“字符串,字符串,整数,字符串,整数” - 因此 BigQuery 无法转换将第 4 列中的字符串转换为整数。

要获得确切的错误消息,请运行bq show -j job_yourjobid

【讨论】:

  • 谢谢,我会努力的:)
【解决方案2】:

pollJob.getStatus().getState().equals("DONE") 会告诉你工作何时完成,但它不会给你退出代码。

您应该明确检查错误结果 pollJob.getStatus().getErrorResult();

    while (true) {
     Job pollJob = getBigQuery().jobs().get(projectId, jobId.getJobId()).execute();
     elapsedTime = System.currentTimeMillis() - startTime;
     if (pollJob.getStatus().getErrorResult() != null) {
        // The job ended with an error.
         System.out.format("Job %s ended with error %s", jobId.getJobId(),pollJob.getStatus().getErrorResult().getMessage(), projectId);
         throw new RuntimeException(String.format("Job %s ended with error %s", jobId.getJobId(), 
                 pollJob.getStatus().getErrorResult().getMessage()));       
     }       
     System.out.format("Job status (%dms) %s: %s\n", elapsedTime,
       jobId.getJobId(), pollJob.getStatus().getState());

     if (pollJob.getStatus().getState().equals("DONE")) {
       break;
     }
     Thread.sleep(5000);
    }

【讨论】:

    猜你喜欢
    • 2013-05-24
    • 2021-12-28
    • 1970-01-01
    • 2021-10-10
    • 1970-01-01
    • 2019-09-03
    • 1970-01-01
    • 2018-03-28
    相关资源
    最近更新 更多