【发布时间】:2020-02-14 17:07:14
【问题描述】:
我正在尝试使用节点 SDK 将项目写入 AWS dynamo db。我面临的问题是,当我使用线程以 parallel 将批处理项目写入 AWS 时,某些项目没有写入数据库。写入的项目数是随机的。例如,如果我运行我的代码 3 次,一次是 150,接下来是 200,第三次可能是 135。此外,当我在没有线程的情况下顺序编写项目时,即使这样,有些项目也是没有写。但是,在这种情况下,项目丢失较少。例如,如果项目总数为 300,则写入的项目为 298。我调查了问题以查看是否有任何未处理的项目,但 batchWrite 方法没有返回任何内容。这意味着所有项目都被正确处理。请注意,我对各自的数据库有 OnDemand 规定,因此我预计不会出现任何限制问题。所以这是我的代码。
exports.run = async function() {
**This is the function which runs first !!!!!**
const data = await getArrayOfObjects();
console.log("TOTAL PRICE CHANGES")
console.log(data.length)
const batchesOfData = makeBatches(data)
const threads = new Set();
console.log("**********")
console.log(batchesOfData.length)
console.log("**********")
for(let i = 0; i < batchesOfData.length; i++) {
console.log("BATCH!!!!!")
console.log(i)
console.log(batchesOfData[i].length)
// Sequential Approach
const response = await compensationHelper.createItems(batchesOfData[i])
console.log("RESPONSE")
console.log(response)
Parallel approach
// const workerResult = await runService(batchesOfData[i])
// console.log("WORKER RESUULT!!!!")
// console.log(workerResult);
}
}
exports.updateItemsInBatch = async function(data, tableName) {
console.log("WRITING DATA")
console.log(data.length)
const batchItems = {
RequestItems: {},
};
batchItems.RequestItems[tableName] = data;
try {
const result = await documentClient.batchWrite(batchItems).promise();
console.log("UNPROCESSED ITEMS")
console.log(result)
if (result instanceof Error) {
console.log(`[Error]: ${JSON.stringify(Error)}`);
throw new Error(result);
}
return Promise.resolve(true);
} catch (err) {
console.error(`[Error]: ${JSON.stringify(err.message)}`);
return Promise.reject(new Error(err));
}
};
exports.convertToAWSCompatibleFormat = function(data) {
const awsCompatibleData = [];
data.forEach(record => awsCompatibleData.push({ PutRequest: { Item: record } }));
return awsCompatibleData;
};
const createItems = async function(itemList) {
try {
const objectsList = [];
for (let index = 0; index < itemList.length; index++) {
try {
const itemListObj = itemList[index];
const ObjToBeInserted = {
// some data assignments here
};
objectsList.push(ObjToBeInserted);
if (
objectsList.length >= AWS_BATCH_SIZE ||
index === itemList.length - 1
) {
const awsCompatiableFormat = convertToAWSCompatibleFormat(
objectsList
);
await updateItemsInBatch(
awsCompatiableFormat,
process.env.myTableName
);
}
} catch (error) {
console.log(`[Error]: ${JSON.stringify(error)}`);
}
}
return Promise.resolve(true);
} catch (err) {
return Promise.reject(new Error(err));
}
};
const makeBatches = products => {
const productBatches = [];
let countr = -1;
for (let index = 0; index < products.length; index++) {
if (index % AWS_BATCH_SIZE === 0) {
countr++;
productBatches[countr] = [];
if (countr === MAX_BATCHES) {
break;
}
}
try {
productBatches[countr].push(products[index]);
} catch (error) {
continue;
}
}
return productBatches;
};
async function runService(workerData) {
return new Promise((resolve, reject) => {
const worker = new Worker(path.join(__dirname, './worker.js'), { workerData });
worker.on('message', resolve);
worker.on('error', reject);
worker.on('exit', (code) => {
if (code !== 0)
reject(new Error(`Worker stopped with exit code ${code}`));
})
})
}
// My worker file
'use strict';
const { workerData, parentPort } = require('worker_threads')
const creatItems = require('myscripts')
// You can do any heavy stuff here, in a synchronous way
// without blocking the "main thread"
console.log("I AM A NEW THREAD")
createItems(workerData)
// console.log('Going to write tons of content on file '+workerData);
parentPort.postMessage({ fileName: workerData, status: 'Done' })
【问题讨论】:
标签: node.js amazon-web-services amazon-dynamodb