【发布时间】:2012-06-07 00:05:50
【问题描述】:
背景:
我设置了一个python 模块,用于从流式API 中获取JSON 对象,并使用pymongo 将它们(一次批量插入25 个)存储在MongoDB 中。为了比较,我还有一个来自同一流 API 的 bash 命令到 curl 和 pipe 它到 mongoimport。这两种方法都将数据存储在单独的集合中。
我定期监控集合的count() 以检查它们的表现。
到目前为止,我看到 python 模块落后于 curl | mongoimport 方法大约 1000 个 JSON 对象。
问题:
如何优化我的python 模块以与 curl | mongoimport 同步?
我不能使用tweetstream,因为我使用的不是 Twitter API,而是第 3 方流媒体服务。
有人可以帮我吗?
Python模块:
class StreamReader:
def __init__(self):
try:
self.buff = ""
self.tweet = ""
self.chunk_count = 0
self.tweet_list = []
self.string_buffer = cStringIO.StringIO()
self.mongo = pymongo.Connection(DB_HOST)
self.db = self.mongo[DB_NAME]
self.raw_tweets = self.db["raw_tweets_gnip"]
self.conn = pycurl.Curl()
self.conn.setopt(pycurl.ENCODING, 'gzip')
self.conn.setopt(pycurl.URL, STREAM_URL)
self.conn.setopt(pycurl.USERPWD, AUTH)
self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
self.conn.perform()
except Exception as ex:
print "error ocurred : %s" % str(ex)
def handle_data(self, data):
try:
self.string_buffer = cStringIO.StringIO(data)
for line in self.string_buffer:
try:
self.tweet = json.loads(line)
except Exception as json_ex:
print "JSON Exception occurred: %s" % str(json_ex)
continue
if self.tweet:
try:
self.tweet_list.append(self.tweet)
self.chunk_count += 1
if self.chunk_count % 1000 == 0
self.raw_tweets.insert(self.tweet_list)
self.chunk_count = 0
self.tweet_list = []
except Exception as insert_ex:
print "Error inserting tweet: %s" % str(insert_ex)
continue
except Exception as ex:
print "Exception occurred: %s" % str(ex)
print repr(self.buff)
def __del__(self):
self.string_buffer.close()
感谢阅读。
【问题讨论】:
-
您插入的文档是否有“_id”字段?
-
你用的是什么版本的mongo和什么版本的pymongo?
-
@AsyaKamsky Python 2.7、MongoDb 2.0.4 和 PyMongo 2.2。
-
你的 bash curl 脚本/命令是什么样的?
-
我更新了我的答案 - 我认为这是确定的词 :)
标签: python json mongodb pymongo http-streaming