【问题标题】:How can I get latest records in Amazon Kinesis consumer when working with Javascript SDK?使用 Javascript SDK 时,如何在 Amazon Kinesis 消费者中获取最新记录?
【发布时间】:2019-10-04 08:08:17
【问题描述】:

我正在使用 AWS Javascript SDK 从 Kinesis Data Stream 消费。我想获取分片中的最新记录。

当我将 ShardIterator 类型指定为“最新”时,我没有得到任何记录。但是,当我使用“TRIM_HORIZON”时,我会取回所有记录。

kinesis.describeStream(describeParams, function(err, data) {
            if (err) {
                console.log(err, err.stack);    // an error occurred
            }
            else {
                var getParams = {
                    ShardId: data.StreamDescription.Shards[0].ShardId,
                    ShardIteratorType: "TRIM_HORIZON",     //get oldest package
                    StreamName: streamName,
                };

                if(shardIteratorType){
                    console.log("you have passed some shardIteratorType:" + shardIteratorType);
                    getParams.ShardIteratorType = shardIteratorType;
                }

                kinesis.getShardIterator(getParams, function(err, result) {
                    if (err) {
                        console.log("Error in getShardIterator()");
                        console.log(err);
                    } else {
                        console.log("calling getRecord with shard iterator");
                        // Get records from the Kinesis stream
                        getRecord(result.ShardIterator);
                    }
                });
            }
        });

function getRecord(shard_iterator) {
    console.log("getRecord was called.");
    var getRecParams = {
        ShardIterator: shard_iterator
    };

    kinesis.getRecords(getRecParams, function(err, result) {
        if (err) {
            console.log("Error in getRecords() from the Kinesis stream.");
            console.log(err);
        } else {
            try {

            if(result.Records.length > 0) {
                    // Loop through all the packages
               for(var i = 0; i < result.Records.length; i++) {
                   if(result.Records[i] != undefined) {
                        var getData = JSON.parse( decodeURIComponent
(escape(result.Records[i].Data)));
                        console.log(getData);
                        var table = document.getElementById("myTable");
                        var row = table.insertRow(0);
                        var j = i + 1 ;
                        var cell1 = row.insertCell(0);


                        cell1.innerHTML = getData ;

                    }
                }
            }
            } catch(err) {
                console.log("Error parsing the package.");
                console.log(err);
            }
        }
    });
}

在使用“LATEST”而不是“TRIM_HORIZON”作为分片迭代器类型时,我希望只获得最新记录而不是所有历史记录。我做错了什么?

【问题讨论】:

    标签: javascript amazon-kinesis


    【解决方案1】:

    来自文档https://docs.aws.amazon.com/kinesis/latest/APIReference/API_GetShardIterator.html

    请求中可以指定分片迭代器类型AT_TIMESTAMP从任意时间点读取记录,TRIM_HORIZON使ShardIterator指向系统分片中最后一条未修剪的记录(分片中最旧的数据记录) ) 或 LATEST,以便您始终读取分片中的最新数据。

    这里 LATEST 意味着“现在”跳过最新检查点和现在之间的所有记录。

    使用 LATEST 作为分片迭代器类型,你可以认为它读取的是使用 getShardIterator 函数返回的分片迭代器之后的记录。

    您可以参考下面的示例,它如何与“最新”一起使用:

    kinesis.describeStream(describeParams, (err, streamData) => {
        // Skipping error handling
    
        kinesis.getShardIterator(getShardIteratorParams, (err, shardIterData) => {
            let shardIterator = shardIterData.ShardIterator;
    
            // Keep reading records from the stream
            while (true) {
                let getRecParams = {
                    ShardIterator: shardIterator
                };
                kinesis.getRecords(getRecParams, (err, recData) => {
                    // Skipping error handling
    
                    if (recData.Records.length > 0) {
                       // Do something
    
                       shardIterator = recData.NextShardIterator;
                    }
                });
                // Break if you need
            }
        });
    });
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-28
      • 2022-01-20
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多