【问题标题】:Optimization: Dumping JSON from a Streaming API to Mongo优化:将 JSON 从 Streaming API 转储到 Mongo
【发布时间】:2012-06-07 00:05:50
【问题描述】:

背景: 我设置了一个python 模块,用于从流式API 中获取JSON 对象,并使用pymongo 将它们(一次批量插入25 个)存储在MongoDB 中。为了比较,我还有一个来自同一流 API 的 bash 命令到 curlpipe 它到 mongoimport。这两种方法都将数据存储在单独的集合中。

我定期监控集合的count() 以检查它们的表现。

到目前为止,我看到 python 模块落后于 curl | mongoimport 方法大约 1000 个 JSON 对象。

问题: 如何优化我的python 模块以与 curl | mongoimport 同步?

我不能使用tweetstream,因为我使用的不是 Twitter API,而是第 3 方流媒体服务。

有人可以帮我吗?

Python模块:


class StreamReader:
    def __init__(self):
        try:
            self.buff = ""
            self.tweet = ""
            self.chunk_count = 0
            self.tweet_list = []
            self.string_buffer = cStringIO.StringIO()
            self.mongo = pymongo.Connection(DB_HOST)
            self.db = self.mongo[DB_NAME]
            self.raw_tweets = self.db["raw_tweets_gnip"]
            self.conn = pycurl.Curl()
            self.conn.setopt(pycurl.ENCODING, 'gzip')
            self.conn.setopt(pycurl.URL, STREAM_URL)
            self.conn.setopt(pycurl.USERPWD, AUTH)
            self.conn.setopt(pycurl.WRITEFUNCTION, self.handle_data)
            self.conn.perform()
        except Exception as ex:
            print "error ocurred : %s" % str(ex)

    def handle_data(self, data):
        try:
            self.string_buffer = cStringIO.StringIO(data)
            for line in self.string_buffer:
                try:
                    self.tweet = json.loads(line)
                except Exception as json_ex:
                    print "JSON Exception occurred: %s" % str(json_ex)
                    continue

                if self.tweet:
                    try:
                        self.tweet_list.append(self.tweet)
                        self.chunk_count += 1
                        if self.chunk_count % 1000 == 0
                            self.raw_tweets.insert(self.tweet_list)
                            self.chunk_count = 0
                            self.tweet_list = []

                    except Exception as insert_ex:
                        print "Error inserting tweet: %s" % str(insert_ex)
                        continue
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
            print repr(self.buff)

    def __del__(self):
        self.string_buffer.close()

感谢阅读。

【问题讨论】:

  • 您插入的文档是否有“_id”字段?
  • 你用的是什么版本的mongo和什么版本的pymongo?
  • @AsyaKamsky Python 2.7、MongoDb 2.0.4 和 PyMongo 2.2。
  • 你的 bash curl 脚本/命令是什么样的?
  • 我更新了我的答案 - 我认为这是确定的词 :)

标签: python json mongodb pymongo http-streaming


【解决方案1】:

摆脱了 StringIO 库。由于WRITEFUNCTION 回调handle_data,在这种情况下,每行都会调用,只需直接加载JSON。但是,有时数据中可能包含两个JSON 对象。抱歉,我无法发布我使用的 curl 命令,因为它包含我们的凭据。但是,正如我所说,这是适用于任何流式 API 的普遍问题。


def handle_data(self, buf): 
    try:
        self.tweet = json.loads(buf)
    except Exception as json_ex:
        self.data_list = buf.split('\r\n')
        for data in self.data_list:
            self.tweet_list.append(json.loads(data))    

【讨论】:

    【解决方案2】:

    原来您的代码中有一个错误。

                    if self.chunk_count % 50 == 0
                        self.raw_tweets.insert(self.tweet_list)
                        self.chunk_count = 0
    

    您重置了 chunk_count,但没有重置 tweet_list。因此,您第二次尝试插入 100 个项目(50 个新项目加上 50 个之前已发送到 DB 的项目)。您已解决此问题,但仍发现性能有所不同。

    整个批量大小的事情结果是一个红鲱鱼。我尝试使用一个大的 json 文件并通过 python 加载它,而不是通过 mongoimport 加载它,Python 总是更快(即使在安全模式下 - 见下文)。

    仔细查看您的代码,我意识到问题在于流式 API 实际上是以块的形式向您传递数据。您应该只获取这些块并将它们放入数据库中(这就是 mongoimport 正在做的事情)。您的 python 为拆分流、将其添加到列表然后定期向 Mongo 发送批次所做的额外工作可能是我看到的和您看到的之间的区别。

    为你的 handle_data() 试试这个 sn-p

    def handle_data(self, data):
        try:
            string_buffer = StringIO(data)
            tweets = json.load(string_buffer)
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
        try:
            self.raw_tweets.insert(tweets)
        except Exception as ex:
            print "Exception occurred: %s" % str(ex)
    

    需要注意的是您的python inserts are not running in "safe mode" - 您应该通过在插入语句中添加参数safe=True 来更改它。然后,您将在任何插入失败时获得异常,并且您的 try/catch 将打印暴露问题的错误。

    它的性能成本也不高 - 我目前正在运行一个测试,大约五分钟后,两个集合的大小为 14120 14113。

    【讨论】:

    • 顺便说一句,我尝试了您的代码 - 通过修复,Python 插入数据的速度大约是 mongoimport 的两倍。这是因为默认情况下“安全”插入是关闭的。通过打开安全写入(通过 safe=True 来插入)Python 插入仍然是 75% 的 mongoimport 时间。
    • 感谢您指出!我进行了必要的更改(也更新了上面的代码):在 self.chunk_count = 0 之后添加了“self.tweet_list = []”并将批量大小增加到 1000。它似乎仍然滞后 - python 模块计数为 5000,而curl mongoimport 组合为 5718。(它是 4000:5662)。有什么见解吗?
    • 由于您一次只插入 1,000 个,因此您总是会看到 1,000 的倍数 - 看起来它实际上并没有落后...
    • 是的,但是 4000 : 5662 意味着在最小时仍有 600 的滞后。正确的?可以在这两个地方进行任何优化 - self.string_buffer = cStringIO.StringIO(data) for line in self.string_buffer:?
    • 我在没有 curl 或 pycurl 的情况下进行了测试 - 我只是转储了一个大的 .json 文件并使用 mongoimport 和 pymongo 加载它。这就是为什么我怀疑你的问题不是插入速度不够快,也许是它没有足够快地获取数据(通过 pycurl)。
    猜你喜欢
    • 2012-07-11
    • 1970-01-01
    • 2018-01-02
    • 2023-03-09
    • 2020-07-25
    • 1970-01-01
    • 1970-01-01
    • 2021-11-16
    • 2019-11-19
    相关资源
    最近更新 更多