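【Question title】: How to run a for loop as threads to speed it up
【Posted】: 2021-11-22 18:30:44
【Question description】:

I have a for loop that fetches the historical data for each key (the historical_data function). I currently have about 200 cryptocurrencies, so fetching the historical data one by one takes a long time. I would like to know whether it is possible to fetch the historical data for each symbol in a separate thread.

The code loops over every symbol and every interval. I then name the columns and drop the ones I don't need. After each iteration the data is appended to a list. Running these one at a time takes a long time, which is why I want to try running them in threads, but I don't know exactly how.

Thanks in advance!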

class crypto:
    symbols = []
    with open('allsymbols', 'r+') as f:
        for line in f:
            symbol = line.strip('\n') + 'BTC'
            symbols.append(symbol)

    intervals = ['1m','5m']

    @staticmethod
    def historical_data():
        historical_list = []
        for i in range(len(crypto.symbols)):
            for j in range(len(crypto.intervals)):
                historical_data = client.get_historical_klines(crypto.symbols[i], crypto.intervals[j], '11/19/2021', limit=1000)[-22:-1]
                historical_df = pd.DataFrame(historical_data)
                historical_df.columns = ['time', '1', 'high', 'low', 'close', '5', '6', '7', '8', '9', '10', '11']
                historical_df.drop(columns=['1', '5', '6', '7', '8', '9', '10', '11'], axis=1, inplace=True)
                historical_df['interval'] = crypto.intervals[j]
                historical_df['symbol'] = crypto.symbols[i]
                historical_df[['high', 'low', 'close']] = historical_df[['high', 'low', 'close']].apply(pd.to_numeric, axis=1)
                historical_df['time'] = pd.to_datetime(historical_df['time'] / 1000, unit='s')
                historical_list.append(historical_df.to_dict())
        return historical_list

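Note: I convert the list to a dictionary and then convert the dictionary's values back to lists. This probably slows the process down a lot? Maybe there is an easier way to do this, but I don't know how. It is not the main concern.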

    @staticmethod
    def refactor_list(historical_list):
        historical_list_refactored = []
        for i in range(len(historical_list)):
            single_key_data = historical_list[i]
            single_key_data['high'] = list(single_key_data['high'].values())
            single_key_data['low'] = list(single_key_data['low'].values())
            single_key_data['close'] = list(single_key_data['close'].values())
            single_key_data['interval'] = list(single_key_data['interval'].values())
            single_key_data['symbol'] = list(single_key_data['symbol'].values())
            single_key_data['time'] = list(single_key_data['time'].values())
            historical_list_refactored.append(single_key_data)
        return historical_list_refactored
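
The conversion step in refactor_list can probably be shortened. As a rough sketch: calling `to_dict()` on a DataFrame returns a nested `{column: {row_index: value}}` mapping, and a single dict comprehension flattens every column into a list without naming the columns one by one. The sample data and the function name `flatten_columns` are made up for illustration. pandas can also produce the list form directly with `historical_df.to_dict(orient='list')`, which would make the second pass unnecessary.

```python
def flatten_columns(nested):
    """Turn {column: {row_index: value}} into {column: [values...]}."""
    return {column: list(rows.values()) for column, rows in nested.items()}


# Made-up sample in the shape that DataFrame.to_dict() returns
sample = {
    "high": {0: 0.0151, 1: 0.0152},
    "symbol": {0: "BNBBTC", 1: "BNBBTC"},
}

flat = flatten_columns(sample)
print(flat)
```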

Symbols list:

['1INCHBTC', 'AAVEBTC', 'ACMBTC', 'ADABTC', 'ADXBTC', 'AERGOBTC', 'AGIXBTC', 'AGLDBTC', 'AIONBTC', 'AKROBTC', 'ALGOBTC', 'ALICEBTC', 'ALPACABTC', 'ALPHABTC', 'AMBBTC', 'ANKRBTC', 'ANTBTC', 'APPCBTC', 'ARDRBTC', 'ARPABTC', 'ARBTC', 'ARKBTC', 'ASRBTC', 'ASTBTC', 'ATABTC', 'ATMBTC', 'ATOMBTC', 'AUCTIONBTC', 'AUDIOBTC', 'AUTOBTC', 'AVABTC', 'AVAXBTC', 'AXSBTC', 'BADGERBTC', 'BAKEBTC', 'BALBTC', 'BANDBTC', 'BARBTC', 'BATBTC', 'BCDBTC', 'BEAMBTC', 'BELBTC', 'BETABTC', 'BLZBTC', 'BNBBTC', 'BNTBTC', 'BNXBTC', 'BONDBTC', 'BRDBTC', 'BTCSTBTC', 'BTGBTC', 'BTSBTC', 'BZRXBTC', 'C98BTC', 'CAKEBTC', 'CELOBTC', 'CELRBTC', 'CFXBTC', 'CHESSBTC', 'CHRBTC', 'CHZBTC', 'CITYBTC', 'CKBBTC', 'CLVBTC', 'CNDBTC', 'COMPBTC', 'COSBTC', 'COTIBTC', 'CRVBTC', 'CTKBTC', 'CTSIBTC', 'CTXCBTC', 'CVCBTC', 'DARBTC', 'DASHBTC', 'DATABTC', 'DCRBTC', 'DEGOBTC', 'DGBBTC', 'DIABTC', 'DNTBTC', 'DOCKBTC', 'DODOBTC', 'DOGEBTC', 'DOTBTC', 'DREPBTC', 'DUSKBTC', 'DYDXBTC', 'EGLDBTC', 'ELFBTC', 'ENJBTC', 'ENSBTC', 'EOSBTC', 'EPSBTC', 'ETCBTC', 'ETHBTC', 'EVXBTC', 'EZBTC', 'FARMBTC', 'FETBTC', 'FIDABTC', 'FILBTC', 'FIOBTC', 'FIROBTC', 'FISBTC', 'FLMBTC', 'FLOWBTC', 'FORBTC', 'FORTHBTC', 'FRONTBTC', 'FTMBTC', 'FTTBTC', 'FUNBTC', 'FXSBTC', 'GALABTC', 'GASBTC', 'GLMBTC', 'GNOBTC', 'GOBTC', 'GRSBTC', 'GRTBTC', 'GTCBTC', 'GTOBTC', 'GXSBTC', 'HARDBTC', 'HBARBTC', 'HIVEBTC', 'HNTBTC', 'ICPBTC', 'ICXBTC', 'IDEXBTC', 'ILVBTC', 'INJBTC', 'IOSTBTC', 'IOTABTC', 'IOTXBTC', 'IRISBTC', 'JASMYBTC', 'JSTBTC', 'JUVBTC', 'KAVABTC', 'KEEPBTC', 'KLAYBTC', 'KMDBTC', 'KNCBTC', 'KSMBTC', 'LAZIOBTC', 'LINABTC', 'LINKBTC', 'LITBTC', 'LOOMBTC', 'LPTBTC', 'LRCBTC', 'LSKBTC', 'LTCBTC', 'LTOBTC', 'LUNABTC', 'MANABTC', 'MATICBTC', 'MBOXBTC', 'MDABTC', 'MDTBTC', 'MDXBTC', 'MINABTC', 'MIRBTC', 'MITHBTC', 'MKRBTC', 'MLNBTC', 'MOVRBTC', 'MTHBTC', 'MTLBTC', 'NANOBTC', 'NASBTC', 'NAVBTC', 'NEARBTC', 'NEBLBTC', 'NEOBTC', 'NKNBTC', 'NMRBTC', 'NUBTC', 'NULSBTC', 'NXSBTC', 'OAXBTC', 'OCEANBTC', 'OGBTC', 'OGNBTC', 'OMBTC', 'OMGBTC', 'ONEBTC', 'ONGBTC', 
'ONTBTC', 'ORNBTC', 'OXTBTC', 'PAXGBTC', 'PERLBTC', 'PERPBTC', 'PHABTC', 'PHBBTC', 'PIVXBTC', 'PNTBTC', 'POLSBTC', 'POLYBTC', 'PONDBTC', 'PORTOBTC', 'POWRBTC', 'PROMBTC', 'PSGBTC', 'QIBTC', 'QKCBTC', 'QLCBTC', 'QNTBTC', 'QSPBTC', 'QTUMBTC', 'QUICKBTC', 'RADBTC', 'RAMPBTC', 'RAREBTC', 'RDNBTC', 'REEFBTC', 'RENBTC', 'RENBTCBTC', 'REPBTC', 'REQBTC', 'RGTBTC', 'RIFBTC', 'RLCBTC', 'ROSEBTC', 'RSRBTC', 'RUNEBTC', 'RVNBTC', 'SANDBTC', 'SCBTC', 'SCRTBTC', 'SFPBTC', 'SKLBTC', 'SNMBTC', 'SNTBTC', 'SNXBTC', 'SOLBTC', 'SRMBTC', 'SSVBTC', 'STEEMBTC', 'STMXBTC', 'STORJBTC', 'STPTBTC', 'STRAXBTC', 'STXBTC', 'SUPERBTC', 'SUSHIBTC', 'SXPBTC', 'SYSBTC', 'TCTBTC', 'TFUELBTC', 'THETABTC', 'TKOBTC', 'TLMBTC', 'TOMOBTC', 'TORNBTC', 'TRBBTC', 'TRIBEBTC', 'TRUBTC', 'TRXBTC', 'TVKBTC', 'TWTBTC', 'UMABTC', 'UNFIBTC', 'UNIBTC', 'UTKBTC', 'VETBTC', 'VGXBTC', 'VIBBTC', 'VIDTBTC', 'VITEBTC', 'WABIBTC', 'WANBTC', 'WAVESBTC', 'WAXPBTC', 'WBTCBTC', 'WINGBTC', 'WNXMBTC', 'WRXBTC', 'WTCBTC', 'XEMBTC', 'XLMBTC', 'XMRBTC', 'XRPBTC', 'XTZBTC', 'XVGBTC', 'XVSBTC', 'YFIBTC', 'YFIIBTC', 'YGGBTC', 'YOYOBTC', 'ZECBTC', 'ZENBTC', 'ZILBTC', 'ZRXBTC']

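【Question comments】:

  • What library do you use to get data from the remote server? It is not clear from your code example how you fetch all the data.
  • @ArtiomKozyrev I use the binance-client library to get the historical data. You can see it in the following line: `historical_data = client.get_historical_klines(crypto.symbols[i], crypto.intervals[j], '11/19/2021', limit=1000)[-22:-1]`
  • @Jellevs Is it python-binance or something else? I could not find any popular library named binance-client.
  • @ArtiomKozyrev Yes, it is python-binance. It is imported like this: from binance import Client
  • @Jellevs I created a working example for you that shows how to fetch the data faster. You still need to add whatever you want to do with the resulting data.

Tags: python multithreading python-asyncio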


【Solution 1】:

Here is how you can send multiple requests to the Binance API server using python-binance:

import asyncio
from binance import AsyncClient

RESULTS = []  # let's store all results here


class GetAllBinanceData:
    def __init__(self, workers_num: int = 10):
        self.workers_num: int = workers_num
        self.task_q: asyncio.Queue = asyncio.Queue(maxsize=10)

    async def get_symbols_from_somewhere(self):
        """Get symbols and distribute them among workers"""
        # imagine the symbols are from some file
        symbols = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100
        for i in symbols:
            await self.task_q.put(i)

        for i in range(self.workers_num):
            await self.task_q.put(None)

    async def get_historical_klines(self, client: AsyncClient):
        """Get data and print it"""
        while True:
            symbol = await self.task_q.get()
            if symbol is None:
                break
            klines = await client.get_historical_klines(
                symbol=symbol,
                interval=AsyncClient.KLINE_INTERVAL_1MINUTE,
                start_str="2021-11-23 10:00:00",
                end_str="2021-11-23 10:01:00"
            )
            print(klines)  # just print
            RESULTS.append(klines)  # send somewhere else

    async def amain(self) -> None:
        """Main async wrapper function"""
        client = await AsyncClient.create()
        try:
            await asyncio.gather(
                self.get_symbols_from_somewhere(),
                *[self.get_historical_klines(client) for _ in range(self.workers_num)]
            )
        finally:
            # close the HTTP session even if one of the requests fails
            await client.close_connection()


if __name__ == "__main__":
    loop = asyncio.get_event_loop()
    loop.run_until_complete(GetAllBinanceData().amain())
    print("*" * 100)
    print(RESULTS)
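
The example above fetches a single interval per symbol, while the question loops over symbols and intervals. One possible way to cover both, sketched here under assumptions, is to build every (symbol, interval) pair and cap the number of requests in flight with asyncio.Semaphore. `fake_fetch` is a hypothetical stand-in for `client.get_historical_klines` so that the sketch runs without network access, and the limit of 10 is arbitrary, not a Binance recommendation.

```python
import asyncio
from itertools import product

SYMBOLS = ["BNBBTC", "ETHBTC", "NEOBTC"]
INTERVALS = ["1m", "5m"]


async def fake_fetch(symbol, interval):
    """Hypothetical stand-in for `await client.get_historical_klines(...)`."""
    await asyncio.sleep(0.01)  # pretend to wait for the network
    return {"symbol": symbol, "interval": interval, "klines": []}


async def fetch_all(symbols, intervals, max_in_flight=10):
    semaphore = asyncio.Semaphore(max_in_flight)  # created inside the running loop

    async def limited(symbol, interval):
        async with semaphore:  # at most `max_in_flight` requests at once
            return await fake_fetch(symbol, interval)

    tasks = [limited(s, i) for s, i in product(symbols, intervals)]
    # gather returns results in the same order as `tasks`
    return await asyncio.gather(*tasks)


if __name__ == "__main__":
    results = asyncio.run(fetch_all(SYMBOLS, INTERVALS))
    print(len(results))
```

Returning the results from gather keeps them in input order and avoids the global RESULTS list.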

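Another approach that is very straightforward but less efficient. Note that Binance gets angry when you create many connections and starts ignoring you.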

from binance import Client
from concurrent.futures import ThreadPoolExecutor

RESULTS = []  # let's store all results here
SYMBOLS = ["BNBBTC", "ETHBTC", "NEOBTC"] * 100


def get_historical_klines(symbol):
    # create the client before `try` so `finally` never sees an unbound name
    client = Client()
    try:
        klines = client.get_historical_klines(
            symbol=symbol,
            interval=Client.KLINE_INTERVAL_1MINUTE,
            start_str="2021-11-23 10:00:00",
            end_str="2021-11-23 10:01:00"
        )
        print(klines)  # just print
        RESULTS.append(klines)  # send somewhere else
    finally:
        client.close_connection()


if __name__ == "__main__":
    with ThreadPoolExecutor(max_workers=10) as pool:
        pool.map(get_historical_klines, SYMBOLS)

    print("*" * 100)
    print(len(RESULTS))
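
A variation on the thread-pool version, offered as a sketch rather than a drop-in replacement: return each result from the worker and collect what pool.map yields, instead of appending to a global list. Iterating over the results also re-raises any exception from a worker, which is otherwise easy to miss. `fetch_klines` is a hypothetical stand-in for the python-binance call so that the example runs offline, and the pairing of symbols with intervals follows the question.

```python
from concurrent.futures import ThreadPoolExecutor
from itertools import product
import time

SYMBOLS = ["BNBBTC", "ETHBTC", "NEOBTC"]
INTERVALS = ["1m", "5m"]


def fetch_klines(pair):
    """Hypothetical stand-in for client.get_historical_klines(symbol, interval, ...)."""
    symbol, interval = pair
    time.sleep(0.01)  # pretend to wait for the network
    return {"symbol": symbol, "interval": interval, "klines": []}


def fetch_all(symbols, intervals, max_workers=5):
    pairs = list(product(symbols, intervals))
    with ThreadPoolExecutor(max_workers=max_workers) as pool:
        # map() yields results in input order; list() waits for all of them
        # and re-raises the first worker exception, if any
        return list(pool.map(fetch_klines, pairs))


if __name__ == "__main__":
    print(len(fetch_all(SYMBOLS, INTERVALS)))
```

A small max_workers value keeps the number of simultaneous connections low, which matters given the warning above about Binance ignoring clients that open too many.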

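【Comments】:

  • Thank you very much for all the effort! I did get my IP banned with the threading method. I will probably have to try to understand the async method. Do you have any good tutorials to recommend?
  • @Jellevs To be honest, after a long search for the best asyncio tutorial, I unfortunately came to the conclusion that the best one is the official documentation for recent Python versions such as 3.8 and 3.9. You can look at the RealPython tutorial realpython.com/async-io-python, but it is still worse than the official docs.
  • @Jellevs I would say the best way to understand asyncio is to think of it as a smart while loop that can jump between tasks.
  • OK, thanks! I am going to try to understand the code you sent me!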
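
To illustrate the "smart while loop" description from the comment above, here is a toy sketch (the names `worker` and `main` are made up). Two coroutines take turns: each one records a step and then hands control back to the event loop at its await, which lets the other one run.

```python
import asyncio


async def worker(name, steps, log):
    for step in range(steps):
        log.append(f"{name}{step}")
        await asyncio.sleep(0)  # yield control so the loop can run another task


async def main():
    log = []
    # both workers run on one thread; the loop switches at each await
    await asyncio.gather(worker("a", 2, log), worker("b", 2, log))
    return log


if __name__ == "__main__":
    print(asyncio.run(main()))
```

The recorded steps of the two workers come out interleaved rather than one worker finishing before the other starts.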