解决这个问题的最佳方法是:
- 单独流式传输推文,不要对其使用线程。
- 编写一个函数来读取推文,然后将其发送以进行处理(本例中为 Watson NLU 处理)
- 以 `tail -f` 的方式并行(多线程)地从文件中读取推文,这样可以确保处理环节不会出现瓶颈。
用一个例子来理解这一点:
# -*- coding: utf-8 -*-
import re
import csv
import json
import settingsWatson
from watson_developer_cloud import NaturalLanguageUnderstandingV1
from watson_developer_cloud.natural_language_understanding_v1 import Features, EntitiesOptions, KeywordsOptions, CategoriesOptions
import os
import time
import settingsTwitter
import tweepy
import datetime
from multiprocessing.dummy import Pool as ThreadPool
# Module-wide Watson NLU client; the credentials are read from the local
# settingsWatson module and the API version date is pinned.
natural_language_understanding = NaturalLanguageUnderstandingV1(
    version='2018-03-16',
    username=settingsWatson.username,
    password=settingsWatson.password,
)
class StreamListener(tweepy.StreamListener):
    """Tweepy listener that appends selected statuses to a CSV file.

    A status is stored only when it is not flagged as retweeted, contains no
    URL, and has at least one ASCII letter. Each stored status gets a running
    sequence number (``idSelf``).
    """

    # Most recently written row. The class-level dict is only a default so the
    # attribute exists before the first status; on_status rebinds it on the
    # instance instead of mutating this shared object.
    tweet = {}
    # Raw strings keep the regex escapes (\w, \d) out of Python's own string
    # escape handling; the pattern text itself is unchanged.
    httpsCheck = r'https?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
    # `http?` makes the final "p" optional. Every http:// and https:// URL is
    # already matched by httpsCheck; the attribute is kept for compatibility.
    httpCheck = r'http?://(?:[-\w.]|(?:%[\da-fA-F]{2}))+'
    idSelf = 0
    # NOTE(review): fieldNames is not used when writing; the CSV column order
    # comes from the row dict's key order (see on_status). Confirm with the
    # consumer of the file before switching to an explicit order.
    fieldNames = ['tweet', 'sequence', 'created_at', 'id']

    def on_status(self, status):
        """Filter one incoming status and append it to the CSV file.

        Args:
            status: tweepy status object; ``retweeted``, ``text``,
                ``created_at`` and ``id`` are read from it.
        """
        # NOTE(review): presumably meant to drop retweets; verify that
        # `retweeted` (rather than `retweeted_status`) is the right flag.
        if status.retweeted:
            return
        tweetText = status.text.encode('utf8')
        # Skip anything that carries a link.
        if re.search(self.httpCheck, tweetText) or re.search(self.httpsCheck, tweetText):
            return
        # Skip texts without a single ASCII letter.
        if not re.search('[a-zA-Z]', tweetText):
            return

        self.idSelf += 1
        row = {
            "tweet": tweetText,
            "id": status.id,
            "sequence": self.idSelf,
            "created_at": status.created_at,
        }
        # Publish the row on the instance; the class-level dict stays untouched.
        self.tweet = row
        with open('#ELCLASICO-2018-05-07.csv', 'a') as csv_file:
            writer = csv.DictWriter(csv_file, row.keys())
            writer.writerow(row)

    def on_error(self, status_code):
        """Return False on HTTP 420 (rate limiting); otherwise return None.

        NOTE(review): returning False is understood to make tweepy close the
        stream - confirm against the installed tweepy version.
        """
        if status_code == 420:
            return False
# Build the OAuth handler from the keys stored in the local settingsTwitter
# module and wrap it in an API object.
# NOTE(review): nothing visible here performs a request, so the message below
# presumably does not prove the credentials are valid - confirm.
auth = tweepy.OAuthHandler(
    settingsTwitter.TWITTER_APP_KEY,
    settingsTwitter.TWITTER_APP_SECRET,
)
auth.set_access_token(
    settingsTwitter.TWITTER_KEY,
    settingsTwitter.TWITTER_SECRET,
)
api = tweepy.API(auth)
print("Twitter API Authentication is successful!")

