【问题标题】:BigQuery Streaming with C# Client Library使用 C# 客户端库的 BigQuery 流式传输
【发布时间】:2019-02-08 22:13:31
【问题描述】:

我有一个需要迁移到 BigQuery 的 Oracle 表。我用 C# 编写了一个简单的控制台应用程序并开始流式插入。但有时应用程序会引发以下错误。我的代码很简单,下面也是。有没有人知道可能导致此错误的原因?提前致谢。

Unhandled Exception: System.Net.Http.HttpRequestException: An error occurred whi
        le sending the request. ---> System.Net.WebException: The underlying connection
        was closed: A connection that was expected to be kept alive was closed by the se
        rver. ---> System.IO.IOException: Unable to read data from the transport connect
        ion: An existing connection was forcibly closed by the remote host. ---> System.
        Net.Sockets.SocketException: An existing connection was forcibly closed by the r
        emote host
        at System.Net.Sockets.Socket.EndReceive(IAsyncResult asyncResult)
        at System.Net.Sockets.NetworkStream.EndRead(IAsyncResult asyncResult)
        --- End of inner exception stack trace ---
        at System.Net.Security._SslStream.EndRead(IAsyncResult asyncResult)
        at System.Net.TlsStream.EndRead(IAsyncResult asyncResult)
        at System.Net.PooledStream.EndRead(IAsyncResult asyncResult)
        at System.Net.Connection.ReadCallback(IAsyncResult asyncResult)
        --- End of inner exception stack trace ---
        at System.Net.HttpWebRequest.EndGetResponse(IAsyncResult asyncResult)
        at System.Net.Http.HttpClientHandler.GetResponseCallback(IAsyncResult ar)
        --- End of inner exception stack trace ---
        at Google.Apis.Http.ConfigurableMessageHandler.<SendAsync>d__58.MoveNext()
        --- End of stack trace from previous location where exception was thrown ---
        at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
        at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNot
        ification(Task task)
        at Google.Apis.Requests.ClientServiceRequest`1.<ExecuteUnparsedAsync>d__33.Mo
        veNext()
        --- End of stack trace from previous location where exception was thrown ---
        at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
        at Google.Apis.Requests.ClientServiceRequest`1.Execute()
        at Google.Cloud.BigQuery.V2.BigQueryClientImpl.InsertRows(TableReference tabl
        eReference, IEnumerable`1 rows, InsertOptions options)
        at Google.Cloud.BigQuery.V2.BigQueryClient.InsertRows(String datasetId, Strin
        g tableId, BigQueryInsertRow[] rows)
        at BigQueryStreamer.Program.UploadJsonStreamingSync(String datasetId, String
        tableId, BigQueryClient client, BigQueryInsertRow[] _rows) in C:\Projects\BigQue
        ryStreamer\BigQueryStreamer\Program.cs:line 330
        at BigQueryStreamer.Program.Main(String[] args) in C:\Projects\BigQueryStream
        er\BigQueryStreamer\Program.cs:line 185

我的代码块是:

List<BigQueryInsertRow> _list = new List<BigQueryInsertRow>();
        while (oracleReader.Read())
            {
                BigQueryInsertRow bigQueryInsertRow = new BigQueryInsertRow();
                Dictionary<string, object> dictionary = new Dictionary<string, object>();
                for (int ordinal = 0; ordinal < oracleReader.FieldCount; ++ordinal)
                {
                    typeof(Decimal).ToString();
                    string str = oracleReader.GetValue(ordinal).GetType().ToString();
                    object obj = (str == "System.Decimal" || str== "System.Double" || str == "System.Float") ? 
                        (object)double.Parse(oracleReader.GetValue(ordinal).ToString()) :
                        (str == "System.DBNull" ? (object)null : oracleReader.GetValue(ordinal));
                    dictionary.Add(oracleReader.GetName(ordinal), obj);
                }
                bigQueryInsertRow.Add(dictionary);
                _list.Add(bigQueryInsertRow);

            }

            List<BigQueryInsertRow> _SendList = new List<BigQueryInsertRow>();

            //To Stream in 1000 rows at a time, I set _batcSize to 1000 in application configuration
            for (int i = 0; i < _list.Count; i++)
            {
                _SendList.Add(_list[i]);
                if (_SendList.Count == _batchSize)
                {
                    System.Threading.Thread.Sleep(150);
                    UploadJsonStreamingSync(_datasetid, _target, _client, _SendList.ToArray());
                    Console.WriteLine("Offset: " + ((ubound + 1) * _batchSize).ToString());
                    ubound++;
                    _SendList.Clear();
                }
            }

            if (_SendList.Count > 0)
            {
                System.Threading.Thread.Sleep(150);
                UploadJsonStreamingSync(_datasetid, _target, _client, _SendList.ToArray());
                Console.WriteLine("Offset: " + (_SendList.Count).ToString());
                ubound++;
                _SendList.Clear();
            }

            _list.Clear();
            _list = null;


        //Streaming Insert Function
        public static void UploadJsonStreamingSync(string datasetId, string tableId, BigQueryClient client, BigQueryInsertRow[] _rows)
        {
            client.InsertRows(datasetId, tableId, _rows);
        }

【问题讨论】:

  • 这是表的一次性迁移吗?如果是这样,将其导出为 CSV 并从 GCS 批量导入会更容易。使用流式传输时可能会发生套接字超时,因为您只是在发出网络请求。您需要在使用流式 API 时考虑这一点,并将某种指数返回并重试逻辑到您的代码中。
  • 这是一个持续的过程。从 RDBMS 迁移所​​有数据后,我们将继续将其与 BigQuery 同步。我正在寻求无缝集成并避免创建额外的 CSV 或 Json 文件,因为这些方法需要额外的开发工作和流程来检查。
  • 您无法在 Oracle 和 BigQuery 之间获得无缝连接。事实上,导出到 CSV 或 JSON 更容易(也更可靠),并且比流式传输更少的工作。就像我说的,如果你使用流媒体,你会得到套接字异常(即网络问题)。您需要编写逻辑来处理这个问题。但是,这取决于你想做什么。

标签: google-bigquery


【解决方案1】:

当遇到 Graham 指出的不常见的套接字异常/网络问题时,您需要编写逻辑来处理它。
您可以使用一些库。
我用过:https://github.com/App-vNext/Polly

Polly 是一个 .NET 弹性和瞬态故障处理库,允许开发人员以流畅且线程安全的方式表达重试、断路器、超时、隔板隔离和回退等策略。 em>

【讨论】:

    猜你喜欢
    • 2012-05-16
    • 2018-04-15
    • 1970-01-01
    • 1970-01-01
    • 2012-12-01
    • 2015-11-29
    • 2018-07-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多