【问题标题】:Running Time Estimate for Stream Twitter with Location Filter in Tweepy在 Tweepy 中使用位置过滤器估算流 Twitter 的运行时间
【发布时间】:2017-07-31 06:51:38
【问题描述】:

问题已解决,见文末解决方案

我需要帮助来估算我的 tweepy 程序使用位置过滤器调用 Twitter Stream API 的运行时间。

我启动它后,它已经运行了 20 多分钟,比我预期的要长。我是 Twitter Stream API 的新手,并且只使用了 REST API 几天。在我看来,REST API 会在几秒钟内给我 50 条推文,很简单。但是这个 Stream 请求需要更多时间。我的程序没有死在我身上或出现任何错误。所以不知道是不是有什么问题。如果有,请指出。

总之,如果你认为我的代码是正确的,你能提供一个运行时间的估计吗?如果您认为我的代码有误,您能帮我修复它吗?

提前谢谢你!

代码如下:

# Import Tweepy, sys, sleep, credentials.py
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

box = [-86.33,41.63,-86.20,41.74]

class CustomStreamListener(tweepy.StreamListener):
    def on_error(self, status_code):
        print >> sys.stderr, 'Encountered error with status code:', status_code
        return True # Don't kill the stream
    def on_timeout(self):
        print >> sys.stderr, 'Timeout...'
        return True # Don't kill the stream

stream = tweepy.streaming.Stream(auth, CustomStreamListener()).filter(locations=box).items(50)
stream

我尝试了http://docs.tweepy.org/en/v3.4.0/auth_tutorial.html#auth-tutorial 中的方法,显然它对我不起作用……这是我的代码。您介意提供任何意见吗?如果您有一些工作代码,请告诉我。谢谢!

# Import Tweepy, sys, sleep, credentials.py
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Assign coordinates to the variable
box = [-74.0,40.73,-73.0,41.73]

import tweepy
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        print(status.text)
    def on_error(self, status_code):
        if status_code == 420:
            #returning False in on_data disconnects the stream
            return False

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
myStream.filter(track=['python'], locations=(box), async=True)

这是错误信息:

Traceback (most recent call last):
  File "test.py", line 26, in <module>
    myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
TypeError: 'MyStreamListener' object is not callable

问题已解决!请参阅下面的解决方案

经过另一轮调试,以下是可能对同一主题感兴趣的人的解决方案:

# Import Tweepy, sys, sleep, credentials.py
try:
    import json
except ImportError:
    import simplejson as json
import tweepy, sys
from time import sleep
from credentials import *

# Access and authorize our Twitter credentials from credentials.py
auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
auth.set_access_token(access_token, access_token_secret)
api = tweepy.API(auth)

# Assign coordinates to the variable
box = [-74.0,40.73,-73.0,41.73]

import tweepy
#override tweepy.StreamListener to add logic to on_status
class MyStreamListener(tweepy.StreamListener):

    def on_status(self, status):
        print(status.text.encode('utf-8'))
    def on_error(self, status_code):
        if status_code == 420:
            #returning False in on_data disconnects the stream
            return False

myStreamListener = MyStreamListener()
myStream = tweepy.Stream(api.auth, listener=myStreamListener)
myStream.filter(track=['NYC'], locations=(box), async=True)

【问题讨论】:

    标签: python twitter geolocation tweepy time-estimation


    【解决方案1】:

    核心问题: 我认为您误解了 Stream 的含义。

    Tl;dr: 您的代码正在运行,您只是没有对返回的数据做任何事情。

    其余 API 调用是对信息的单一调用。您提出请求,Twitter 会发回一些信息,这些信息会分配给您的变量。

    来自 Tweepy 的 StreamObject(您已创建为 stream)使用您的搜索参数打开与 twitter 的连接,然后 Twitter 将推文流式传输到它。永远。

    来自 Tweepy 文档:

    流 api 与 REST api 完全不同,因为 REST api 用于从 twitter 中提取数据,但流 api 将消息推送到持久会话。这允许流式 API 实时下载比使用 REST 完成的更多的数据 API。

    因此,您需要构建一个处理程序(streamListener,在 tweepy 的术语中),例如 this one that prints out the tweets.

    附加

    来自痛苦经验的警告词 - 如果您要尝试将推文保存到数据库中:Twitter 可以并且将会以比您将它们保存到数据库的速度更快的速度将对象流式传输给您。这将导致您的 Stream 断开连接,因为推文会在 Twitter 上备份,并且超过一定程度的备份(不是实际的短语),它们只会断开您的连接。

    我通过使用 django-rq 将保存的作业放入作业队列来处理这个问题 - 这样,我可以每秒处理数百条推文(在高峰期),并且它会变得平滑。你可以在下面看到我是如何做到的。如果您不使用 django 作为框架,Python-rq 也可以使用。 read both 方法只是一个从推文中读取并将其保存到 postgres 数据库的函数。在我的具体情况下,我通过 Django ORM 使用 django_rq.enqueue 函数来做到这一点。

    __author__ = 'iamwithnail'
    
    from django.core.management.base import BaseCommand, CommandError
    from django.db.utils import DataError
    from harvester.tools import read_both
    import django_rq
    
    class Command(BaseCommand):
    
        args = '<search_string search_string>'
        help = "Opens a listener to the Twitter stream, and tracks the given string or list" \
               "of strings, saving them down to the DB as they are received."
    
    
        def handle(self, *args, **options):
            try:
                import urllib3.contrib.pyopenssl
                urllib3.contrib.pyopenssl.inject_into_urllib3()
            except ImportError:
                pass
    
            consumer_key = '***'
            consumer_secret = '****'
            access_token='****'
            access_token_secret_var='****'
            import tweepy
            import json
    
            # This is the listener, responsible for receiving data
            class StdOutListener(tweepy.StreamListener):
                def on_data(self, data):
                    decoded = json.loads(data)
                    try:
                        if decoded['lang'] == 'en':
                            django_rq.enqueue(read_both, decoded)
                        else:
                            pass
                    except KeyError,e:
                        print "Error on Key", e
                    except DataError, e:
                        print "DataError", e
                    return True
    
    
                def on_error(self, status):
                    print status
    
    
            l = StdOutListener()
            auth = tweepy.OAuthHandler(consumer_key, consumer_secret)
            auth.set_access_token(access_token, access_token_secret_var)
            stream = tweepy.Stream(auth, l)
    stream.filter(track=args)
    

    编辑:你后续的问题是错误调用监听器造成的。

    myStreamListener = MyStreamListener() #creates an instance of your class
    

    你有这个:

    myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener())
    

    当您使用() 时,您正试图将侦听器作为函数调用。所以应该是:

    myStream = tweepy.Stream(auth = api.auth, listener=myStreamListener)
    

    事实上,也许可以更简洁地写成:

    myStream = tweepy.Stream(api.auth,myStreamListener)
    

    【讨论】:

    • 很好,@Withnail,我稍后会看看你的答案,并研究所有好的细节。提前感谢您的时间和关注!我有预感,这对我来说将是一个很好的对话和学习机会。兴奋。
    • 我尝试了您在docs.tweepy.org/en/v3.4.0/auth_tutorial.html#auth-tutorial 中提到的方法,显然它对我不起作用...我的测试代码已添加到我上面的帖子中。您介意提供任何意见吗?如果您有一些工作代码,请告诉我。谢谢!
    • 查看修改后的答案!
    • 非常感谢。删除()后,它起作用了。周末愉快。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2017-07-03
    • 2017-08-02
    • 2015-08-25
    • 2016-11-11
    • 2014-06-28
    • 2021-12-16
    相关资源
    最近更新 更多