# One listener instance is shared by every stream created in this script.
stream_listener = StreamListener()
stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
print("Streaming begins!")
def startStream(retry_delay=5):
    """Run the #ElClasico filter stream forever, reconnecting when it stops.

    Args:
        retry_delay: seconds to wait before opening a new connection after
            the previous one ended or failed. Without a pause the loop
            reconnects immediately, including right after the listener asked
            to disconnect on HTTP 420.

    Only ``Exception`` subclasses are caught, so Ctrl-C (KeyboardInterrupt)
    and SystemExit still stop the script.
    """
    while True:
        try:
            stream = tweepy.Stream(auth=api.auth, listener=stream_listener)
            stream.filter(languages=["en"], track=["#ElClasico"])
        except Exception as error:
            # Report the failure instead of swallowing it silently.
            print("Stream error, reconnecting: %s" % (error,))
        time.sleep(retry_delay)


startStream()
上面的代码用于在特定条件下流式传输推文,然后将它们保存在 CSV 文件中。
但是,这从来都不是问题所在。问题在于我的“原子”方法:我不应该把整个过程(流式传输、处理、保存)集中在一起,而应该采用分布式计算方法(流式传输是一项任务,处理是另一项任务)来避免瓶颈。请注意,仅仅这样做仍然会失败(因为处理速度永远跟不上流式传输的速度)。为了克服这个问题,我以分布式的方式应用了多线程:
- 串流推文
- 保存推文
[以下是多线程方法的起点]
- 以tail -f方式阅读推文
- 将它们发送到 Watson 进行处理
- 执行数据文件处理等
下面是负责处理的代码:
# -*- coding: utf-8 -*-
import re
import csv
import json
import settingsWatson
from watson_developer_cloud import NaturalLanguageUnderstandingV1
from watson_developer_cloud.natural_language_understanding_v1 import Features, EntitiesOptions, KeywordsOptions, CategoriesOptions
import os
import time
import settingsTwitter
import tweepy
import datetime
from multiprocessing.dummy import Pool as ThreadPool
# Module-wide Watson NLU client; the credentials are read from the local
# settingsWatson module and the API version date is pinned.
natural_language_understanding = NaturalLanguageUnderstandingV1(
    version='2018-03-16',
    username=settingsWatson.username,
    password=settingsWatson.password,
)
class FileTailer(object):
    """Endless line iterator over an open file, in the manner of ``tail -f``.

    Only newline-terminated lines are produced. When the next read yields
    nothing or an unfinished line, the iterator sleeps for ``delay`` seconds,
    rewinds to where that read started, and tries again.
    """

    def __init__(self, file, delay=1):
        """Remember the open file object and the polling delay in seconds."""
        self.file, self.delay = file, delay

    def __iter__(self):
        """Yield complete lines forever; this generator never finishes."""
        while True:
            # Remember the position so an unfinished line can be re-read.
            checkpoint = self.file.tell()
            text = self.file.readline()
            if text.endswith('\n'):
                yield text
                continue
            # End of data or a half-written line: wait, then rewind.
            print("Waiting for new line")
            time.sleep(self.delay)
            self.file.seek(checkpoint)
