【问题标题】:Twitter Streaming API - urllib3.exceptions.ProtocolError: ('Connection broken: IncompleteReadTwitter 流 API - urllib3.exceptions.ProtocolError: ('连接中断:IncompleteRead
【发布时间】:2019-04-18 23:51:31
【问题描述】:

使用 tweepy 运行 python 脚本,该脚本在英语推文的随机样本中流式传输(使用 twitter 流 API)一分钟,然后交替搜索(使用 twitter 搜索 API)一分钟,然后返回。我发现的问题是大约 40 多秒后流式传输崩溃并出现以下错误:

完全错误:

urllib3.exceptions.ProtocolError: ('连接中断: IncompleteRead(0 bytes read)', IncompleteRead(0 bytes read))

读取的字节数可以在 0 到 1000 之间变化。

第一次看到流过早中断并且搜索功能提前启动时,搜索功能完成后它再次返回流,并且在此错误的第二次重复出现时代码崩溃。

我运行的代码是:

# Handles date time calculation
def calculateTweetDateTime(tweet):
    tweetDateTime = str(tweet.created_at)

    tweetDateTime = ciso8601.parse_datetime(tweetDateTime)
    time.mktime(tweetDateTime.timetuple())
    return tweetDateTime

# Checks to see whether that permitted time has past.
def hasTimeThresholdPast():
    global startTime
    if time.clock() - startTime > 60:
        return True
    else:
        return False

#override tweepy.StreamListener to add logic to on_status
class StreamListener(StreamListener):

    def on_status(self, tweet):
        if hasTimeThresholdPast():
            return False

        if hasattr(tweet, 'lang'):
            if tweet.lang == 'en':

                try:
                    tweetText = tweet.extended_tweet["full_text"]
                except AttributeError:
                    tweetText = tweet.text

                tweetDateTime = calculateTweetDateTime(tweet)

                entityList = DataProcessing.identifyEntities(True, tweetText)
                DataStorage.storeHotTerm(entityList, tweetDateTime)
                DataStorage.storeTweet(tweet)


    def on_error(self, status_code):
        def on_error(self, status_code):
            if status_code == 420:
                # returning False in on_data disconnects the stream
                return False


def startTwitterStream():

    searchTerms = []

    myStreamListener = StreamListener()
    twitterStream = Stream(auth=api.auth, listener=StreamListener())
    global geoGatheringTag
    if geoGatheringTag == False:
        twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an'], async=True, stall_warnings=True)

    if geoGatheringTag == True:
        twitterStream.filter(track=['the', 'this', 'is', 'their', 'though', 'a', 'an', 'they\'re'],
                             async=False, locations=[-4.5091, 55.7562, -3.9814, 55.9563], stall_warnings=True)



# ----------------------- Twitter API Functions ------------------------
# XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
# --------------------------- Main Function ----------------------------

startTime = 0


def main():
    global startTime
    userInput = ""
    userInput.lower()
    while userInput != "-1":
        userInput = input("Type ACTiVATE to activate the Crawler, or DATABASE to access data analytic option (-1 to exit): \n")
        if userInput.lower() == 'activate':
            while(True):
                startTime = time.clock()

                startTwitterStream()

                startTime = time.clock()
                startTwitterSearchAPI()

if __name__ == '__main__':
    main() 

我已经删除了搜索功能和数据库处理方面,因为它们是独立的,以避免代码混乱。

如果有人知道为什么会发生这种情况以及我如何解决它,请告诉我,我很想知道任何见解。


我尝试过的解决方案:
带有 http.client.IncompleteRead:
的 Try/Except 块 根据Error-while-fetching-tweets-with-tweepy

将 Stall_Warning = 设置为 True:
根据Incompleteread-error-when-retrieving-twitter-data-using-python

删除英语过滤器。

【问题讨论】:

    标签: python twitter tweepy twitter-streaming-api


    【解决方案1】:

    已解决。

    致那些好奇或遇到类似问题的人:经过一些实验,我发现传入推文的积压是问题所在。每次系统收到一条推文时,我的系统都会运行一个实体识别和存储过程,这会花费一小段时间,并且随着收集数百到数千条推文的时间,这个积压越来越大,直到 API 无法处理它并且抛出了那个错误。

    解决方案:将“on_status/on_data/on_success”函数剥离到最基本的部分,并在流会话关闭后单独处理任何计算,即存储或实体识别。或者,您可以让您的计算更加高效,并让时间差距变得微不足道,这取决于您。

    【讨论】:

    • 这对我帮助很大,我也遇到了同样的问题。基本上解决方案是只转储数据并按照您正确提到的那样单独进行处理..
    • 您好。谢谢你,但是剥离“on_status/on_data/on_success”功能是什么意思?我想我对您甚至没有在 StreamListener 中实现该功能这一事实感到困惑。
    • @SteakOverflow 您好,所以“on_status”是在 SteamListener 下声明的第一个函数:“class StreamListener(StreamListener): def on_status(self, tweet):” 我用 on_data 提供的其他名称/on_success 是该类型函数的常用替代名称。无论您选择什么名称,关键是在流处于活动状态时最大限度地减少对数据进行的处理强度,因为它可能会使其过载并导致其崩溃。无论您有什么功能正在读取数据,都将被归类为“on_data”功能。
    • 如何查看 tweepy 的积压?我不太确定如何验证该过程。
    【解决方案2】:

    我只是根据关注用户 Chris Cookman 的结果来分享我的经验。按照他的建议做了之后,我和你遇到的同样的问题就消失了。但就我而言,我将它与 discord.py 一起使用。所以我所做的是创建一个通用列表 (status_list),每当 tweepy on_status 启动时,它都会附加到该通用列表中。

    然后我使用 discord.py 设置了一个@tasks.loop(seconds=10) 来每隔几秒监视 status_list 是否不为空,然后如果它检测到它有内容,它将循环遍历它然后启动每个列表的进程。

    【讨论】:

      猜你喜欢
      • 2021-09-13
      • 2022-01-22
      • 1970-01-01
      • 2018-08-10
      • 2013-03-21
      • 2019-08-04
      • 2019-01-18
      • 1970-01-01
      相关资源
      最近更新 更多