【问题标题】:Exception error in EReader thread running ibapi运行 ibapi 的 EReader 线程中的异常错误
【发布时间】:2019-12-24 13:58:58
【问题描述】:

我正在运行 Python ib-api 来接收来自盈透证券的实时市场数据。它可以提供我期望的数据,但以“EReader 线程中未处理的异常”结尾。

from ibapi.client import EClient
from ibapi.wrapper import EWrapper
from ibapi.contract import Contract as IBcontract
from threading import Thread
import queue
import pandas as pd
from ibapi.ticktype import TickTypeEnum`

`DEFAULT_PRICE_DATA_ID = 1001`

`FINISHED = object()
STARTED = object()
TIME_OUT = object()`

class finishableQueue(object):

    def __init__(self, queue_to_finish):

        self._queue = queue_to_finish
        self.status = STARTED

    def get(self, timeout):

        contents_of_queue=[]
        finished=False

        while not finished:
            try:
                current_element = self._queue.get(timeout=timeout)
                if current_element is FINISHED:
                    finished = True
                    self.status = FINISHED
                else:
                    contents_of_queue.append(current_element)

            except queue.Empty:
                finished = True
                self.status = TIME_OUT

        return contents_of_queue

    def timed_out(self):
        return self.status is TIME_OUT


class TestWrapper(EWrapper):

    def __init__(self):
        self._my_price_data_dict = {}

    def get_error(self, timeout=5):
        if self.is_error():
            try:
                return self._my_errors.get(timeout=timeout)
            except queue.Empty:
                return None

        return None

    def is_error(self):
        an_error_if=not self._my_errors.empty()
        return an_error_if

    def init_error(self):
        error_queue=queue.Queue()
        self._my_errors = error_queue

    def error(self, id, errorCode, errorString):
        ## Overriden method
        errormsg = "IB error id %d errorcode %d string %s" % (id, errorCode, errorString)
        self._my_errors.put(errormsg)

    def init_ibprices(self, tickerid):
        ibprice_data_queue = self._my_price_data_dict[tickerid] = queue.Queue()

        return ibprice_data_queue

    def tickPrice(self, reqId, tickType, price, attrib):
        tickdata = (TickTypeEnum.to_str(tickType), price)

        price_data_dict = self._my_price_data_dict

        if reqId not in price_data_dict.keys():
            self.init_ibprices(reqId)

        price_data_dict[reqId].put(tickdata)


class TestClient(EClient):

    def __init__(self, wrapper):
        EClient.__init__(self, wrapper)

    def error(self, reqId, errorCode, errorString):
        print("Error: ", reqId, " ", errorCode, " ", errorString)

    def getIBrealtimedata(self, ibcontract, tickerid=DEFAULT_PRICE_DATA_ID):
        ib_data_queue = finishableQueue(self.init_ibprices(tickerid))

        self.reqMktData(
            tickerid,
            ibcontract,
            "",
            False,
            False,
            []
        )

        MAX_WAIT_SECONDS = 5
        print("Getting data from the server... could take %d seconds to complete " % MAX_WAIT_SECONDS)

        price_data = ib_data_queue.get(timeout = MAX_WAIT_SECONDS)

        while self.wrapper.is_error():
            print(self.get_error())

        if ib_data_queue.timed_out():
            print("Exceeded maximum wait for wrapper to confirm finished - seems to be normal behaviour")

        self.cancelMktData(tickerid)

        return price_data

class TestApp(TestWrapper, TestClient):
    def __init__(self, ipaddress, portid, clientid):
        TestWrapper.__init__(self)
        TestClient.__init__(self, wrapper=self)

        self.connect(ipaddress, portid, clientid)

        thread = Thread(target = self.run)
        thread.start()

        setattr(self, "_thread", thread)

        self.init_error()

def main(slist):

    app = TestApp("127.0.0.1", 7497, 1)

    for i in slist:
        ibcontract = IBcontract()
        ibcontract.secType = "STK"
        ibcontract.symbol = i
        ibcontract.exchange ="SEHK"

        Lastprice = app.getIBrealtimedata(ibcontract)

        df = pd.DataFrame(Lastprice)
        print(ibcontract.symbol, df.head())

    app.disconnect()

if __name__ == "__main__":

    seclist = [700,2318,5,12]
    main(seclist)

以下是错误消息:

EReader 线程中未处理的异常 回溯(最近一次通话最后): 文件“D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\reader.py”,第 34 行,>运行中 数据 = self.conn.recvMsg() 文件“D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py”,行 >99,在 recvMsg buf = self._recvAllMsg() 文件“D:\Anaconda3\envs\myweb\lib\site-packages\ibapi\connection.py”,行 >119,在 _recvAllMsg buf = self.socket.recv(4096) OSError: [WinError 10038] 尝试对>不是套接字

的东西进行操作

【问题讨论】:

    标签: python-3.x ib-api


    【解决方案1】:

    启动一个单独的线程来读取来自套接字的传入消息:

    thread = Thread(target = self.run)
    thread.start()
    

    但是这个线程永远不会停止,当你调用disconnect()时它仍然在运行。结果,它尝试访问现在为 None 的套接字对象,从而触发错误。尝试通过设置 done=True 在断开连接之前停止 EReader 线程。

    附带说明,由于此错误发生在程序最后断开连接时,它不应干扰接收预期数据。

    【讨论】:

    • 我在 'app.disconnect()' 之前设置了一个 'app.done = True',错误依然存在。
    • 您可能需要升级您的 API 版本。 groups.io/g/twsapi/message/42580
    • 我在使用较新的 API 版本时遇到了类似的问题。我还尝试在我的代码上添加“app.done = True”,但什么也没发生。运行较旧的 API 版本时,错误会消失(请参阅此处:stackoverflow.com/questions/57618971/…)。但是,我注意到这个旧版本没有按应有的方式返回某些回调的数据。因此,到目前为止,我必须在次优的 API 回调库之间做出选择,或者在运行我的代码时总是收到错误消息。
    • connection.py 中进行了修改,尚未添加到正式版本中。错误已解决。请参考here
    【解决方案2】:

    避免警告的解决方法。

    在您的 EClient / EWrapper 子类中实现此功能:

    1. 创建套接字关闭函数:
    import socket, time
    [...]
    
        def _socketShutdown(self):
            self.conn.lock.acquire()
            try:
                if self.conn.socket is not None:
                    self.conn.socket.shutdown(socket.SHUT_WR)
            finally:
                self.conn.lock.release()
    
    1. 在关闭连接之前使用它:
        self._socketShutdown()
        time.sleep(1) 
        self.disconnect()
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2015-04-15
      • 1970-01-01
      • 2015-09-30
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多