【问题标题】:How do I loop my python code for Twitter API v2 recent search?如何为 Twitter API v2 最近搜索循环我的 python 代码?
【发布时间】:2021-03-14 11:51:55
【问题描述】:

我对 python 很陌生,所以我正在寻求解决这个问题的帮助。我的目标是收集大约 10,000 条包含图像的推文并将其保存到 csv 文件中。由于 Twitter 的速率限制是每 15 分钟 450 个请求,理想情况下我想自动化这个过程。我看到的指南只使用了 tweepy 模块,但由于我不太了解它,所以我使用了 Twitter 上给出的示例 python 代码:

import requests
import pandas as pd
import os
import json

# To set your enviornment variables in your terminal run the following line:
os.environ['BEARER_TOKEN']=''


def auth():
    return os.environ.get("BEARER_TOKEN")


def create_url():
    query = "has:images lang:en -is:retweet"
    tweet_fields = "tweet.fields=attachments,created_at,author_id"
    expansions = "expansions=attachments.media_keys"
    media_fields = "media.fields=media_key,preview_image_url,type,url"
    max_results = "max_results=100"
    url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
        query, tweet_fields, expansions, media_fields, max_results
    )
    return url


def create_headers(bearer_token):
    headers = {"Authorization": "Bearer {}".format(bearer_token)}
    return headers


def connect_to_endpoint(url, headers):
    response = requests.request("GET", url, headers=headers)
    print(response.status_code)
    if response.status_code != 200:
        raise Exception(response.status_code, response.text)
    return response.json()

def save_json(file_name, file_content):
    with open(file_name, 'w', encoding='utf-8') as write_file:
        json.dump(file_content, write_file, sort_keys=True, ensure_ascii=False, indent=4)

def main():
    bearer_token = auth()
    url = create_url()
    headers = create_headers(bearer_token)
    json_response = connect_to_endpoint(url, headers)
    
    #Save the data as a json file
    #save_json('collected_tweets.json', json_response)
    
    #save tweets as csv
    #df = pd.json_normalize(data=json_response)
    df1 = pd.DataFrame(json_response['data'])
    df1.to_csv('tweets_data.csv', mode="a")
    df2 = pd.DataFrame(json_response['includes'])
    df2.to_csv('tweets_includes_media.csv', mode="a")
    print(json.dumps(json_response['meta'], sort_keys=True, indent=4))

if __name__ == "__main__":
    main()

我应该如何更改此代码,以使其在 Twitter 的 v2 速率限制内循环,还是改用 tweepy 会更好?

顺便说一句,我确实意识到我保存为 csv 的代码有问题,但这是我现在能做的最好的事情。

【问题讨论】:

  • 需要考虑的一点是,v2 API 早期访问具有每月推文上限以及调用次数的速率限制。您可能不想将其浪费在过多的测试运行上。

标签: python twitter


【解决方案1】:

这里有几件事要记住。

  • Tweepy 尚未更新为使用新版本的 Twitter API (V2),因此您在 Twitter 文档中发现的大部分时间可能与 Tweepy 提供的内容不符。 Tweepy 在 V1 上仍然可以很好地工作,但是,某些推文匹配功能可能会有所不同,您只需要小心。
  • 鉴于您提到的目标,尚不清楚您是否要使用“最近搜索”端点。例如,使用sample stream 启动 1% 的流可能更容易。这是该端点的Twitter's example code。这样做的主要好处是,您可以在“后台”(参见下面的注释)中运行它,并带有一个条件,一旦您收集了 10k 条推文,就会终止该过程。这样,您就不必担心达到推文限制 - 默认情况下,Twitter 将您限制为查询量的约 1%(在您的情况下,"has:images lang:en -is:retweet"),并且只是实时收集这些推文。如果您试图获得两个时间段之间的非转推英文推文的完整记录,您需要将这些时间点添加到您的查询中,然后按照您上面的要求管理限制.在API reference docs 中查看start_timeend_time

注意:要在后台运行脚本,请编写程序,然后从终端使用nohup nameofstreamingcode.py > logfile.log 2>&1 & 执行它。任何正常的终端输出(即打印行和/或错误)都将被写入一个名为logfile.log 的新文件,并且命令末尾的& 使进程在后台运行(因此您可以关闭您的终端并稍后返回)。

  • 要使用“最近搜索”端点,您需要向connect_to_endpoint(url, headers) 函数添加大量数据。此外,您可以使用另一个函数 pause_until,它是为我正在开发的 Twitter V2 API 包编写的 (link to function code)。