class watson:
    """Send tweets to Watson NLU and file the results per tracked entity.

    ``entityDict`` maps a canonical entity name to the aliases compared
    against the ``text`` of each entity in a response. ``date`` is the suffix
    used in the per-entity CSV file names.
    """

    entityDict = {"Messi":["Lionel Messi", "Leo", "Messi"], "Ronaldo":["Cristiano Ronaldo", "Cristiano", "Ronaldo"], "Iniesta":["Andres Iniesta", "Iniesta"], "Barcelona":["Barca", "Barcelona", "FC Barcelona", "#FCBarcelona"], "Real Madrid":["Real Madrid", "Madrid", "#RMA", "#RealMadrid"]}
    date = "2018-05-06"

    def _entityFileName(self, entity):
        """Return the CSV file name used for one canonical entity name."""
        return str(entity) + "-" + str(self.date) + ".csv"

    def createFiles(self):
        """Create (or truncate) one empty CSV file per canonical entity.

        Also prints each file name, the whole entity table and every alias.
        """
        for entity in self.entityDict:
            fileName = self._entityFileName(entity)
            print(fileName)
            # Opening in 'wb' mode is what creates/empties the file; nothing
            # is written to it here.
            with open(fileName, 'wb'):
                pass
        # NOTE(review): the original indentation was lost; the summary prints
        # are assumed to run once, after all files exist.
        print("This is the entity defined list:")
        print(self.entityDict)
        for aliases in self.entityDict.values():
            for alias in aliases:
                print(alias)

    def naturalLanguageProcessing(self, tweetText):
        """Analyze one tweet with Watson NLU and store the response.

        Requests entities and keywords (emotion and sentiment enabled, at
        most two of each), attaches the tweet text to the response under
        ``"tweet"`` and passes it to ``saveResults``.

        Any ``Exception`` is reported and followed by a one-second pause so a
        single failing tweet does not stop the caller. KeyboardInterrupt and
        SystemExit are no longer swallowed.

        Returns:
            Always None.
        """
        print("NLP is called with this text: " + tweetText)
        try:
            response = natural_language_understanding.analyze(
                text=tweetText,
                features=Features(
                    entities=EntitiesOptions(
                        emotion=True,
                        sentiment=True,
                        limit=2),
                    keywords=KeywordsOptions(
                        emotion=True,
                        sentiment=True,
                        limit=2)),
                language='en'
            )
            response["tweet"] = tweetText
            self.saveResults(response)
        except Exception as error:
            print("Error occured. Sleeping for one")
            # Show what actually failed instead of hiding it.
            print(error)
            time.sleep(1)
        return None

    def saveResults(self, response):
        """Append a response to the per-entity files and the combined file.

        Args:
            response: mapping with an ``"entities"`` list (each item is read
                through its ``"text"`` key) and a ``"tweet"`` entry.

        For every entity whose text equals one of the aliases in
        ``entityDict``, the entity (with the tweet text added under
        ``"tweet"``) is appended to that canonical entity's CSV file. The
        whole response is then appended to the combined Watson CSV file.
        """
        print("Saving the results")
        entitiesTweet = response["entities"]
        print("Printing entitiesTweet")
        print(entitiesTweet)
        for entity in entitiesTweet:
            try:
                for name, aliases in self.entityDict.items():
                    for alias in aliases:
                        if alias == entity["text"]:
                            # Use the canonical key, not the alias list, so the
                            # row lands in the file createFiles() prepared.
                            fileName = self._entityFileName(name)
                            with open(fileName, 'a') as myFile:
                                wr = csv.writer(myFile, quoting=csv.QUOTE_ALL)
                                entity["tweet"] = response["tweet"]
                                wr.writerow([entity])
            except Exception as e:
                # A malformed entity must not prevent the others from being saved.
                print(e)
        # NOTE(review): the original indentation was lost; the combined
        # response is assumed to be written once per call, after the loop.
        with open('#ELCLASICO-2018-05-07-Watson.csv', 'a') as csv_file:
            writer = csv.writer(csv_file)
            writer.writerow([response])
import threading

WORKER_COUNT = 4

# One tailing reader shared by every worker. Previously each worker opened
# and tailed the file on its own, so every tweet was sent to Watson once per
# worker. The lock makes "fetch the next row" atomic.
_rows_lock = threading.Lock()
_tweet_rows = csv.reader(FileTailer(open('#ELCLASICO-2018-05-07.csv')))


def initiator(farzi):
    """Worker loop: take the next unprocessed tweet row and analyze it.

    Args:
        farzi: unused placeholder; kept so the function can still be driven
            through ``pool.map``.

    Rows come from the shared tailing reader, so each row is handled by
    exactly one worker. The loop runs for as long as the reader keeps
    producing rows.
    """
    ob = watson()
    while True:
        # Only the read is serialized; the Watson call below runs in parallel.
        with _rows_lock:
            row = next(_tweet_rows)
        # A short or empty row must not kill the worker with an IndexError.
        if len(row) < 2:
            continue
        # NOTE(review): column 1 is assumed to hold the tweet text - this
        # depends on the column order the streaming script wrote; confirm.
        tweet = row[1]
        ob.naturalLanguageProcessing(tweet)


pool = ThreadPool(WORKER_COUNT)
farzi = "farzi"
# Start exactly one long-running task per worker thread.
pool.map(initiator, [farzi] * WORKER_COUNT)