【问题标题】:Porting entities between partitions in table storage在表存储中的分区之间移植实体
【发布时间】:2015-05-22 08:13:05
【问题描述】:

我需要将 Azure 表存储中表中的整个记录​​分区从 Partition1 移植到 Partition2。数千,甚至数百万。

我知道在 Azure 表存储中无法将实体从一个分区移植到另一个分区,您需要删除旧的并插入一个新的,并更新 PartitionKey,所以我的任务是做同样的事情许多记录。

有什么标准吗?

我想出了以下解决方案(简化):

public async Task Migrate(string oldPartition, string newPartition)
{
    TableContinuationToken token = null;

    List<Task> migrationTasks = new List<Task>();

    do
    {
         TableQuerySegment<MyTableEntity> entries = await GetEntriesSegment(
             oldPartition, 
             token);

         token = entries.ContinuationToken;

         migrationTasks.Add(MigrateEntries(entries, newPartition));
    } while (token != null)

    await Task.WhenAll(migrationTasks);
}

private async Task MigrateEntries(IEnumerable<MyTableEntity> entries, string newPartition)
{
    await Task.WhenAll(
        InsertInBatches(entries.Select(
            e => GetEntryWithUpdatedPartitionKey(e, newPartition)),
        DeleteInBatches(entries));
}
  • GetEntriesSegment封装了访问表并获取段的逻辑
  • GetEntryWithUpdatedPartitionKey 只是将所有字段从一个MyTableEntity 类型的对象复制到一个新创建的对象中,但使用不同的PartitionKey
  • InsertInBatches 负责将条目集合拆分为 100 个批次(Azure 表存储限制)并并行执行所有批次插入(通过另外一个 await Task.WhenAll(insertTasks) 内部)
  • DeleteInBatches 负责将条目集合拆分为 100 个批次(Azure 表存储限制)并并行执行所有批次删除(通过内部的另一个 await Task.WhenAll(deleteTasks)

我的主要目标是并行处理所有内容。即,应在删除已读取的条目并插入新条目的同时读取新条目。

这个解决方案看起来合理吗?您知道任何经过时间证明(经过充分测试,用于生产项目)的替代方案吗?

【问题讨论】:

标签: .net async-await task-parallel-library azure-table-storage tpl-dataflow


【解决方案1】:

你可以试试https://msdn.microsoft.com/library/hh228603.aspx

任务并行库 (TPL) 提供数据流组件来帮助提高支持并发的应用程序的稳健性。

【讨论】:

  • 谢谢,绝对值得一试。我仍然想知道是否有任何现成的解决方案可以解决我的具体问题:Azure 表存储中分区之间的数据迁移。
  • 好的,我现在正在尝试。我仍然看不到如何创建一个从表存储接收数据并立即将其传递给处理的块。也就是说,即使现在正在处理另一个批次,一个新线程开始处理新数据,同时我们继续从表存储中读取数据。
【解决方案2】:

您可以使用 TPL Dataflow 库来完成此操作,该库还可以优雅地处理将操作批处理为每个表批处理 100 个操作的要求。我还要强调的是,这段代码不会阻塞等待整个分区被读取,而是一次流式传输一批。这就是 Dataflow 库为您提供的。

public async Task MigratePartitionAsync<T>(CloudTable table, string oldPartitionKey, string newPartitionKey)
    where T : TableEntity, new()
{
    // batch up to 100 records per table operation
    var buffer = new BatchBlock<T>(100);

    // migrate the records
    var migrator = new ActionBlock<T[]>(async x => await MigrateRecordsAsync(table, x, newPartitionKey));

    // link the blocks and set them to propogate their completion
    buffer.LinkTo(migrator, new DataflowLinkOptions { PropagateCompletion = true });

    // read the old partition
    await ReadPartitionAsync(table, buffer, oldPartitionKey);

    await migrator.Completion;
}

public async Task ReadPartitionAsync<T>(CloudTable table, ITargetBlock<T> buffer, string partitionKey)
    where T : TableEntity, new()
{
    var results = table.CreateQuery<T>().Where(x => x.PartitionKey == partitionKey); 

    foreach (var record in results)
    {
        await buffer.SendAsync(record);
    }

    buffer.Complete();
}

public async Task MigrateRecordsAsync<T>(CloudTable table, IEnumerable<T> records, string newPartitionKey)
    where T : TableEntity, new()
{
    var deleteBatch = new TableBatchOperation();

    foreach (var element in records)
    {
        deleteBatch.Delete(element);
    }

    await table.ExecuteBatchAsync(deleteBatch);

    var insertBatch = new TableBatchOperation();

    foreach (var element in records)
    {
        element.PartitionKey = newPartitionKey;
        insertBatch.Insert(element);
    }

    await table.ExecuteBatchAsync(insertBatch);
}

你会这样使用它:

CloudTable table = GetCloudTable();

await MigratePartitionAsync<MyTableEntityClass>(table, "OldPartitionKey", "NewPartitionKey");

【讨论】:

    猜你喜欢
    • 2022-09-27
    • 1970-01-01
    • 1970-01-01
    • 2013-10-07
    • 2014-09-20
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多