def connect_to_endpoint(url, headers):
    response = requests.request("GET", url, headers=headers)

    # Twitter returns (in the header of the request object) how many
    # requests you have left. Lets use this to our advantage
    remaining_requests = int(response.headers["x-rate-limit-remaining"])
    
    # If that number is one, we get the reset-time
    #   and wait until then, plus 15 seconds (your welcome Twitter).
    # The regular 429 exception is caught below as well,
    #   however, we want to program defensively, where possible.
    if remaining_requests == 1:
        buffer_wait_time = 15
        resume_time = datetime.fromtimestamp( int(response.headers["x-rate-limit-reset"]) + buffer_wait_time )
        print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
        pause_until(resume_time)  ## Link to this code in above answer

    # We still may get some weird errors from Twitter.
    # We only care about the time dependent errors (i.e. errors
    #   that Twitter wants us to wait for).
    # Most of these errors can be solved simply by waiting
    #   a little while and pinging Twitter again - so that's what we do.
    if response.status_code != 200:

        # Too many requests error
        if response.status_code == 429:
            buffer_wait_time = 15
            resume_time = datetime.fromtimestamp( int(response.headers["x-rate-limit-reset"]) + buffer_wait_time )
            print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
            pause_until(resume_time)  ## Link to this code in above answer

        # Twitter internal server error
        elif response.status_code == 500:
            # Twitter needs a break, so we wait 30 seconds
            resume_time = datetime.now().timestamp() + 30
            print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
            pause_until(resume_time)  ## Link to this code in above answer

        # Twitter service unavailable error
        elif response.status_code == 503:
            # Twitter needs a break, so we wait 30 seconds
            resume_time = datetime.now().timestamp() + 30
            print(f"Waiting on Twitter.\n\tResume Time: {resume_time}")
            pause_until(resume_time)  ## Link to this code in above answer

        # If we get this far, we've done something wrong and should exit
        raise Exception(
            "Request returned an error: {} {}".format(
                response.status_code, response.text
            )
        )

    # Each time we get a 200 response, lets exit the function and return the response.json
    if response.ok:
        return response.json()

由于完整的查询结果将远大于您在每次查询时请求的 100 条推文,因此您需要在更大的查询中跟踪您的位置。这是通过next_token 完成的。

要获得next_token,其实很简单。只需从响应中的元字段中获取它。为了清楚起见,您可以像这样使用上述功能...

# Get response
response = connect_to_endpoint(url, headers)
# Get next_token
next_token = response["meta"]["next_token"]

然后需要在查询详细信息中传递此令牌,这些详细信息包含在您使用 create_url() 函数创建的 url 中。这意味着您还需要将您的 create_url() 函数更新为如下所示...

def create_url(pagination_token=None):
    query = "has:images lang:en -is:retweet"
    tweet_fields = "tweet.fields=attachments,created_at,author_id"
    expansions = "expansions=attachments.media_keys"
    media_fields = "media.fields=media_key,preview_image_url,type,url"
    max_results = "max_results=100"
    if pagination_token == None:
        url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}".format(
            query, tweet_fields, expansions, media_fields, max_results
        )
    else:
        url = "https://api.twitter.com/2/tweets/search/recent?query={}&{}&{}&{}&{}&{}".format(
            query, tweet_fields, expansions, media_fields, max_results, pagination_token
        )
    return url

更改上述函数后,您的代码应按以下方式流动。

  1. 提出请求
  2. response["meta"]["next_token"] 获取next_token
  3. 更新查询参数以包含next_tokencreate_url()
  4. 冲洗并重复,直到:
    1. 您可以获得 10k 条推文
    2. 查询停止

最后一点:我不会尝试使用 pandas 数据框来编写您的文件。我将创建一个空列表,将每个新查询的结果附加到该列表,然后将字典对象的最终列表写入 json 文件(有关详细信息,请参阅this question)。我已经了解到原始推文和熊猫数据框并不能很好地发挥作用。习惯 json 对象和字典的工作方式会更好。

【讨论】:

    【解决方案2】:

    尝试使用调度器:

    import sched
    import time
    
    scheduler = sched.scheduler(time.time, time.sleep)
    scheduler.enter(delay=16 * 60, priority=1, action=connect_to_endpoint)
    

    延迟

    是两个事件之间的时间量。

    动作

    是每 16 分钟执行一次的方法(在本例中)。

    考虑重复的确切时间和确切方法。

    【讨论】:

      猜你喜欢
      • 2021-10-21
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-06-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多