【问题标题】:How can Python Observe Changes to Mongodb's OplogPython如何观察Mongodb的Oplog变化
【发布时间】:2014-01-19 04:42:34
【问题描述】:

我有多个使用 pyMongo 写入 Mongodb 的 Python 脚本。另一个 Python 脚本如何观察对 Mongo 查询的更改并在更改发生时执行某些功能? mongodb 是在启用 oplog 的情况下设置的。

【问题讨论】:

    标签: python mongodb python-2.7 pymongo


    【解决方案1】:

    前段时间,我用 Python 为 MongoDB 编写了一个增量备份工具。该工具通过跟踪oplog 来监控数据更改。这是代码的相关部分。

    更新答案,MongDB 3.6+

    正如 datdinhquoc 在下面的 cmets 中巧妙指出的那样,对于 MongoDB 3.6 及更高版本,有 Change Streams

    更新答案,pymongo 3

    from time import sleep
    
    from pymongo import MongoClient, ASCENDING
    from pymongo.cursor import CursorType
    from pymongo.errors import AutoReconnect
    
    # Time to wait for data or connection.
    _SLEEP = 1.0
    
    if __name__ == '__main__':
        oplog = MongoClient().local.oplog.rs
        stamp = oplog.find().sort('$natural', ASCENDING).limit(-1).next()['ts']
    
        while True:
            kw = {}
    
            kw['filter'] = {'ts': {'$gt': stamp}}
            kw['cursor_type'] = CursorType.TAILABLE_AWAIT
            kw['oplog_replay'] = True
    
            cursor = oplog.find(**kw)
    
            try:
                while cursor.alive:
                    for doc in cursor:
                        stamp = doc['ts']
    
                        print(doc)  # Do something with doc.
    
                    sleep(_SLEEP)
    
            except AutoReconnect:
                sleep(_SLEEP)
    

    另见http://api.mongodb.com/python/current/examples/tailable.html

    原始答案,pymongo 2

    from time import sleep
    
    from pymongo import MongoClient
    from pymongo.cursor import _QUERY_OPTIONS
    from pymongo.errors import AutoReconnect
    from bson.timestamp import Timestamp
    
    # Tailable cursor options.
    _TAIL_OPTS = {'tailable': True, 'await_data': True}
    
    # Time to wait for data or connection.
    _SLEEP = 10
    
    if __name__ == '__main__':
        db = MongoClient().local
    
        while True:
            query = {'ts': {'$gt': Timestamp(some_timestamp, 0)}}  # Replace with your query.
            cursor = db.oplog.rs.find(query, **_TAIL_OPTS)
    
            cursor.add_option(_QUERY_OPTIONS['oplog_replay'])
    
            try:
                while cursor.alive:
                    try:
                        doc = next(cursor)
    
                        # Do something with doc.
    
                    except (AutoReconnect, StopIteration):
                        sleep(_SLEEP)
    
            finally:
                cursor.close()
    

    【讨论】:

    【解决方案2】:

    我今天遇到了这个问题,但在任何地方都没有找到更新的答案。

    从 v3.0 开始,Cursor 类已更改,不再接受 tailableawait_data 参数。此示例将跟踪 oplog 并在找到比它找到的最后一条记录更新的记录时打印 oplog 记录。

    # Adapted from the example here: https://jira.mongodb.org/browse/PYTHON-735
    # to work with pymongo 3.0
    
    import pymongo
    from pymongo.cursor import CursorType
    
    c = pymongo.MongoClient()
    
    # Uncomment this for master/slave.
    oplog = c.local.oplog['$main']
    # Uncomment this for replica sets.
    #oplog = c.local.oplog.rs
    
    first = next(oplog.find().sort('$natural', pymongo.DESCENDING).limit(-1))
    ts = first['ts']
    
    while True:
        cursor = oplog.find({'ts': {'$gt': ts}}, cursor_type=CursorType.TAILABLE_AWAIT, oplog_replay=True)
        while cursor.alive:
            for doc in cursor:
                ts = doc['ts']
                print doc
                # Work with doc here
    

    【讨论】:

      【解决方案3】:

      我有同样的问题。我把这个rescommunes/oplog.py放在一起。检查 cmets 并查看 __main__ 以了解如何在脚本中使用它的示例。

      【讨论】:

        【解决方案4】:

        使用tailable cursor查询oplog。

        实际上很有趣,因为 oplog-monitoring 正是 tailable-cursor 功能最初添加的目的。我发现它对其他事情也非常有用(例如,实现基于 mongodb 的 pubsub,例如参见 this post),但这是最初的目的。

        【讨论】:

        • 那篇文章的作者在评论中说“顺便说一句,我实际上已经放弃了这种方法,转而直接使用 oplog”
        • mongodb 3.6+ 有 client.changestream 属性
        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2023-03-04
        • 2021-11-18
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        相关资源
        最近更新 更多