【Question Title】: How do I map the stream filter function of tweepy with the thread pool?
【Posted】: 2018-05-07 17:37:12
【Question】:

Library used:

from multiprocessing.dummy import Pool as ThreadPool

I am streaming tweets and then processing them as follows:

import re
import csv
import tweepy
# natural_language_understanding, Features, EntitiesOptions and KeywordsOptions
# are set up from watson_developer_cloud elsewhere in the script

class StreamListener(tweepy.StreamListener):
    httpsCheck = 'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
    httpCheck = 'http?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'

    def on_status(self, status):
        # skip retweets
        if status.retweeted:
            return
        tweetText = status.text.encode('utf8')
        created_at = status.created_at
        id = status.id
        # skip tweets that contain links
        if (re.findall(self.httpCheck, tweetText) or re.findall(self.httpsCheck, tweetText)):
            return
        # only analyse tweets that contain at least one Latin letter
        if (re.search('[a-zA-Z]', tweetText)):
            response = natural_language_understanding.analyze(
                text=tweetText,
                features=Features(
                    entities=EntitiesOptions(
                        emotion=True,
                        sentiment=True,
                        limit=2),
                    keywords=KeywordsOptions(
                        emotion=True,
                        sentiment=True,
                        limit=2)),
                language='en'
                )
            response["tweet"] = tweetText
            response["id"] = id
            response["created_at"] = created_at
            with open('#LFCROMA-SF2.csv', 'a') as csv_file:
                writer = csv.writer(csv_file)
                for key, value in response.items():
                    writer.writerow([key, value])

    def on_error(self, status_code):
        # 420 = rate limited; returning False disconnects the stream
        if status_code == 420:
            return False

Now I want to use multithreading via a pool to speed this up:

auth = tweepy.OAuthHandler(settingsTwitter.TWITTER_APP_KEY, settingsTwitter.TWITTER_APP_SECRET)
auth.set_access_token(settingsTwitter.TWITTER_KEY, settingsTwitter.TWITTER_SECRET)
api = tweepy.API(auth)
stream_listener = StreamListener()
stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
pool = ThreadPool(4)
pool.map(stream.filter, languages=["en"], track=["#LFC"])

Doing this, I get an error:

Traceback (most recent call last):
  File "atomic.py", line 66, in <module>
    pool.map(stream.filter, languages=["en"], track=["#LFC"])
TypeError: map() got an unexpected keyword argument 'languages'

I cannot find a way to call this function inside pool.map().

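The goal is to stream tweets, apply some heavy processing, and then save the results. Speed-wise there is a bottleneck: streaming >> processing. That is why I want to stream the tweets in a multithreaded way.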

【Question Comments】:

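  • Which ThreadPool? Please edit the question to add meaningful code and a description of the problem. Posting a Minimal, Complete, Verifiable Example that demonstrates your problem will help you get better answers. Thanks!
  • @Prateek I have edited the question. Thanks for pointing that out.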

Tags: python tweepy


【Solution 1】:

Warning: from multiprocessing.pool import Pool as ThreadPool binds the name ThreadPool to the process-based Pool, shadowing the real multiprocessing.pool.ThreadPool class.

So import it plainly as from multiprocessing.pool import Pool.
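If threads rather than processes are what you want, multiprocessing.pool also exports a ThreadPool class, so no alias is needed (the question's multiprocessing.dummy.Pool is thread-based too). A small check, written in Python 3 with a hypothetical helper where_am_i, that every ThreadPool task runs inside the calling process:

```python
import os
import threading
from multiprocessing.pool import ThreadPool

def where_am_i(_):
    # Hypothetical helper: report which process and thread ran this task.
    return os.getpid(), threading.current_thread().name

with ThreadPool(4) as pool:
    results = pool.map(where_am_i, range(8))

pids = {pid for pid, _ in results}
print(pids == {os.getpid()})  # True: all tasks ran in this process
```

A process Pool would report child PIDs instead, which is why mixing up the two names matters.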

>>>help(Pool.map)
Help on function map in module multiprocessing.pool:
map(self, func, iterable, chunksize=None)
    Apply `func` to each element in `iterable`, collecting the results
    in a list that is returned.

Please see the documentation.

You need to pass a function and an iterable for it to work.

So remove languages=['en'], because map has no languages parameter.
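The question's TypeError can be reproduced without tweepy. This Python 3 sketch uses a hypothetical stand-in, fake_filter, in place of stream.filter; map() rejects the extra keyword arguments before anything is scheduled:

```python
from multiprocessing.pool import ThreadPool

def fake_filter(languages=None, track=None):
    # Hypothetical stand-in for stream.filter(); it only echoes its arguments.
    return languages, track

with ThreadPool(2) as pool:
    try:
        # Same shape as the failing call in the question.
        pool.map(fake_filter, languages=["en"], track=["#LFC"])
        message = "no error"
    except TypeError as exc:
        message = str(exc)

print(message)
```

The exact prefix of the message ("map()" versus "Pool.map()") depends on the Python version.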

Instead, try:

pool.map(function, <a_list you want to pass to function> )
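If each call also needs fixed keyword arguments, one common approach is to bind them with functools.partial so that map() only has to supply the varying positional argument. A sketch in Python 3; fake_filter is a hypothetical stand-in that returns its arguments instead of streaming, so this only shows how arguments reach each call, not how to parallelise real tweepy streams:

```python
from functools import partial
from multiprocessing.pool import ThreadPool

def fake_filter(track, languages=None):
    # Hypothetical stand-in for a filter call: returns its arguments instead of streaming.
    return track, languages

# Bind the keyword argument up front; map() then supplies one positional item per call.
filter_en = partial(fake_filter, languages=["en"])

with ThreadPool(2) as pool:
    results = pool.map(filter_en, ["#LFC", "#ElClasico"])

print(results)  # [('#LFC', ['en']), ('#ElClasico', ['en'])]
```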

After creating the tweepy Stream, filter the tweets the way the documentation shows:

stream.filter(languages=['en'], track=["#LFC"])

There is no need to call pool.map here.

stream.filter on its own will give you good results.

To use your custom stream listener, rename your class to CustomStreamListener and change stream_listener = StreamListener() to

stream_listener = CustomStreamListener()

You can call pool.map as follows:

with Pool(4) as p:
    p.map(stream.filter(languages=['en'], track=["#LFC"]))

However,

the Twitter API comes with rate limits and access restrictions. If you are using the standard tier, you will get the following error:

An attempt has been made to start a new process before the current process has finished its bootstrapping phase.

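This is because you are creating threads and trying to call the same API several times at once; since you only have a single access token, that is not allowed.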


If you have several API access accounts, you would do the following:

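def run_stream(auth):  # one Stream (one connection) per account
    stream = tweepy.Stream(auth=auth, listener=CustomStreamListener())
    stream.filter(languages=['en'], track=["#LFC"])
auth_list = [auth1,auth2,auth3,auth4] #.... if more
with Pool(4) as p:
    p.map(run_stream, auth_list)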

Please check whether the Enterprise tier lifts these limitations.


Note: to avoid being blocked, use wait_on_rate_limit=True, wait_on_rate_limit_notify=True:
api = tweepy.API(auth, wait_on_rate_limit=True, wait_on_rate_limit_notify=True)

【Comments】:

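  • I get an error: TypeError: unsupported operand type(s) for //: 'int' and 'list' after changing it to: pool.map(stream.filter, ['en'], ['#LFC'])
  • [#LFC] is not needed. I have updated the answer, please check.
  • Are you aware that if I write t = stream.filter(languages=['en'], track=["#LFC"]), it calls the on_status function, which then calls the required function without any threading? Also, the function you pass to pool.map() is the stream.filter function itself.
  • github.com/avisrivastava254084/bubble-avi/blob/master/atomic.py This is the full code. 1. I create an instance of StreamListener. 2. I create a stream variable. 3. I call the stream.filter function with arguments.
  • Added the class. My mistake.
【Solution 2】: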

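The best way to solve this problem is to:

  • Stream the tweets on their own, without using threads for it.
  • Write a function that reads the tweets and sends them off for processing (Watson NLU processing in this case).
  • Read the tweets from the file in parallel (multithreaded), the way tail -f does, which ensures there is no processing bottleneck.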

An example to illustrate this:

# -*- coding: utf-8 -*-
import re
import csv
import json
import settingsWatson
from watson_developer_cloud import NaturalLanguageUnderstandingV1
from watson_developer_cloud.natural_language_understanding_v1 import Features, EntitiesOptions, KeywordsOptions, CategoriesOptions
import os
import time
import settingsTwitter
import tweepy
import datetime
from multiprocessing.dummy import Pool as ThreadPool

natural_language_understanding = NaturalLanguageUnderstandingV1(
  username=settingsWatson.username,
  password=settingsWatson.password,
  version='2018-03-16')

class StreamListener(tweepy.StreamListener):
  tweet = {}
  httpsCheck = 'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
  httpCheck = 'http?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
  idSelf = 0
  fieldNames = ['tweet', 'sequence', 'created_at', 'id']

  def on_status(self, status):
    if status.retweeted:
      return
    tweetText = status.text.encode('utf8')
    created_at = status.created_at
    id = status.id
    if (re.findall(self.httpCheck, tweetText) or re.findall(self.httpsCheck, tweetText)):
      return
    if (re.search('[a-zA-Z]', tweetText)):
      self.idSelf += 1
      self.tweet["tweet"] = tweetText
      self.tweet["id"] = id
      self.tweet["sequence"] = self.idSelf
      self.tweet["created_at"] = created_at
      with open('#ELCLASICO-2018-05-07.csv', 'a') as csv_file:
        #json.dump(self.tweet, json_file, sort_keys=True, indent=4, default = str)
        writer = csv.DictWriter(csv_file, self.tweet.keys())
        #for key, value in self.tweet.items():
        #a = [self.tweet]
        #print a[0]['tweet']
        writer.writerow(self.tweet)

  def on_error(self, status_code):
    if status_code == 420:
      return False

auth = tweepy.OAuthHandler(settingsTwitter.TWITTER_APP_KEY, settingsTwitter.TWITTER_APP_SECRET)
auth.set_access_token(settingsTwitter.TWITTER_KEY, settingsTwitter.TWITTER_SECRET)
api = tweepy.API(auth)
print "Twitter API Authentication is successful!"
stream_listener = StreamListener()
stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
print "Streaming begins!"

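def startStream():
  # Reconnect whenever the stream drops. Catching Exception (not a bare
  # except) still lets Ctrl-C (KeyboardInterrupt) stop the loop.
  while True:
    try:
      stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
      stream.filter(languages=["en"], track=["#ElClasico"])
    except Exception:
      continue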

startStream()

The code above streams tweets that match certain criteria and saves them to a CSV file.
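Because a later reader picks fields out of this CSV by column position, the column order matters. One way to keep it stable is to give csv.DictWriter a fixed fieldnames list, for example the listener's fieldNames, instead of the dict's keys. A Python 3 sketch with an in-memory buffer and made-up values:

```python
import csv
import io

field_names = ["tweet", "sequence", "created_at", "id"]  # same order as fieldNames above
buf = io.StringIO()
writer = csv.DictWriter(buf, fieldnames=field_names)
# Made-up example values; key order in the dict does not matter.
writer.writerow({"id": 42, "created_at": "2018-05-07", "tweet": "Hala Madrid", "sequence": 1})

buf.seek(0)
row = next(csv.reader(buf))
print(row)  # ['Hala Madrid', '1', '2018-05-07', '42']
```

With this order the tweet text is always column 0, so a positional reader has to use the matching index.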

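However, that was never the real problem. The problem was my "atomic" approach: instead of lumping the whole process together (stream, process, save), I should have avoided the bottleneck with a distributed approach, where streaming is one task and processing is another. Note that this alone still fails, because processing can never keep up with the speed of streaming. To overcome this, I applied multithreading to the distributed pipeline: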

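  • Stream the tweets
  • Save the tweets [this (below) is where the multithreaded part starts]
  • Read the tweets in a tail -f manner
  • Send them to Watson for processing
  • Process the data files, etc.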
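A pipeline like this, with one reader handing work to several processing threads, can be sketched in Python 3 with queue.Queue. This producer/consumer pattern is a swapped-in technique, not the answer's exact code: a single producer puts items on the queue, so each line is processed exactly once by whichever worker takes it. process is a hypothetical stand-in for the slow Watson NLU call:

```python
import queue
import threading

def process(line):
    # Hypothetical stand-in for the slow Watson NLU request.
    return line.upper()

def run_pipeline(lines, num_workers=4):
    tasks = queue.Queue()
    results = []
    lock = threading.Lock()

    def worker():
        while True:
            line = tasks.get()
            if line is None:          # sentinel: no more work for this worker
                tasks.task_done()
                break
            out = process(line)
            with lock:
                results.append(out)
            tasks.task_done()

    threads = [threading.Thread(target=worker) for _ in range(num_workers)]
    for t in threads:
        t.start()
    for line in lines:                # producer, e.g. lines from a tail -f reader
        tasks.put(line)
    for _ in threads:                 # one sentinel per worker
        tasks.put(None)
    for t in threads:
        t.join()
    return results

print(sorted(run_pipeline(["a", "b", "c"])))  # ['A', 'B', 'C']
```

Results arrive in completion order, hence the sort; with a live tail the producer loop would simply never end.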

Here is the code that does the processing:

# -*- coding: utf-8 -*-
import re
import csv
import json
import settingsWatson
from watson_developer_cloud import NaturalLanguageUnderstandingV1
from watson_developer_cloud.natural_language_understanding_v1 import Features, EntitiesOptions, KeywordsOptions, CategoriesOptions
import os
import time
import settingsTwitter
import tweepy
import datetime
from multiprocessing.dummy import Pool as ThreadPool

natural_language_understanding = NaturalLanguageUnderstandingV1(
    username=settingsWatson.username,
    password=settingsWatson.password,
    version='2018-03-16')

class FileTailer(object):

    def __init__(self, file, delay=1):
        self.file = file
        self.delay = delay

    def __iter__(self):
        while True:
            where = self.file.tell()
            line = self.file.readline()
            if line and line.endswith('\n'): # only emit full lines
                yield line
                #response = self.naturalLanguageProcessing(line)
            else:
                print "Waiting for new line"
                # for a partial line, pause and back up
                time.sleep(self.delay)       # ...not actually a recommended approach.
                self.file.seek(where)


class watson:

  entityDict = {"Messi":["Lionel Messi", "Leo", "Messi"], "Ronaldo":["Cristiano Ronaldo", "Cristiano", "Ronaldo"], "Iniesta":["Andres Iniesta", "Iniesta"], "Barcelona":["Barca", "Barcelona", "FC Barcelona", "#FCBarcelona"], "Real Madrid":["Real Madrid", "Madrid", "#RMA", "#RealMadrid"]}
  date = "2018-05-06"

  def createFiles(self):
    for entity in self.entityDict:
      fileName = str(entity) + "-" + str(self.date) + ".csv"
      print fileName
      with open(fileName, 'wb') as myFile:
        wr = csv.writer(myFile, quoting=csv.QUOTE_ALL)
    print "This is the entity defined list:"
    print self.entityDict
    for i in self.entityDict:
      for j in self.entityDict[i]:
        print j

  def naturalLanguageProcessing(self,tweetText):
    print "NLP is called with this text: " + tweetText
    try:
      response = natural_language_understanding.analyze(
        text=tweetText,
        features=Features(
          entities=EntitiesOptions(
            emotion=True,
            sentiment=True,
            limit=2),
          keywords=KeywordsOptions(
            emotion=True,
            sentiment=True,
            limit=2)),
        language='en'
        )
      response["tweet"] = tweetText
      self.saveResults(response)
    except:
      print "Error occured. Sleeping for one"
      time.sleep(1)
      return None

  def saveResults(self, response):
    print "Saving the results"
    entitiesTweet = response["entities"]
    print "Printing entitiesTweet"
    print entitiesTweet
    for entity in entitiesTweet:
      try:
        for i in self.entityDict:
          for j in self.entityDict[i]:
            if(j == entity["text"]):
              fileName = str(i) + "-" + str(self.date) + ".csv"  # entity key, same naming as createFiles()
              with open(fileName, 'a') as myFile:
                wr = csv.writer(myFile, quoting=csv.QUOTE_ALL)
                entity["tweet"] = response["tweet"]
                wr.writerow([entity])
      except Exception as e:
        print (e)

          #entityToBeInserted = entity
          #entityToBeInserted["tweet"] = response["tweet"]
          #fileName = str(entityItem) + "-" + str(date) + ".csv"
          #with open(fileName, 'a') as myFile:
            #wr = csv.writer(myfile, quoting=csv.QUOTE_ALL)
            #wr.writerow([entityToBeInserted])
    with open('#ELCLASICO-2018-05-07-Watson.csv', 'a') as csv_file:
      writer = csv.writer(csv_file)
      writer.writerow([response])


  #def saveEntityResults(self, entities, date):



def initiator(farzi):
  csv_reader = csv.reader(FileTailer(open('#ELCLASICO-2018-05-07.csv')))
  ob = watson()
  for row in csv_reader:
    tweet = row[1]
    ob.naturalLanguageProcessing(tweet)

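pool = ThreadPool(4)
# "farzi" is only a dummy argument: map() iterates over its five characters,
# so five initiator() tasks are queued and four run at once. Each task opens
# the CSV from the beginning, so every running thread processes every tweet.
farzi = "farzi"
pool.map(initiator, farzi)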

【Comments】:
