【问题标题】:BatchWrite in AWS dynamo db skipping some itemsAWS 发电机数据库中的 BatchWrite 跳过一些项目
【发布时间】:2020-02-14 17:07:14
【问题描述】:

我正在尝试使用节点 SDK 将项目写入 AWS dynamo db。我面临的问题是,当我使用线程以 parallel 将批处理项目写入 AWS 时,某些项目没有写入数据库。写入的项目数是随机的。例如,如果我运行我的代码 3 次,一次是 150,接下来是 200,第三次可能是 135。此外,当我在没有线程的情况下顺序编写项目时,即使这样,有些项目也是没有写。但是,在这种情况下,项目丢失较少。例如,如果项目总数为 300,则写入的项目为 298。我调查了问题以查看是否有任何未处理的项目,但 batchWrite 方法没有返回任何内容。这意味着所有项目都被正确处理。请注意,我对各自的数据库有 OnDemand 规定,因此我预计不会出现任何限制问题。所以这是我的代码。

 exports.run = async function() {

  **This is the function which runs first !!!!!**

  const data = await getArrayOfObjects();
  console.log("TOTAL PRICE CHANGES")  
  console.log(data.length)
  const batchesOfData = makeBatches(data)
  const threads = new Set();
  console.log("**********")
  console.log(batchesOfData.length)
  console.log("**********")
  for(let i = 0; i < batchesOfData.length; i++) {
    console.log("BATCH!!!!!")
    console.log(i)
    console.log(batchesOfData[i].length)  
    // Sequential Approach
    const response = await compensationHelper.createItems(batchesOfData[i])
    console.log("RESPONSE")
    console.log(response)

    Parallel approach
    // const workerResult = await runService(batchesOfData[i])
    // console.log("WORKER RESUULT!!!!")
    // console.log(workerResult);

  }
}

exports.updateItemsInBatch = async function(data, tableName) {
  console.log("WRITING DATA")
  console.log(data.length)
  const batchItems = {
    RequestItems: {},
  };

  batchItems.RequestItems[tableName] = data;
  try {
    const result = await documentClient.batchWrite(batchItems).promise();
    console.log("UNPROCESSED ITEMS")
    console.log(result)
    if (result instanceof Error) {
      console.log(`[Error]: ${JSON.stringify(Error)}`);
      throw new Error(result);
    }
    return Promise.resolve(true);
  } catch (err) {
    console.error(`[Error]: ${JSON.stringify(err.message)}`);
    return Promise.reject(new Error(err));
  }
};

exports.convertToAWSCompatibleFormat = function(data) {
  const awsCompatibleData = [];
  data.forEach(record => awsCompatibleData.push({ PutRequest: { Item: record } }));
  return awsCompatibleData;
};

const createItems = async function(itemList) {
  try {
    const objectsList = [];
    for (let index = 0; index < itemList.length; index++) {
      try {
        const itemListObj = itemList[index];
        const ObjToBeInserted = {
          // some data assignments here
        };

        objectsList.push(ObjToBeInserted);
        if (
          objectsList.length >= AWS_BATCH_SIZE ||
          index === itemList.length - 1
        ) {
            const awsCompatiableFormat = convertToAWSCompatibleFormat(
              objectsList
            );
            await updateItemsInBatch(
              awsCompatiableFormat,
              process.env.myTableName
            );
        }
      } catch (error) {
        console.log(`[Error]: ${JSON.stringify(error)}`);
      }
    }

    return Promise.resolve(true);
  } catch (err) {
    return Promise.reject(new Error(err));
  }
};

const makeBatches = products => {
  const productBatches = [];
  let countr = -1;
  for (let index = 0; index < products.length; index++) {
    if (index % AWS_BATCH_SIZE === 0) {
      countr++;
      productBatches[countr] = [];
      if (countr === MAX_BATCHES) {
        break;
      }
    }
    try {
      productBatches[countr].push(products[index]);
    } catch (error) {
      continue;
    }
  }
  return productBatches;
};

