【问题标题】:How to write more than 25 items/rows into Table for DynamoDB?如何将超过 25 个项目/行写入 DynamoDB 表?
【发布时间】:2015-09-12 23:28:18
【问题描述】:

我对 Amazon DynamoDB 很陌生。我目前有 20000 行需要添加到表中。但是,根据我所阅读的内容,似乎我一次最多只能写入 25 行,使用带有 25 个 WriteRequest 的 BatchWriteItem 类。有可能增加这个吗?如何一次写入超过 25 行?目前写入所有 20000 行大约需要 15 分钟。谢谢。

【问题讨论】:

    标签: database amazon-web-services amazon-dynamodb amazon-redshift


    【解决方案1】:

    您最多只能在一个 BatchWriteItem 请求中发送 25 个项目,但您可以一次发送任意数量的 BatchWriteItem 请求。假设您有provisioned enough write throughput,您应该能够通过在多个线程/进程/主机之间拆分这 20k 行并将它们并行推送到数据库来显着加快速度。

    对于这么小的数据集来说,它可能有点重量级,但您可以使用 AWS Data Pipeline 从 S3 提取数据。它基本上自动化了创建 Hadoop 集群的过程,以从 S3 中吸取您的数据,并通过一组并行 BatchWriteItem 请求将其发送到 DynamoDB。

    【讨论】:

    • 谢谢大卫。我会尝试使用一些并行线程。什么是预置吞吐量?
    • 如果我使用 AWS Data Pipeline,这是否意味着我应该将我的应用程序中的所有数据输出到 S3 中?输出到 S3 ==> 数据管道 ==> DynamoDB 的好处是与。直接写DynamoDB写文件到S3的速度?
    • 我在一个链接中编辑了有关预置吞吐量的更多信息,但简短的故事是,您在创建表时提前告诉 DynamoDB 您希望能够执行多少次读取/写入它每秒。如果您发送请求的速度比这快,超出的请求将被拒绝。
    • 是的,您需要以某种方式将所有数据传输到 S3 才能使用数据管道。如果它已经全部在您的应用程序本地,那么首先将其上传到 S3 然后导入到 DynamoDB 几乎肯定会比直接并行上传到 DynamoDB 慢。如果您的应用程序从其他地方获取它并且该地方可能是 S3,那么数据管道将使您不必编写代码来进行并行传输。
    • 谢谢大卫!很有帮助。
    【解决方案2】:

    我一直在寻找一些代码来使用 JavaScript SDK 执行此操作。我没找到,所以我自己整理了一下。我希望这对其他人有帮助!

    function multiWrite(table, data, cb) {
        var AWS = require('aws-sdk');
        var db = new AWS.DynamoDB.DocumentClient({region: 'us-east-1'});
    
        // Build the batches
        var batches = [];
        var current_batch = [];
        var item_count = 0;
        for(var x in data) {
            // Add the item to the current batch
            item_count++;
            current_batch.push({
                PutRequest: {
                    Item: data[x]
                }
            });
            // If we've added 25 items, add the current batch to the batches array
            // and reset it
            if(item_count%25 == 0) {
                batches.push(current_batch);
                current_batch = [];
            }
        }
        // Add the last batch if it has records and is not equal to 25
        if(current_batch.length > 0 && current_batch.length != 25) batches.push(current_batch);
    
        // Handler for the database operations
        var completed_requests = 0;
        var errors = false;
        function handler(request) {
            return function(err, data) {
                // Increment the completed requests
                completed_requests++;
    
                // Set the errors flag
                errors = (errors) ? true : err;
    
                // Log the error if we got one
                if(err) {
                    console.error(JSON.stringify(err, null, 2));
                    console.error("Request that caused database error:");
                    console.error(JSON.stringify(request, null, 2));
                }
    
                // Make the callback if we've completed all the requests
                if(completed_requests == batches.length) {
                    cb(errors);
                }
            }
        }
    
        // Make the requests
        var params;
        for(x in batches) {
            // Items go in params.RequestItems.id array
            // Format for the items is {PutRequest: {Item: ITEM_OBJECT}}
            params = '{"RequestItems": {"' + table + '": []}}';
            params = JSON.parse(params);
            params.RequestItems[table] = batches[x];
    
            // Perform the batchWrite operation
            db.batchWrite(params, handler(params));
        }
    }
    

    【讨论】:

    【解决方案3】:
    function putInHistory(data,cb) {
      var arrayOfArray25 = _.chunk(data, 25);
      async.every(arrayOfArray25, function(arrayOf25, callback) {
       var params = {
         RequestItems: {
        [TABLES.historyTable]: []
       }
     };
     arrayOf25.forEach(function(item){
      params.RequestItems[TABLES.historyTable].push({
        PutRequest: {
          Item: item
        }
      })
     });
     docClient.batchWrite(params, function(err, data) {
       if (err){ 
         console.log(err);
         callback(err);
       } else {
         console.log(data);
         callback(null, true);
       };
     });
    }, function(err, result) {
     if(err){
       cb(err);
     } else {
       if(result){
         cb(null,{allWritten:true});
       } else {
        cb(null,{allWritten:false});
       }
     }
    });
    }
    

    您可以使用 lodash 从数组中生成数据块,然后使用异步库的 each/every 方法对 25 个元素的块执行 batchWrite

    【讨论】:

      【解决方案4】:

      来自@Geerek 的回答是带有 lambda 函数的解决方案:

      exports.handler = (event, context, callback) => {
        console.log(`EVENT: ${JSON.stringify(event)}`);
      
        var AWS = require('aws-sdk');
      
        AWS.config.update({ region: process.env.REGION })
      
        var docClient = new AWS.DynamoDB.DocumentClient();
      
        const {data, table, cb} = event
      
        // Build the batches
        var batches = [];
        var current_batch = [];
        var item_count = 0;
      
        for (var i = 0; i < data.length; i++) {
          // Add the item to the current batch
          item_count++
          current_batch.push({
            PutRequest: {
              Item: data[i],
            },
          })
          // If we've added 25 items, add the current batch to the batches array
          // and reset it
          if (item_count % 25 === 0) {
            batches.push(current_batch)
            current_batch = []
          }
        }
      
        // Add the last batch if it has records and is not equal to 25
        if (current_batch.length > 0 && current_batch.length !== 25) {
          batches.push(current_batch)
        }
      
        // Handler for the database operations
        var completed_requests = 0
        var errors = false
      
        function handler (request) {
      
          console.log('in the handler: ', request)
      
          return function (err, data) {
            // Increment the completed requests
            completed_requests++;
      
            // Set the errors flag
            errors = (errors) ? true : err;
      
            // Log the error if we got one
            if(err) {
              console.error(JSON.stringify(err, null, 2));
              console.error("Request that caused database error:");
              console.error(JSON.stringify(request, null, 2));
              callback(err);
            }else {
              callback(null, data);
            }
      
            // Make the callback if we've completed all the requests
            if(completed_requests === batches.length) {
              cb(errors);
            }
          }
        }
      
        // Make the requests
        var params;
        for (var j = 0; j < batches.length; j++) {
          // Items go in params.RequestItems.id array
          // Format for the items is {PutRequest: {Item: ITEM_OBJECT}}
          params = '{"RequestItems": {"' + table + '": []}}'
          params = JSON.parse(params)
          params.RequestItems[table] = batches[j]
      
          console.log('before db.batchWrite: ', params)
      
          // Perform the batchWrite operation
          docClient.batchWrite(params, handler(params))
        }
      };
      

      【讨论】:

        【解决方案5】:

        我写了一个npm package,它应该可以作为batchWrite 方法的简单替代品,您只需将dynamoDB 实例作为第一个参数传递就可以了: https://www.npmjs.com/package/batch-write-all

        查看项目自述文件中的示例:

        // Use bellow instead of this: dynamodb.batchWrite(params).promise();
        batchWriteAll(dynamodb, params).promise();
        

        【讨论】:

          【解决方案6】:

          使用 aws cli 和 aws-vault,这就是我所做的。

          假设您有以下包含 1000 行的文件 (data.json)

          { "PutRequest": { "Item": { "PKey": { "S": "1" }, "SKey": { "S": "A" }}}},
          { "PutRequest": { "Item": { "PKey": { "S": "2" }, "SKey": { "S": "B" }}}},
          { "PutRequest": { "Item": { "PKey": { "S": "3" }, "SKey": { "S": "C" }}}},
          ... to 1000
          

          你需要将它拆分成块文件,每个文件有 25 行!

          我在 LinqPad 中使用以下 c# 代码生成 .sh 文件和 json 块,以便能够使用 aws cli 将它们插入到 dynamodb 中

          void Main()
          {
          var sourcePath= @"D:\data\whereYourMainJsonFileIsLocated\";
          var sourceFilePath = @"data.json";
          
          var awsVaultProfileName = "dev";
          var env = "dev"; 
          var tableName = "dynamodb-table-name";
          
          var lines = System.IO.File.ReadAllLines(sourcePath + sourceFilePath);
          
          var destinationPath = Path.Combine(sourcePath, env);
          var destinationChunkPath = Path.Combine(sourcePath, env, "chunks");
          
          if (!System.IO.Directory.Exists(destinationChunkPath))
              System.IO.Directory.CreateDirectory(destinationChunkPath);
          
          System.Text.StringBuilder shString= new System.Text.StringBuilder();
          
          for (int i = 0; i < lines.Count(); i = i+25)
          {
              var pagedLines = lines.Skip(i).Take(25).ToList().Distinct().ToList();
          
              System.Text.StringBuilder sb = new System.Text.StringBuilder();
              sb.AppendLine("{");
              sb.AppendLine($"  \"{tableName}\": [");
              
              foreach (var element in pagedLines)
              {
                  if (element == pagedLines.Last())
                      sb.AppendLine(element.Substring(0, element.Length-1));
                  else
                      sb.AppendLine(element);
              }
              
              sb.AppendLine("]");
              sb.AppendLine("}");
          
              var fileName = $"chunk{i / 25}.json";
              System.IO.File.WriteAllText(Path.Combine(destinationChunkPath, fileName), sb.ToString(), Encoding.Default);
          
          
              shString.AppendLine($@"aws-vault.exe exec {awsVaultProfileName} -- aws dynamodb batch-write-item --request-items file://chunks/{fileName}");
          }
          
          System.IO.File.WriteAllText(Path.Combine(destinationPath, $"{tableName}-{env}.sh"), shString.ToString(), Encoding.Default);
          }
          

          结果将是块文件,如 chunk0.json、chunk1.json 等

          {
            "dynamodb-table-name": [
              { "PutRequest": { "Item": { "PKey": { "S": "1" }, "SKey": { "S": "A" }}}},
              { "PutRequest": { "Item": { "PKey": { "S": "2" }, "SKey": { "S": "B" }}}},
              { "PutRequest": { "Item": { "PKey": { "S": "3" }, "SKey": { "S": "C" }}}}
            ]
          }
          

          和.sh文件

          aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk0.json
          aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk1.json
          aws-vault.exe exec dev -- aws dynamodb batch-write-item --request-items file://chunks/chunk2.json
          

          最后只需运行 .sh 文件,您的表中就有了所有数据!

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2020-02-19
            • 1970-01-01
            • 1970-01-01
            • 2016-12-07
            • 1970-01-01
            • 2020-03-13
            • 1970-01-01
            • 1970-01-01
            相关资源
            最近更新 更多