【问题标题】:How do you filter updates to specific fields from ChangeStream in MongoDB如何从 MongoDB 中的 ChangeStream 过滤特定字段的更新
【发布时间】:2018-09-24 22:22:00
【问题描述】:

我正在设置一个 ChangeStream 以在集合中的文档发生更改时通知我,以便我可以将该文档的“LastModified”元素插入到事件发生的时间。由于此更新会导致 ChangeStream 上发生新事件,因此我需要过滤掉这些更新以防止无限循环(更新 LastModified 元素,因为 LastModified 元素刚刚更新...)。

当我指定确切的字段时,我有以下代码可以工作:

ChangeStreamOptions options = new ChangeStreamOptions();
options.ResumeAfter = resumeToken;

string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields.LastModified': { $exists: false } } ] }";
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<BsonDocument>>().Match(filter);

var cursor = collection.Watch(pipeline, options, cancelToken);

但是,我不想对“updateDescription.updatedFields.LastModified”进行硬编码,而是提供一个我不想在 updatedFields 文档中存在的元素名称列表。

我尝试过:

string filter = "{ $and: [ { operationType: { $in: ['replace','insert','update'] } }, { 'updateDescription.updatedFields': { $nin: [ 'LastModified' ] } } ] }";

但这并没有按预期工作(我仍然收到 LastModified 更改的更新事件。

我最初使用的是过滤器生成器:

FilterDefinitionBuilder<ChangeStreamDocument<BsonDocument>> filterBuilder = Builders<ChangeStreamDocument<BsonDocument>>.Filter;
FilterDefinition<ChangeStreamDocument<BsonDocument>> filter = filterBuilder.In("operationType", new string[] { "replace", "insert", "update" });  //Only include the change if it was one of these types.  Available types are: insert, update, replace, delete, invalidate
filter &= filterBuilder.Nin("updateDescription.updatedFields", ChangedFieldsToIgnore); //If this is an update, only include it if the field(s) updated contains 1+ fields not in the ChangedFieldsToIgnore list

其中 ChangedFieldsToIgnore 是一个列表,其中包含我不想为其获取事件的字段名称。

任何人都可以帮助我使用需要使用的语法吗?还是我需要围绕我的 ChangedFieldsToIgnore 列表创建一个循环,并在过滤器中为每个项目创建一个新条目以“$exists:false”? (这似乎效率不高)。

编辑:

我根据@wan-bachtiar 的回答尝试了以下代码,但我的 enumerator.MoveNext() 调用出现异常:

var match1 = new BsonDocument { { "$match", new BsonDocument { { "operationType", new BsonDocument { { "$in", new BsonArray(new string[] { "replace", "insert", "update" }) } } } } } };
var match2 = new BsonDocument { { "$addFields", new BsonDocument { { "tmpfields", new BsonDocument { { "$objectToArray", "$updateDescription.updatedFields" } } } } } };
var match3 = new BsonDocument { { "$match", new BsonDocument { { "tmpfields.k", new BsonDocument { { "$nin", new BsonArray(updatedFieldsToIgnore) } } } } } };
var pipeline = new[] { match1, match2, match3 };

var cursor = collection.Watch<ChangeStreamDocument<BsonDocument>>(pipeline, options, Profile.CancellationToken);
enumerator = cursor.ToEnumerable().GetEnumerator();

enumerator.MoveNext();
ChangeStreamDocument<BsonDocument> doc = enumerator.Current;

例外是:"{"Invalid field name: \"tmpfields\"."}"

我怀疑问题可能是我收到不包含 updateDescription 字段的“替换”和“插入”事件,因此 $addFields/$objectToArray 失败。我太新了,无法弄清楚语法,但我认为我需要使用一个过滤器:

{ $match: { "operationType": { $in: ["replace", "insert"] } } }
OR
{ $eq: { "operationTYpe": "update" }} AND { $addFields....}

此外,C# 驱动程序似乎不包含帮助 $addFields 和 $objectToArray 操作的生成器。我只能使用new BsonDocument {...} 方法来构建管道变量。

