[Posted]: 2019-02-08 22:13:31
[Question]:
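I have an Oracle table that I need to migrate to BigQuery. I wrote a simple C# console application and started doing streaming inserts, but sometimes the application throws the following error. My code is simple and is shown below. Does anyone know what might cause this error? Thanks in advance.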
Unhandled Exception: System.Net.Http.HttpRequestException: An error occurred while sending the request. ---> System.Net.WebException: The underlying connection was closed: A connection that was expected to be kept alive was closed by the server. ---> System.IO.IOException: Unable to read data from the transport connection: An existing connection was forcibly closed by the remote host. ---> System.Net.Sockets.SocketException: An existing connection was forcibly closed by the remote host
   at System.Net.Sockets.Socket.EndReceive(IAsyncResult asyncResult)
   at System.Net.Sockets.NetworkStream.EndRead(IAsyncResult asyncResult)
   --- End of inner exception stack trace ---
   at System.Net.Security._SslStream.EndRead(IAsyncResult asyncResult)
   at System.Net.TlsStream.EndRead(IAsyncResult asyncResult)
   at System.Net.PooledStream.EndRead(IAsyncResult asyncResult)
   at System.Net.Connection.ReadCallback(IAsyncResult asyncResult)
   --- End of inner exception stack trace ---
   at System.Net.HttpWebRequest.EndGetResponse(IAsyncResult asyncResult)
   at System.Net.Http.HttpClientHandler.GetResponseCallback(IAsyncResult ar)
   --- End of inner exception stack trace ---
   at Google.Apis.Http.ConfigurableMessageHandler.<SendAsync>d__58.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at Google.Apis.Requests.ClientServiceRequest`1.<ExecuteUnparsedAsync>d__33.MoveNext()
--- End of stack trace from previous location where exception was thrown ---
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at Google.Apis.Requests.ClientServiceRequest`1.Execute()
   at Google.Cloud.BigQuery.V2.BigQueryClientImpl.InsertRows(TableReference tableReference, IEnumerable`1 rows, InsertOptions options)
   at Google.Cloud.BigQuery.V2.BigQueryClient.InsertRows(String datasetId, String tableId, BigQueryInsertRow[] rows)
   at BigQueryStreamer.Program.UploadJsonStreamingSync(String datasetId, String tableId, BigQueryClient client, BigQueryInsertRow[] _rows) in C:\Projects\BigQueryStreamer\BigQueryStreamer\Program.cs:line 330
   at BigQueryStreamer.Program.Main(String[] args) in C:\Projects\BigQueryStreamer\BigQueryStreamer\Program.cs:line 185
My code is:
List<BigQueryInsertRow> _list = new List<BigQueryInsertRow>();
while (oracleReader.Read())
{
    BigQueryInsertRow bigQueryInsertRow = new BigQueryInsertRow();
    Dictionary<string, object> dictionary = new Dictionary<string, object>();
    for (int ordinal = 0; ordinal < oracleReader.FieldCount; ++ordinal)
    {
        // Read each column once instead of calling GetValue() repeatedly
        object value = oracleReader.GetValue(ordinal);
        object obj;
        if (value is DBNull)
            obj = null;                     // Oracle NULL -> null in the inserted row
        else if (value is decimal || value is double || value is float)
            obj = Convert.ToDouble(value);  // direct conversion; "System.Float" is not a CLR type name (float is System.Single)
        else
            obj = value;
        dictionary.Add(oracleReader.GetName(ordinal), obj);
    }
    bigQueryInsertRow.Add(dictionary);
    _list.Add(bigQueryInsertRow);
}
List<BigQueryInsertRow> _SendList = new List<BigQueryInsertRow>();
// To stream 1000 rows at a time, I set _batchSize to 1000 in the application configuration
for (int i = 0; i < _list.Count; i++)
{
    _SendList.Add(_list[i]);
    if (_SendList.Count == _batchSize)
    {
        System.Threading.Thread.Sleep(150);
        UploadJsonStreamingSync(_datasetid, _target, _client, _SendList.ToArray());
        Console.WriteLine("Offset: " + ((ubound + 1) * _batchSize).ToString());
        ubound++;
        _SendList.Clear();
    }
}
if (_SendList.Count > 0)
{
    System.Threading.Thread.Sleep(150);
    UploadJsonStreamingSync(_datasetid, _target, _client, _SendList.ToArray());
    // Cumulative offset: full batches already sent plus this final partial batch
    Console.WriteLine("Offset: " + (ubound * _batchSize + _SendList.Count).ToString());
    ubound++;
    _SendList.Clear();
}
_list.Clear();
_list = null;
// Streaming insert function
public static void UploadJsonStreamingSync(string datasetId, string tableId, BigQueryClient client, BigQueryInsertRow[] _rows)
{
    client.InsertRows(datasetId, tableId, _rows);
}
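The code above copies the whole table into `_list` before batching. As a sketch only, the same fixed-size batching can be written as a lazy generic helper, so rows are grouped as they are enumerated rather than after everything is buffered. `BatchHelper.Batch` is a hypothetical name, not a method of Google.Cloud.BigQuery.V2 or the Oracle provider.

```csharp
using System;
using System.Collections.Generic;

// Hypothetical helper (not part of any library): groups a sequence into
// arrays of at most batchSize items, yielding each batch as soon as it fills.
public static class BatchHelper
{
    public static IEnumerable<T[]> Batch<T>(IEnumerable<T> source, int batchSize)
    {
        // Validate eagerly; the iterator below only runs when enumerated.
        if (source == null) throw new ArgumentNullException(nameof(source));
        if (batchSize <= 0) throw new ArgumentOutOfRangeException(nameof(batchSize));
        return BatchIterator(source, batchSize);
    }

    private static IEnumerable<T[]> BatchIterator<T>(IEnumerable<T> source, int batchSize)
    {
        var buffer = new List<T>(batchSize);
        foreach (T item in source)
        {
            buffer.Add(item);
            if (buffer.Count == batchSize)
            {
                yield return buffer.ToArray();  // a full batch
                buffer.Clear();
            }
        }
        if (buffer.Count > 0)
            yield return buffer.ToArray();      // the final, partial batch
    }
}
```

Assuming a hypothetical iterator `ReadRows(oracleReader)` that yields one `BigQueryInsertRow` per `oracleReader.Read()` (built like the body of the `while` loop), the upload loop could become `foreach (BigQueryInsertRow[] batch in BatchHelper.Batch(ReadRows(oracleReader), _batchSize)) UploadJsonStreamingSync(_datasetid, _target, _client, batch);`.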
[Comments]:
-
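Is this a one-time migration of the table? If so, it would be easier to export it to CSV and batch-load it from GCS. Socket timeouts can happen when you use streaming, because you are simply making network requests. You need to account for that when using the streaming API and build some kind of exponential backoff and retry logic into your code.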
-
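It's an ongoing process. After migrating all the data from the RDBMS, we will keep it in sync with BigQuery. I'm looking for a seamless integration and want to avoid creating extra CSV or JSON files, because those approaches require additional development work and extra processes to monitor.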
-
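You won't get a seamless connection between Oracle and BigQuery. In fact, exporting to CSV or JSON is easier (and more reliable) and less work than streaming. Like I said, if you use streaming you will get socket exceptions (i.e. network problems), and you need to write logic to handle them. But it depends on what you're trying to do.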
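Following the suggestion in the comments, here is a minimal sketch of exponential backoff and retry in C#. It assumes the failures worth retrying are the network exceptions seen in the stack trace (`HttpRequestException` wrapping `WebException`, `IOException` and `SocketException`). `RetryHelper`, `IsTransient` and `ExecuteWithRetry` are hypothetical names, not part of Google.Cloud.BigQuery.V2, and the attempt count and delays are illustrative assumptions.

```csharp
using System;
using System.IO;
using System.Net;
using System.Net.Http;
using System.Net.Sockets;
using System.Threading;

// Hypothetical helper, not part of the BigQuery client library.
public static class RetryHelper
{
    // Shared jitter source; System.Random is not thread-safe, so this
    // sketch assumes uploads run on a single thread, as in the question.
    private static readonly Random Jitter = new Random();

    // Walks the InnerException chain, because the error in the question nests
    // HttpRequestException -> WebException -> IOException -> SocketException.
    public static bool IsTransient(Exception ex)
    {
        for (Exception e = ex; e != null; e = e.InnerException)
        {
            if (e is SocketException || e is IOException ||
                e is WebException || e is HttpRequestException)
                return true;
        }
        return false;
    }

    // Runs action; after a transient failure, waits baseDelay * 2^(attempt - 1)
    // plus up to 100 ms of random jitter, then tries again, up to maxAttempts
    // calls in total. Non-transient errors, and the last failure, propagate.
    public static void ExecuteWithRetry(Action action, int maxAttempts = 5,
                                        TimeSpan? baseDelay = null,
                                        Action<TimeSpan> sleep = null)
    {
        TimeSpan initialDelay = baseDelay ?? TimeSpan.FromMilliseconds(500);
        if (sleep == null) sleep = Thread.Sleep;  // injectable for testing

        for (int attempt = 1; ; attempt++)
        {
            try
            {
                action();
                return;
            }
            catch (Exception ex) when (attempt < maxAttempts && IsTransient(ex))
            {
                double ms = initialDelay.TotalMilliseconds * Math.Pow(2, attempt - 1)
                            + Jitter.Next(0, 100);
                sleep(TimeSpan.FromMilliseconds(ms));
            }
        }
    }
}
```

With the question's code, each upload could be wrapped as `RetryHelper.ExecuteWithRetry(() => UploadJsonStreamingSync(_datasetid, _target, _client, _SendList.ToArray()));`. `IsTransient` deliberately treats every `IOException` and `WebException` as retryable, which is broad; you may also want to retry HTTP 5xx responses, which the Google API client surfaces as `Google.GoogleApiException`. Be aware that retrying a request that actually reached BigQuery before the connection dropped can insert the same rows twice unless each row carries an insert ID for BigQuery's best-effort deduplication.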
Tags: google-bigquery