【问题标题】:Cassandra collections serializer throws exceptionCassandra 集合序列化程序抛出异常
【发布时间】:2021-10-19 12:34:22
【问题描述】:

我有一个方法将一个 Id、一组 UDT 和一个日期插入到表中。

public class MessageQueueNext
{
    public string Id { get; set; }
    public List<UDTMessage> Messages { get; set; }
    public DateTimeOffset Updated { get; set; }
}

private static Lazy<PreparedStatement> InsertStatement = new 
    Lazy<PreparedStatement>(
        () => {
            return CassandraDB.Instance.Session.Prepare
            (
                @"INSERT INTO SomeTable
                (id, messages, updated)
                VALUES (?, ?, ?)"
            );
        });

public async Task InsertAsync(SomeObj obj)
{
    var statement = BindValues(
        InsertStatement.Value,
        new dynamic[] {
            obj.Id, obj.Messages, obj.Updated
        }
    );

    await CassandraDB.Instance.Session.ExecuteAsync(statement);
}

private BoundStatement BindValues(PreparedStatement ps, dynamic[] values)
{
    var statement = ps.Bind
    (
        values[0] ?? Unset.Value, values[1] ?? new List<UDTMessage>(), values[2] ?? Unset.Value
    );

    return statement;
}

此方法有时会引发异常。

集合已修改;枚举操作可能无法执行。 在 System.Collections.Generic.List`1.Enumerator.MoveNextRare() 在 Cassandra.Serialization.CollectionSerializer.Serialize(UInt16 协议版本,IEnumerable 值) 在 Cassandra.Serialization.GenericSerializer.Serialize(ProtocolVersion 版本,对象值) 在 Cassandra.QueryProtocolOptions.Write(FrameWriter wb, Boolean isPrepared) 在 Cassandra.Requests.BaseRequest.WriteFrame(Int16 streamId,MemoryStream 流,ISerializer connectionSerializer) 在 Cassandra.OperationState.WriteFrame(Int16 streamId,MemoryStream memoryStream,ISerializer 序列化程序,Int64 时间戳) 在 Cassandra.Connections.Connection.RunWriteQueueAction() --- 从先前抛出异常的位置结束堆栈跟踪 ---

任何人都可以帮助我解决可能导致这种情况的原因吗?据我了解,obj.Messages 在某处发生了变化?或者像它可能在调用 InsertAsync(...) 的函数中发生了变化,但在调用 InsertAsync(...) 之后它立即更改了 obj.Messages 值?

调用 InsertAsync(...) 的方法

public async Task PersistMessageQueue(UDTMessage msg)
{
    var DAO = new DAO();
    var list = await _cache.GetMessages(msg.Id);
    if (list == null)
    {
        list = new List<UDTMessage>();
    }
    list.Add(msg);
    var someObj = new SomeObj();
    someObj.ToId = msg.Id;
    someObj.Messages = list;
    someObj.Updated = DateTimeOffset.UtcNow;
    await DAO.InsertAsync(messageQueueNext);
    _cache.AddElement(msg);
}

【问题讨论】:

  • 我们需要更大的代码示例来理解这个问题。 BindValues() 方法在做什么?
  • 另外,您提供给语句的参数类型是什么?看起来您正在将 obj.Messages 集合传递给驱动程序,但您的应用程序随后会对其进行修改。
  • @JoãoReis 更新了代码。 Id - 字符串,消息 - 列表,更新 - DateTimeOffset
  • 如果 obj.Messages 正在其他地方被修改,您能否签入您的应用程序? obj.Messages 是 List 吗?看起来当驱动程序迭代 obj.Messages 时,其他东西正在修改它。
  • @JoãoReis 我的意思是,如果 Persist 方法被不同的异步任务调用,那么在其他任务完成迭代之前,某些任务可能正在更改(添加到列表中)?这导致异常?那么,您有什么建议可以避免此类事情,但要保持一致性?

标签: c# .net .net-core cassandra cassandra-3.0


【解决方案1】:

查看父方法(来自您的评论):

public async Task PersistMessageQueue(UDTMessage msg) {    
    var DAO = new DAO();     
    var list = await _cache.GetMessages(msg.Id);   
    if (list == null)
    {   
        list = new List<UDTMessage>();  
    }  
    list.Add(msg);   
    var someObj = new SomeObj();  
    someObj.ToId = msg.Id;  
    someObj.Messages = list;    
    someObj.Updated = DateTimeOffset.UtcNow;  
    await DAO.InsertAsync(messageQueueNext); 
    _cache.AddElement(msg); 
}

您似乎将List 对象存储在缓存中,并将其传递给驱动程序,但每次调用PersistMessageQueue() 都会修改此List 对象,因此您可能会遇到驱动程序迭代的问题List 对象,而对 PersistMessageQueue() 的另一个调用正在修改相同的 List 对象。

解决方案是在调用Session.ExecuteAsync() 之前克隆List 对象,如果这是导致问题的原因。

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-04-06
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多