【Question title】: Upload CSV to BigQuery in C#
【Posted】: 2015-04-13 13:26:13
【Question】:

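Basically, what I want to do is submit a job to BigQuery (asynchronously), check the job status, and print the corresponding status or error information. I have created a skeleton, shown below, but I need help with two things:

  1. GoogleApiException: a "job not found" exception is thrown when calling "BigQueryService.Jobs.Get(jobReference.ProjectId, jobReference.JobId).Execute()". My hunch is that the job is not being submitted correctly, but I don't know how to submit it correctly.

  2. How should I handle GoogleApiExceptions?

Step 1: create a Job (upload a CSV file to BigQuery) and return the JobReference: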

        TableReference DestTable = new TableReference();
        DestTable.ProjectId = project;
        DestTable.DatasetId = dataset;
        DestTable.TableId = tableId;

        Job Job = new Job();
        JobConfiguration Config = new JobConfiguration();
        JobConfigurationLoad ConfigLoad = new JobConfigurationLoad();


        ConfigLoad.Schema = schema;
        ConfigLoad.DestinationTable = DestTable;
        ConfigLoad.Encoding = "ISO-8859-1";
        ConfigLoad.CreateDisposition = "CREATE_IF_NEEDED";
        ConfigLoad.WriteDisposition = createDisposition;
        ConfigLoad.FieldDelimiter = delimiter.ToString();
        ConfigLoad.AllowJaggedRows = true;
        Config.Load = ConfigLoad;
        Job.Configuration = Config;

        //set job reference (mainly job id)
        JobReference JobRef = new JobReference();
        JobRef.JobId = GenerateJobID("Upload");
        JobRef.ProjectId = project;
        Job.JobReference = JobRef;

        using(FileStream fileStream = new FileStream(filePath,FileMode.Open)){
            var JobInfo = BigQueryService.Jobs.Insert(Job,project,fileStream,"text/csv");//application/octet-stream
            JobInfo.UploadAsync();
            Console.WriteLine(JobInfo.GetProgress().Status.ToString());
        }
        return JobRef;

Then, use the projectId and jobId from the JobReference returned in step 1 to poll the job status:

     while (true)
        {
              pollJob = BigQueryService.Jobs.Get(jobReference.ProjectId, jobReference.JobId).Execute();
                i = 0;
                Console.WriteLine("Job status" + jobReference.JobId + ": " + pollJob.Status.State);
                if (pollJob.Status.State.Equals("DONE"))
                {
                    return pollJob;
                }
                // Pause execution for pauseSeconds before polling job status again,
                // to reduce unnecessary calls to the BigQuery API and lower overall
                // application bandwidth.
                Thread.Sleep(pauseSeconds * 1000);

        }

【Comments】:

Tags: c#-4.0 google-bigquery


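【Answer 1】:

There is hardly any useful sample code showing how to upload a local CSV file to a BigQuery table. I eventually got something working. It may not be the best solution, but it at least works, and any improvements are welcome.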

private JobReference JobUpload(string project, string dataset, string tableId, string filePath, TableSchema schema, string createDisposition, char delimiter)
    {

        TableReference DestTable = new TableReference();
        DestTable.ProjectId = project;
        DestTable.DatasetId = dataset;
        DestTable.TableId = tableId;

        Job Job = new Job();
        JobConfiguration Config = new JobConfiguration();
        JobConfigurationLoad ConfigLoad = new JobConfigurationLoad();


        ConfigLoad.Schema = schema;
        ConfigLoad.DestinationTable = DestTable;
        ConfigLoad.Encoding = "ISO-8859-1";
        ConfigLoad.CreateDisposition = "CREATE_IF_NEEDED";
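        // Note: despite its name, the createDisposition parameter carries the
        // write disposition (e.g. "WRITE_APPEND", "WRITE_TRUNCATE" or "WRITE_EMPTY").
        ConfigLoad.WriteDisposition = createDisposition;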
        ConfigLoad.FieldDelimiter = delimiter.ToString();
        ConfigLoad.AllowJaggedRows = true;
        ConfigLoad.SourceFormat = "CSV";
        Config.Load = ConfigLoad;
        Job.Configuration = Config;

        //set job reference (mainly job id)
        JobReference JobRef = new JobReference();
        JobRef.JobId = GenerateJobID("Upload");
        JobRef.ProjectId = project;
        Job.JobReference = JobRef;

        using(FileStream fileStream = new FileStream(filePath,FileMode.Open)){
            JobsResource.InsertMediaUpload InsertMediaUpload = new JobsResource.InsertMediaUpload(BigQueryService,Job,Job.JobReference.ProjectId,fileStream,"application/octet-stream");
            // Upload() blocks until the whole file has been sent, so the stream is
            // still open while it is being read and no busy-wait loop is needed.
            var JobInfo = InsertMediaUpload.Upload();
            Console.WriteLine(JobInfo.Status);
            if (JobInfo.Status == Google.Apis.Upload.UploadStatus.Failed)
            {
                // The job was never created, so polling for it would report "not found".
                throw JobInfo.Exception;
            }
        }
        return JobRef;
    }
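The method above calls GenerateJobID("Upload"), but the post never shows that helper. The following is only a hypothetical sketch of what it might look like, not the author's implementation. It assumes the job ID just needs to be unique within the project and restricted to letters, digits, underscores and dashes, which is what BigQuery accepts for job IDs. The class name JobIds and the timestamp-plus-GUID layout are assumptions made for illustration.

```csharp
using System;
using System.Globalization;
using System.Text.RegularExpressions;

public static class JobIds
{
    // Hypothetical stand-in for the GenerateJobID helper used in the post.
    // Returns "<prefix>_<UTC timestamp>_<GUID>", e.g. "Upload_20150413_132613_<32 hex chars>".
    public static string GenerateJobID(string prefix)
    {
        // Replace every character outside [A-Za-z0-9_-] with an underscore.
        string safePrefix = Regex.Replace(prefix ?? "job", "[^A-Za-z0-9_-]", "_");

        // The timestamp makes IDs easy to read in the job list.
        string timestamp = DateTime.UtcNow.ToString("yyyyMMdd_HHmmss", CultureInfo.InvariantCulture);

        // The GUID (format "N": 32 hex digits, no dashes) makes collisions very unlikely.
        string unique = Guid.NewGuid().ToString("N");

        return safePrefix + "_" + timestamp + "_" + unique;
    }
}
```

Generating the ID on the client, as the answer does, means the same JobReference can be used for polling even if the response to the insert call is lost.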

After this, you can use the returned JobRef to poll the job status, much as we would with the Java API:

while(true)
{
     PollJob = BigQueryService.Jobs.Get(jobReference.ProjectId, jobReference.JobId).Execute();

     Console.WriteLine("Job status " + jobReference.JobId + ": " + PollJob.Status.State);
     if (PollJob.Status.State.Equals("DONE"))
     {
       return PollJob;
     }
     // Pause before polling again to avoid flooding the BigQuery API with requests.
     Thread.Sleep(1000);
}
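The question also asks how to handle GoogleApiException, which the answer does not cover. One possible approach, sketched here under stated assumptions rather than as a definitive implementation, is to retry the poll on server-side (5xx) errors, rethrow everything else, and check Status.ErrorResult once the job reports DONE. The names JobPolling, IsTransient and WaitForJob, and the maxAttempts parameter, are hypothetical and do not come from the original post. The sketch assumes the Google.Apis.Bigquery.v2 client library is referenced.

```csharp
using System;
using System.Net;
using System.Threading;
using Google;
using Google.Apis.Bigquery.v2;
using Google.Apis.Bigquery.v2.Data;

public static class JobPolling
{
    // Assumption: only 5xx responses are worth retrying; other codes are rethrown.
    public static bool IsTransient(HttpStatusCode code)
    {
        return (int)code >= 500 && (int)code <= 599;
    }

    // Polls the job until it is DONE, or gives up after maxAttempts polls.
    public static Job WaitForJob(BigqueryService service, JobReference jobRef,
                                 int pauseSeconds, int maxAttempts)
    {
        for (int attempt = 1; attempt <= maxAttempts; attempt++)
        {
            try
            {
                Job job = service.Jobs.Get(jobRef.ProjectId, jobRef.JobId).Execute();
                Console.WriteLine("Job status " + jobRef.JobId + ": " + job.Status.State);
                if (job.Status.State == "DONE")
                {
                    // DONE only means the job has stopped; ErrorResult says whether it failed.
                    if (job.Status.ErrorResult != null)
                    {
                        throw new InvalidOperationException(
                            "Load job failed: " + job.Status.ErrorResult.Message);
                    }
                    return job;
                }
            }
            catch (GoogleApiException ex)
            {
                if (!IsTransient(ex.HttpStatusCode))
                {
                    // For example 404 Not Found, which usually means the job was never created.
                    throw;
                }
                Console.WriteLine("Transient API error, will retry: " + ex.Message);
            }
            // Pause between polls to limit calls to the BigQuery API.
            Thread.Sleep(pauseSeconds * 1000);
        }
        throw new TimeoutException(
            "Job " + jobRef.JobId + " did not finish after " + maxAttempts + " polls.");
    }
}
```

A call such as JobPolling.WaitForJob(BigQueryService, jobRef, 5, 120) would then poll every five seconds for up to about ten minutes. Letting a 404 propagate instead of retrying it makes a failed upload visible right away.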

【Comments】:
