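【Question title】: How to restart tweepy script in case of error?
【Posted】: 2014-06-29 09:05:41
【Question】:

I have a Python script that continuously stores tweets related to tracked keywords in a file. However, the script tends to crash repeatedly with the error attached below. How can I edit the script so that it restarts automatically? I've seen many solutions, including this one (Restarting a program after exception), but I'm not sure how to implement it in my script.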

import sys
import tweepy
import json
import os

consumer_key=""
consumer_secret=""
access_key = ""
access_secret = ""

auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_key, access_secret)
api = tweepy.API(auth)
# directory that you want to save the json file
os.chdir(r"C:\Users\json_files")
# name of json file you want to create/open and append json to
save_file = open("12may.json", 'a')

class CustomStreamListener(tweepy.StreamListener):
    def __init__(self, api):
        # pass this class to super(); super(tweepy.StreamListener, self) skipped StreamListener.__init__
        super(CustomStreamListener, self).__init__()
        self.api = api

        # self.list_of_tweets = []

    def on_data(self, tweet):
        print tweet
        save_file.write(str(tweet))

    def on_error(self, status_code):
        print >> sys.stderr, 'Encountered error with status code:', status_code
        return True # Don't kill the stream

    def on_timeout(self):
        print >> sys.stderr, 'Timeout...'
        return True # Don't kill the stream

sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
sapi.filter(track=["test"])

==============================================================================

Traceback (most recent call last):
  File "C:\Users\tweets_to_json.py", line 41, in <module>
    sapi.filter(track=["test"])
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 316, in filter
    self._start(async)
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 235, in _start
    self._run()
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 165, in _run
    self._read_loop(resp)
  File "C:\Python27\lib\site-packages\tweepy-2.3-py2.7.egg\tweepy\streaming.py", line 206, in _read_loop
    for c in resp.iter_content():
  File "C:\Python27\lib\site-packages\requests-1.2.3-py2.7.egg\requests\models.py", line 541, in generate
    chunk = self.raw.read(chunk_size, decode_content=True)
  File "C:\Python27\lib\site-packages\requests-1.2.3-py2.7.egg\requests\packages\urllib3\response.py", line 171, in read
    data = self._fp.read(amt)
  File "C:\Python27\lib\httplib.py", line 543, in read
    return self._read_chunked(amt)
  File "C:\Python27\lib\httplib.py", line 603, in _read_chunked
    value.append(self._safe_read(amt))
  File "C:\Python27\lib\httplib.py", line 660, in _safe_read
    raise IncompleteRead(''.join(s), amt)
IncompleteRead: IncompleteRead(0 bytes read, 1 more expected)

【Discussion】:

  • Does it go into on_error when the crash happens?
  • I don't think so, because it doesn't print 'Encountered error with status code:'
  • Try putting sapi=tweepy... inside a try/except... though it's not a good approach. Give it a try.
  • Is (''.join(s), amt) a line in your program?
  • Like this? while True: try: sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api)) sapi.filter(track=["Sony", "Xperia", "Samsung", "s4", "s5", "note" "3", "HTC", "Blackberry", "q5", "q10", "z10", "Nokia", "Lumia", "Nexus", "LG", "Huawei", "Motorola"]) except: pass

Tags: python restart tweepy


【Solution 1】:

I figured out how to incorporate the while/try loop by writing a new function for the stream:

def start_stream():
    while True:
        try:
            sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
            sapi.filter(track=["Samsung", "s4", "s5", "note" "3", "HTC", "Sony", "Xperia", "Blackberry", "q5", "q10", "z10", "Nokia", "Lumia", "Nexus", "LG", "Huawei", "Motorola"])
        except: 
            continue

start_stream()

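I tested the automatic restart by manually interrupting the program with Ctrl+C. Still, I'd be glad to hear about better ways to test this kind of functionality.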
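One way to test the restart without pressing Ctrl+C is to pass the stream call in as a function, so a test can hand it a fake that raises `IncompleteRead` a few times. This is a Python 3 sketch (the question uses Python 2); `run_with_restarts`, `make_fake_stream`, `max_restarts` and `delay` are hypothetical names, and in real use `make_stream` would be a function that builds the tweepy `Stream` and calls `filter()`. It catches `Exception` rather than using a bare `except:`, so Ctrl+C still stops the script.

```python
import time
from http.client import IncompleteRead  # the exception from the question's traceback


def run_with_restarts(make_stream, max_restarts=None, delay=5.0, sleep=time.sleep):
    """Call make_stream() until it returns normally, restarting after errors.

    make_stream  -- hypothetical zero-argument callable that blocks while streaming
    max_restarts -- re-raise after this many restarts; None means never give up
    delay        -- seconds to wait before reconnecting
    sleep        -- injectable so tests don't actually wait
    Returns the number of restarts that were needed.
    """
    restarts = 0
    while True:
        try:
            make_stream()
            return restarts  # stream ended without an exception
        except Exception as ex:  # KeyboardInterrupt is not an Exception, so Ctrl+C still exits
            if max_restarts is not None and restarts >= max_restarts:
                raise
            restarts += 1
            print("stream crashed with %r, restarting in %.0f s" % (ex, delay))
            sleep(delay)


def make_fake_stream(failures):
    """Test double: raises IncompleteRead `failures` times, then returns."""
    calls = {"n": 0}

    def fake_stream():
        calls["n"] += 1
        if calls["n"] <= failures:
            raise IncompleteRead(b"", 1)  # same shape as "0 bytes read, 1 more expected"

    return fake_stream


if __name__ == "__main__":
    print(run_with_restarts(make_fake_stream(3), sleep=lambda s: None))
```

Injecting `sleep` keeps such tests instant; in production the default `time.sleep` gives Twitter a short pause before each reconnect.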

【Discussion】:

  • I had to catch KeyboardInterrupt to be able to exit the script: except KeyboardInterrupt: break
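The comment works because `KeyboardInterrupt` derives from `BaseException`, not `Exception`, while a bare `except:` catches both. A minimal Python 3 sketch of Solution 1's loop with the `except` clauses in that order; `run_once` is a hypothetical stand-in for creating the stream and calling `filter()`:

```python
def start_stream(run_once):
    """Solution 1's loop, except that Ctrl+C (KeyboardInterrupt) exits instead of restarting.

    run_once -- hypothetical callable that creates the stream and calls filter().
    Returns how many times the stream was restarted before the user stopped it.
    """
    restarts = 0
    while True:
        try:
            run_once()
        except KeyboardInterrupt:
            break          # user asked to stop: leave the loop
        except Exception:
            restarts += 1  # any ordinary error: reconnect
            continue
    return restarts


# KeyboardInterrupt sits outside the Exception hierarchy, which is why
# "except Exception" lets Ctrl+C through while a bare "except:" swallows it.
assert not issubclass(KeyboardInterrupt, Exception)
assert issubclass(KeyboardInterrupt, BaseException)
```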

【Solution 2】:

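I recently ran into this problem and want to share some more details about it.

The error was caused by a stream filter that is too broad (test). As a result, you receive the stream faster than you can consume it, which causes the IncompleteRead error.

This can be solved by refining the search or by catching a more specific exception: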

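try:
    from http.client import IncompleteRead  # Python 3
except ImportError:
    from httplib import IncompleteRead  # Python 2, as in the question's traceback
...
try:
    sapi = tweepy.streaming.Stream(auth, CustomStreamListener(api))
    sapi.filter(track=["test"])
except IncompleteRead:
    pass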
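A Python 3 sketch of the "more specific exception" idea combined with a retry loop: only `IncompleteRead` triggers a reconnect, so unrelated bugs (a misspelled name, a bad key) still surface immediately instead of being retried forever. `stream_with_retry`, `run_once` and `max_retries` are hypothetical names.

```python
from http.client import IncompleteRead


def stream_with_retry(run_once, max_retries=10):
    """Reconnect only after IncompleteRead; any other exception propagates.

    run_once    -- hypothetical callable that opens the stream and calls filter()
    max_retries -- re-raise IncompleteRead once this many attempts have failed
    """
    failures = 0
    while True:
        try:
            return run_once()
        except IncompleteRead:
            failures += 1
            if failures > max_retries:
                raise  # give up: the connection keeps breaking
```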

【Discussion】:

【Solution 3】:

It's better to use a recursive call instead of an infinite while loop. See the filter function below, for example:

from tweepy import Stream
from service.twitter.listener.tweety_listener import TweetyStreamDataListener
from settings import twitter_config

class Tweety(object):
    def __init__(self, listener=None):
        # create the listener per instance; a default argument would be shared by every Tweety
        self.listener = listener if listener is not None else TweetyStreamDataListener()
        self.__auth__ = None

    def __authenticate__(self):
        from tweepy import OAuthHandler
        if self.__auth__ is None:
            self.__auth__ = OAuthHandler(twitter_config['consumer_key'], twitter_config['consumer_secret'])
            self.__auth__.set_access_token(twitter_config['access_token'], twitter_config['access_token_secret'])
        return self.__auth__ is not None

    def __streamer__(self):
        is_authenticated = self.__authenticate__()
        if is_authenticated:
            return Stream(self.__auth__, self.listener)
        return None

    def filter(self, keywords=None, async=False):
        # async=False keeps the read loop in this thread, so its exceptions reach the except below
        streamer = self.__streamer__()
        try:
            print "[STREAM] Started stream"
            streamer.filter(track=keywords, async=async)
        except Exception as ex:
            print "[STREAM] Stream stopped! Reconnecting to twitter stream"
            print repr(ex)
            self.filter(keywords=keywords, async=async)
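Why the `async` flag matters for this pattern: an exception raised inside a background thread is not re-raised in the thread that started it, so a surrounding `try/except` never sees it. As far as I know, tweepy's async mode runs the read loop in a separate `threading.Thread`; the Python 3 sketch below only imitates that with a hypothetical `start()` function and does not use tweepy.

```python
import threading
from http.client import IncompleteRead


def read_loop():
    """Stand-in for the stream's blocking read loop failing mid-stream."""
    raise IncompleteRead(b"", 1)


def start(run_async):
    """Mimic filter(async=...): return the exception the caller's except saw, or None."""
    try:
        if run_async:
            worker = threading.Thread(target=read_loop)
            worker.start()
            worker.join()  # the thread dies, but its exception stays in that thread
        else:
            read_loop()    # runs in this thread, so the error propagates here
    except Exception as ex:
        return ex
    return None
```

In the threaded case Python prints the thread's traceback to stderr, but the caller's `except` block never runs, so nothing reconnects.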
    

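【Discussion】:

  • I think it's the opposite: handling this case with recursion instead of a loop will eventually cause memory problems, especially for a service designed to run in production for a long time.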
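The comment's concern can be made concrete: each failed attempt in the recursive version adds a stack frame, and CPython stops at `sys.getrecursionlimit()` (1000 by default) with a `RecursionError` (a plain `RuntimeError` in Python 2), whereas a loop keeps the stack flat. A Python 3 sketch with a hypothetical fake connection, `make_flaky`:

```python
import sys


def make_flaky(failures):
    """Hypothetical fake connection: fails `failures` times, then returns the call count."""
    state = {"calls": 0}

    def run_once():
        state["calls"] += 1
        if state["calls"] <= failures:
            raise ConnectionError("simulated disconnect")
        return state["calls"]

    return run_once


def reconnect_recursively(run_once):
    # Same shape as Tweety.filter: every failure adds another stack frame.
    try:
        return run_once()
    except ConnectionError:
        return reconnect_recursively(run_once)


def reconnect_in_loop(run_once):
    # Same retry policy, but the stack depth stays constant.
    while True:
        try:
            return run_once()
        except ConnectionError:
            continue


if __name__ == "__main__":
    print(reconnect_in_loop(make_flaky(5000)))  # the loop survives 5000 failures
    try:
        reconnect_recursively(make_flaky(sys.getrecursionlimit() * 2))
    except RecursionError:
        print("recursive version gave up after hitting the recursion limit")
```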

【Solution 4】:

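One option is to try the multiprocessing module. I'd argue for it for two reasons:

1. You can run the process for a set period of time without having to "kill" the whole script/process.
2. You can put it in a for loop and have it start over when it dies or when you choose to kill it.

I took a different approach entirely, but that's partly because I save my tweets periodically (or am supposed to). @Eugeune Yan, I think try/except is a simple and elegant way to handle the problem. I'd still like someone to comment on it, though: you don't really know when, or whether, that method fails; if that really matters, it only takes a few lines to track it.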
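On not knowing when or whether the `try/except` path fires: logging every caught exception answers that in a few lines. A Python 3 sketch using the standard `logging` module; the logger name `tweet_stream` and the function `run_logged` are hypothetical.

```python
import logging

log = logging.getLogger("tweet_stream")  # hypothetical logger name


def run_logged(run_once, attempts):
    """Call run_once up to `attempts` times, recording each failure with its traceback.

    Returns run_once's result, or None if every attempt failed.
    """
    for attempt in range(1, attempts + 1):
        try:
            return run_once()
        except Exception:
            # logs at ERROR level and attaches the current exception's traceback
            log.exception("stream failed on attempt %d", attempt)
    return None


if __name__ == "__main__":
    # %(asctime)s stamps every record, so the log shows when each failure happened
    logging.basicConfig(format="%(asctime)s %(levelname)s %(message)s")
    run_logged(lambda: 1 / 0, attempts=2)
```

Passing `filename="..."` to `logging.basicConfig` sends the same records to a file instead of stderr.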

import tiipWriter  # Twitter & text-file writer I wrote with Tweepy.
from add import ThatGuy  # utility to supply log file names that won't overwrite old ones.
import multiprocessing


if __name__ == '__main__':
    # number of time increments the script needs to run
    n = 60
    log_dir = "C:\\Temp\\stufffolder\\twiitlog"
    log_files = []
    print "preloading logs"
    ThatGuy(n, log_dir, log_files)  # finds any existing logs in the folder and one-ups them

    for log_file in log_files:
        print "Collecting Tweets....."
        # this is my twitter/textfile writer process
        p = multiprocessing.Process(target=tiipWriter.tiipWriter, args=(log_file,))
        p.start()
        p.join(1800)  # number of seconds the process will run
        if p.is_alive():
            print " \n Saving Twitter Stream log   @  " + str(log_file)
            # terminate() skips the child's finally clauses and exit handlers,
            # so writes it had not flushed yet can be lost
            p.terminate()
            p.join()
        # the OS releases the terminated child's file handles; reopening and
        # closing log_file here would not tell us anything about them
        print "jamaica"  # cuz why not
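A stripped-down Python 3 version of the same idea, runnable on its own: each slice gets a fresh process that is stopped after a fixed time, and a worker that dies early simply ends its slice. `fake_stream_worker` is a hypothetical stand-in for `tiipWriter.tiipWriter`, and `run_in_slices` is an illustrative name. The optional `ctx` parameter lets you choose a start method explicitly; with the default "spawn" method on Windows, the `if __name__ == "__main__":` guard is required.

```python
import multiprocessing
import time


def fake_stream_worker(seconds):
    """Hypothetical stand-in for the tweet collector: just blocks for `seconds`."""
    time.sleep(seconds)


def run_in_slices(target, args, slices, slice_seconds, ctx=None):
    """Run target(*args) in a fresh process for each slice, stopping it after slice_seconds.

    ctx -- optional multiprocessing context (start method); default context if None.
    Returns one boolean per slice: True if the process had to be terminated.
    """
    if ctx is None:
        ctx = multiprocessing.get_context()
    terminated = []
    for _ in range(slices):
        proc = ctx.Process(target=target, args=args)
        proc.start()
        proc.join(slice_seconds)      # wait at most slice_seconds
        if proc.is_alive():
            proc.terminate()          # time is up; finally blocks in the child do not run
            proc.join()
            terminated.append(True)
        else:
            terminated.append(False)  # the worker exited (or crashed) on its own
    return terminated


if __name__ == "__main__":
    # two 1-second slices of a worker that would otherwise run for a minute
    print(run_in_slices(fake_stream_worker, (60,), slices=2, slice_seconds=1.0))
```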
    

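【Discussion】:

【Solution 5】:

I wrote a two-process streamer using tweepy. It downloads, compresses and dumps the data into files that are rotated every hour. The program restarts every hour, and it can periodically check the streaming process to see whether any new tweets have been downloaded. If not, it restarts the whole system.

The code can be found here. Note that it uses a pipe for compression. If compression isn't needed, the source is easy to modify.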
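The linked code isn't reproduced here. As a rough Python 3 sketch of two of the pieces described, with hypothetical names throughout: files named per hour and compressed with the standard `gzip` module (in place of the pipe the answer uses), and a watchdog check of whether the current file grew since the last look.

```python
import gzip
import os
from datetime import datetime


def hourly_path(directory, now=None):
    """One file per hour, e.g. tweets-20140629-09.json.gz (naming scheme is hypothetical)."""
    if now is None:
        now = datetime.now()
    return os.path.join(directory, now.strftime("tweets-%Y%m%d-%H.json.gz"))


def append_tweet(directory, raw_json, now=None):
    """Append one raw tweet as a line to the current hour's gzip file; return its path."""
    path = hourly_path(directory, now)
    # "at" = append in text mode; each call adds a gzip member, which readers handle transparently
    with gzip.open(path, "at", encoding="utf-8") as f:
        f.write(raw_json.rstrip("\n") + "\n")
    return path


def made_progress(path, last_size):
    """Watchdog check: did the file grow since last_size? Returns (grew, current_size)."""
    size = os.path.getsize(path) if os.path.exists(path) else 0
    return size > last_size, size
```

A supervisor could call `made_progress` every few minutes on the current hour's file and restart the streaming process whenever it reports no growth.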

【Discussion】:
