【问题标题】:run crypto feed in a concurrent thread: There is no current event loop in thread 'ThreadPoolExecutor-0_0'在并发线程中运行加密馈送:线程“ThreadPoolExecutor-0_0”中没有当前事件循环
【发布时间】:2021-09-24 02:49:00
【问题描述】:

我正在尝试使用加密提要同时下载数据。

f = FeedHandler()
f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
f.run()

上面这段代码可以成功运行。但是,我试图在后台运行它。所以我正在使用并发期货来提供帮助。

executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(f.run)

但是,我得到了错误:

job2.result()


---------------------------------------------------------------------------
RuntimeError                              Traceback (most recent call last)
<ipython-input-54-f96e35ee3c66> in <module>
----> 1 job2.result()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    430                 raise CancelledError()
    431             elif self._state == FINISHED:
--> 432                 return self.__get_result()
    433 
    434             self._condition.wait(timeout)

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
    145             raise ValueError(txt)
    146 
--> 147         loop = asyncio.get_event_loop()
    148         # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
    149         # loop.set_debug(True)

~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
    637 
    638         if self._local._loop is None:
--> 639             raise RuntimeError('There is no current event loop in thread %r.'
    640                                % threading.current_thread().name)
    641 

RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-0_0'.

谁能帮帮我?非常感谢!

编辑:关注

def threadable():    
    f = FeedHandler()
    f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
    f.run()


executor = concurrent.futures.ThreadPoolExecutor(16)
job2 = executor.submit(threadable)
job2.done()
job2.result()

我收到了错误:似乎我仍然收到有关事件循环的相同错误...可以解决吗?

RuntimeError                              Traceback (most recent call last)
<ipython-input-47-05c023dd326f> in <module>
     11 job2.done()
     12 
---> 13 job2.result()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in result(self, timeout)
    437                 raise CancelledError()
    438             elif self._state == FINISHED:
--> 439                 return self.__get_result()
    440             else:
    441                 raise TimeoutError()

~/anaconda3/lib/python3.8/concurrent/futures/_base.py in __get_result(self)
    386     def __get_result(self):
    387         if self._exception:
--> 388             raise self._exception
    389         else:
    390             return self._result

~/anaconda3/lib/python3.8/concurrent/futures/thread.py in run(self)
     55 
     56         try:
---> 57             result = self.fn(*self.args, **self.kwargs)
     58         except BaseException as exc:
     59             self.future.set_exception(exc)

<ipython-input-47-05c023dd326f> in threadable()
      2     f = FeedHandler()
      3     f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
----> 4     f.run()
      5 
      6 

~/anaconda3/lib/python3.8/site-packages/cryptofeed/feedhandler.py in run(self, start_loop, install_signal_handlers, exception_handler)
    145             raise ValueError(txt)
    146 
--> 147         loop = asyncio.get_event_loop()
    148         # Good to enable when debugging or without code change: export PYTHONASYNCIODEBUG=1)
    149         # loop.set_debug(True)

~/anaconda3/lib/python3.8/asyncio/events.py in get_event_loop(self)
    637 
    638         if self._local._loop is None:
--> 639             raise RuntimeError('There is no current event loop in thread %r.'
    640                                % threading.current_thread().name)
    641 

RuntimeError: There is no current event loop in thread 'ThreadPoolExecutor-1_0'.

【问题讨论】:

    标签: python concurrency python-asyncio


    【解决方案1】:

    在您的代码的单线程版本中,所有这三个语句都以简单的顺序方式在同一个线程中执行:

    f = FeedHandler()
    f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
    f.run()
    

    在多线程版本中,您只将最后一行提交给 Executor,因此它将在辅助线程中运行。但是,据我从您提供的代码中可以看出,这些语句仍然在主线程中执行:

    f = FeedHandler()
    f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
    

    你怎么知道它会起作用?一般来说,这将取决于 Gateio 和 Feedhandler 的实现细节。您需要非常小心地将程序分割成片段以在不同的线程中运行,尤其是在涉及第三方库调用时。所以,祝你好运。

    你可以试试这个:

    def threadable():
        f = FeedHandler()
        f.add_feed(Gateio(channels=[TRADES], symbols=list_tmp, callbacks={ TRADES: TradePostgresGateio(**postgres_cfg)}))
        f.run()
    
    ...
    
    executor = concurrent.futures.ThreadPoolExecutor(16)
    job2 = executor.submit(threadable)
    

    然后,至少,您的整个步骤序列将在 SAME 线程中执行。

    不过,我会担心这些回调。它们现在将在辅助线程中运行,您需要了解其后果。他们是否与用户界面程序交互?您的 UI 可能不支持多线程。

    这里使用 Executor 协议有点奇怪,因为你的函数没有返回值。 Executor 在用于聚合返回值时最有用。您最好使用 threading 模块中的方法启动所需的线程。

    【讨论】:

    • 非常感谢您的帮助。请查看我的编辑...我仍然收到错误...
    • 进一步思考:函数asyncio.get_event_loop() 的工作方式不同,具体取决于它是从主线程调用还是从辅助线程调用。在主线程中,函数调用总是成功。在辅助线程中,如果您没有为该线程显式设置事件循环,它将失败。因此,尝试在threadable 的开头插入两行: (1) loop = asyncio.new_event_loop() (2) asyncio.set_event_loop(loop)。这是猜测,但如果它有效,你就在做生意。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-07-13
    • 2020-06-13
    • 2021-02-20
    • 1970-01-01
    相关资源
    最近更新 更多