【发布时间】:2021-02-04 10:03:21
【问题描述】:
以下代码从币安下载从开始日期到结束日期的历史 OHLCV 数据。由于 Binance 一次只允许我们下载 1000 根蜡烛,所以我照原样做了DownloadAsync。任何关于代码的建议,也很感激。
实际的问题是关于使DownloadAsync 多线程,以加快进程,因为假设以 5m 的间隔下载 2018 年到 2021 年的蜡烛。我更喜欢使用System.Reactive,但我想也欢迎其他解决方案,因为很难将代码表示为多线程版本。
下面的代码可以测试。
using Binance.Net;
using Binance.Net.Enums;
using Binance.Net.Interfaces;
using Newtonsoft.Json;
using System;
using System.Collections.Generic;
using System.IO;
using System.Threading.Tasks;
using System.Linq;
using System.Text.RegularExpressions;
using System.Reactive.Linq;
using System.Threading;
namespace DownloadCandleDataTest
{
public class DataProvider
{
private Exchange _exchange;
public DataProvider(Exchange exchange)
{
_exchange = exchange;
}
public async Task<List<OHLCV>> DownloadAsync(string pair, KlineInterval interval, DateTime startDate, DateTime endDate, int startupCandleCount)
{
DateTime start = startDate;
DateTime end = endDate;
var tempStartDate = start;
var tempEndDate = end;
var tempList = new List<OHLCV>();
for (int i = 0; tempStartDate < tempEndDate; i++)
{
var candles = await _exchange.GetCandlesAsync(pair, interval, tempStartDate, tempEndDate, 100).ConfigureAwait(false);
if (candles.Count > 0)
{
// Remove the first candle when i > 0, to prevent duplicates
if (i > 0)
{
candles.RemoveAt(0);
}
var first = candles.First();
var last = candles.Last();
Console.WriteLine($"First: {first.Timestamp} | Last: {last.Timestamp}");
tempList.AddRange(candles);
tempStartDate = last.Timestamp;
}
}
// Find duplicates
var groups = tempList.GroupBy(g => g.Timestamp).Where(e => e.Skip(1).Any());
foreach (var group in groups)
{
Console.WriteLine(group.Key);
foreach (var ohclv in group)
{
Console.WriteLine("\t" + ohclv.Timestamp);
}
}
return null;
}
}
class Program
{
public static void StartBackgroundWork()
{
Console.WriteLine("Shows use of Start to start on a background thread:");
var o = Observable.Start(() =>
{
//This starts on a background thread.
Console.WriteLine("From background thread. Does not block main thread.");
Console.WriteLine("Calculating...");
Thread.Sleep(3000);
Console.WriteLine("Background work completed.");
}).Finally(() => Console.WriteLine("Main thread completed."));
Console.WriteLine("\r\n\t In Main Thread...\r\n");
o.Wait(); // Wait for completion of background operation.
}
static async Task Main(string[] args)
{
using var exchange = new Exchange();
var dataProvider = new DataProvider(exchange);
await dataProvider.DownloadAsync("TRXUSDT", KlineInterval.FiveMinutes, new DateTime(2019, 1, 1), new DateTime(2019, 1, 2), 100).ConfigureAwait(false);
Console.ReadLine();
}
}
public class OHLCV
{
public DateTime Timestamp { get; set; }
public decimal Open { get; set; }
public decimal High { get; set; }
public decimal Low { get; set; }
public decimal Close { get; set; }
public decimal Volume { get; set; }
}
public static class Extensions
{
public static OHLCV ToCandle(this IBinanceKline candle)
{
return new OHLCV
{
Timestamp = candle.OpenTime,
Open = candle.Open,
High = candle.High,
Low = candle.Low,
Close = candle.Close,
Volume = candle.BaseVolume,
};
}
}
public class Exchange : IDisposable
{
private readonly IBinanceClient _client;
public Exchange()
{
_client = new BinanceClient();
}
public async Task<List<OHLCV>> GetCandlesAsync(string pair, KlineInterval interval, DateTime? startTime = null, DateTime? endTime = null, int? limit = null)
{
var result = await _client.Spot.Market.GetKlinesAsync(pair, interval, startTime, endTime, limit).ConfigureAwait(false);
if (result.Success)
{
return result.Data?.Select(e => e.ToCandle()).ToList();
}
return null;
}
public void Dispose()
{
if (_client != null)
{
_client.Dispose();
}
}
}
}
【问题讨论】:
-
真正的问题是什么,你想同时做更多的下载吗?下载后的处理是否占用 CPU 时间,您想并行化它吗?
-
我想要更多的并行下载,是的。在 Python (ccxt) 中,由于多线程,它下载蜡烛的速度非常快。 Python (ccxt lib) 使用 asyncio。 github.com/freqtrade/freqtrade/blob/…
标签: c# multithreading system.reactive .net-5