async function runService(workerData) {
  return new Promise((resolve, reject) => {
    const worker = new Worker(path.join(__dirname, './worker.js'), { workerData });
    worker.on('message', resolve);
    worker.on('error', reject);
    worker.on('exit', (code) => {
      if (code !== 0)
        reject(new Error(`Worker stopped with exit code ${code}`));
    })
  })
}

// My worker file
'use strict';

const { workerData, parentPort } = require('worker_threads')
const creatItems = require('myscripts')
// You can do any heavy stuff here, in a synchronous way
// without blocking the "main thread"
console.log("I AM A NEW THREAD")
createItems(workerData)
// console.log('Going to write tons of content on file '+workerData);
parentPort.postMessage({ fileName: workerData, status: 'Done' })

【问题讨论】:

    标签: node.js amazon-web-services amazon-dynamodb


    【解决方案1】:

    来自boto3 documentation

    如果以下一项或多项为真,则 DynamoDB 拒绝整个批量写入操作:

    One or more tables specified in the BatchWriteItem request does not exist.
    Primary key attributes specified on an item in the request do not match those in the corresponding table's primary key schema.
    You try to perform multiple operations on the same item in the same BatchWriteItem request. For example, you cannot put and delete the same item in the same BatchWriteItem request.
    Your request contains at least two items with identical hash and range keys (which essentially is two put operations).
    There are more than 25 requests in the batch.
    Any individual item in a batch exceeds 400 KB.
    The total request size exceeds 16 MB.
    

    在我看来,这其中有些是真的。在我的工作中,我们还遇到了一个问题,即批次中包含 2 个相同的主键和辅助键,因此整个批次都被丢弃了。我知道这不是 node.js,但我们使用 this 来解决这个问题。

    它是batch_writer(overwrite_by_pkeys),用于覆盖批处理中最后一次出现的相同主键和最后一个键。如果您的数据只有一小部分是重复数据并且您不需要保存它,您可以使用它。但如果您需要保存所有数据,我不建议您使用此功能。

    【讨论】:

    • 问题是updateItemsInBatch方法没有返回错误。如果我假设我的批次打破了这些条件之一,那么它将返回一些错误。但这种情况并非如此。如果我故意添加重复的键,那么它会返回错误但现在我没有收到错误:(.
    • 还有一件事需要注意,当我以串行方式发送请求时,如果我的整个批次都被丢弃,那么这个数字应该减少 25,但事实并非如此,只有 2 个被丢弃。
    • 只是为了方便起见,我的排序键是createdAt,它是2020-02-09T08:02:36.71 这意味着它作为重复的机会非常少,因为它在几分之一秒内完成。我还没有遇到任何重复键的问题。
    【解决方案2】:

    我看不到您在哪里查看UnprocessedItems 的回复。批处理操作通常会返回未处理的项目列表。据记载,BatchWriteItem“最多可写入 16 MB 的数据,其中可包含多达 25 个放置或删除请求。”

    【讨论】:

    • 我正在检查 updateItemsInBatch 方法中的响应。
    • 还有 try and catch 以防我遇到一些错误,然后它会在控制台上打印错误。但目前,我没有打印任何错误。
    • 您正在检查是否有错误,但如果没有错误,则会填充 UnprocessedItems。
    • 我实际上也打印了它们,但没有未处理项目的列表。
    【解决方案3】:

    我遇到了重复键问题,这意味着主键和排序键在批处理中具有重复值,但是,在我的情况下,如果我的时间戳以秒为单位2020-02-09T08:02:36.71,AWS BatchWrite 方法不会返回此错误,这有点令人惊讶。我解决了这个问题,使我的 createdAt(sort key) 像这样更细化 => 2020-02-09T08:02:36.7187 从而使其不重复。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2019-03-16
      • 2017-04-17
      • 2022-07-07
      • 2016-12-24
      • 2015-03-18
      • 1970-01-01
      相关资源
      最近更新 更多