【问题讨论】:

  • 您不想看到包含updateDescription.updatedFields.UnwantedField 的整个更新事件吗?因为updatedFields 可能包含多个字段,例如它可能包含LastModified 和另一个在同一操作中更新的字段。
  • 我的应用程序是唯一应该更新 LastModified 字段的应用程序,所以在我的情况下可以。但是,如果您有一个过滤器提案,它只排除包含要忽略的字段列表子集的更新,我很乐意看到它。
  • 我想知道是否有办法使用 $redact 聚合运算符来做到这一点。

标签: c# mongodb mongodb-.net-driver


【解决方案1】:

ChangedFieldsToIgnore 是一个列表,其中包含我不想为其获取事件的字段名称。

如果您想基于多个键进行过滤(updatedFields 是否包含某些字段),如果您先将键转换为值会更容易。

您可以使用聚合运算符$objectToArray 将包含在updatedFields 中的文档转换为值。例如:

pipeline = [{"$addFields": {
             "tmpfields":{
               "$objectToArray":"$updateDescription.updatedFields"}
            }}, 
            {"$match":{"tmpfields.k":{
                       "$nin":["LastModified", "AnotherUnwantedField"]}}}
];

上述聚合管道添加了一个名为tmpfields 的临时字段。这个新字段会将updateDescription.updatedFields 的内容转为{name:value}[{k:name, v:value}]。一旦我们将这些键作为值,我们就可以使用$nin 作为过滤器数组。

更新

您收到 tmpfields 无效异常的原因是因为结果被转换为 ChangeStreamDocument 模型,该模型没有名为 tmpfields 的可识别字段。

在这种情况下,当它是没有字段updateDescription.updatedFields 的不同操作时,tmpfields 的值将只是null

以下是使用 MongoDB .Net driver v2.5 的 MongoDB ChangeStream .Net/C# 示例,以及修改输出更改流的聚合管道。

这个例子不是类型安全的,会返回BsonDocument

var database = client.GetDatabase("database");            
var collection = database.GetCollection<BsonDocument>("collection");

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };

// Aggregation Pipeline
var addFields = new BsonDocument { 
                    { "$addFields", new BsonDocument { 
                       { "tmpfields", new BsonDocument { 
                         { "$objectToArray", 
                           "$updateDescription.updatedFields" } 
                       } } 
                 } } };
var match = new BsonDocument { 
                { "$match", new BsonDocument { 
                  { "tmpfields.k", new BsonDocument { 
                    { "$nin", new BsonArray{"LastModified", "Unwanted"} } 
            } } } } };

var pipeline = new[] { addFields, match };

// ChangeStreams
var cursor = collection.Watch<BsonDocument>(pipeline, options);

foreach (var change in cursor.ToEnumerable())
{
    Console.WriteLine(change.ToJson());
}

【讨论】:

  • 我认为这已经接近我需要的地方,但是我遇到了一个关于 tmpfields 是无效字段名称的新异常。我怀疑这是当文档不是 opType“更新”,但是是“插入”或“替换”。我已经根据您的回答尝试编辑了 OP。
  • @JerrenSaunders ,我已经用示例代码更新了我的答案。这不是由于不同的操作类型,而只是模型演员。即ChangeStreamDocument.
  • 谢谢。这让我足够接近我需要去的地方。我仍然需要将此 BsonDocument 转换为我已经设法做到的 ChangeStreamDocument ,但我确信有更好的方法可以将我现在放在一起的内容...我会问不过在一个新问题中。
【解决方案2】:

我写了下面的代码,因为我遇到了和你一样的问题。无需乱用 BsonObjects ...

//The operationType can be one of the following: insert, update, replace, delete, invalidate
//ignore the field lastrun as we would end in an endles loop
var pipeline = new EmptyPipelineDefinition<ChangeStreamDocument<ATask>>()
    .Match("{ operationType: { $in: [ 'replace', 'update' ] } }")
    .Match(@"{ ""updateDescription.updatedFields.LastRun"" : { $exists: false } }")
    .Match(@"{ ""updateDescription.updatedFields.IsRunning"" : { $exists: false } }");

var options = new ChangeStreamOptions { FullDocument = ChangeStreamFullDocumentOption.UpdateLookup };
var changeStream = Collection.Watch(pipeline, options);    

while (changeStream.MoveNext())
{
    var next = changeStream.Current;
    foreach (var obj in next)
        yield return obj.FullDocument;
}

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-04-23
    • 2019-12-29
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多