【发布时间】:2019-10-04 08:08:17
【问题描述】:
我正在使用 AWS Javascript SDK 从 Kinesis Data Stream 消费。我想获取分片中的最新记录。
当我将 ShardIterator 类型指定为“最新”时,我没有得到任何记录。但是,当我使用“TRIM_HORIZON”时,我会取回所有记录。
kinesis.describeStream(describeParams, function(err, data) {
if (err) {
console.log(err, err.stack); // an error occurred
}
else {
var getParams = {
ShardId: data.StreamDescription.Shards[0].ShardId,
ShardIteratorType: "TRIM_HORIZON", //get oldest package
StreamName: streamName,
};
if(shardIteratorType){
console.log("you have passed some shardIteratorType:" + shardIteratorType);
getParams.ShardIteratorType = shardIteratorType;
}
kinesis.getShardIterator(getParams, function(err, result) {
if (err) {
console.log("Error in getShardIterator()");
console.log(err);
} else {
console.log("calling getRecord with shard iterator");
// Get records from the Kinesis stream
getRecord(result.ShardIterator);
}
});
}
});
function getRecord(shard_iterator) {
console.log("getRecord was called.");
var getRecParams = {
ShardIterator: shard_iterator
};
kinesis.getRecords(getRecParams, function(err, result) {
if (err) {
console.log("Error in getRecords() from the Kinesis stream.");
console.log(err);
} else {
try {
if(result.Records.length > 0) {
// Loop through all the packages
for(var i = 0; i < result.Records.length; i++) {
if(result.Records[i] != undefined) {
var getData = JSON.parse( decodeURIComponent
(escape(result.Records[i].Data)));
console.log(getData);
var table = document.getElementById("myTable");
var row = table.insertRow(0);
var j = i + 1 ;
var cell1 = row.insertCell(0);
cell1.innerHTML = getData ;
}
}
}
} catch(err) {
console.log("Error parsing the package.");
console.log(err);
}
}
});
}
在使用“LATEST”而不是“TRIM_HORIZON”作为分片迭代器类型时,我希望只获得最新记录而不是所有历史记录。我做错了什么?
【问题讨论】: