【问题标题】:Get Records from Kinesis using sequence number and partition ID使用序列号和分区 ID 从 Kinesis 获取记录
【发布时间】:2016-07-06 05:38:24
【问题描述】:

我编写了一个代码来从 kinesis 流中获取记录到一个 lambda 函数,该函数给出数据、分区 ID 和序列号的输出负载,然后我尝试调用第二个 lambda 以从第一个获取序列号和分区 ID Lambda,然后第二个 lambda 从 kinesis 流中提取数据。我一直坚持使用序列号和分区 ID 从 kinesis 流中获取数据。

下面是调用一个 lambda 到另一个的代码。

public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Object> {
        private static final String regionName = "us-east-1";
        private static final String functionName = "Test1";


    @Override
    public Object handleRequest(KinesisEvent input, Context context) {
        context.getLogger().log("Input: " + input);
        List<KinesisEventRecord> records = input.getRecords();
        for (KinesisEventRecord rec : records){
            ByteBuffer recdata = rec.getKinesis().getData();
             String data = new String( recdata.array(), Charset.forName("UTF-8") );
            context.getLogger().log("Data: " +data);
            context.getLogger().log("Partition key: " +rec.getKinesis().getPartitionKey());
            context.getLogger().log("Sequence Number: " +rec.getKinesis().getSequenceNumber());
        }

        //call another lambda function
        try {
            AWSLambdaClient lambda = new AWSLambdaClient();
            Region region = Region.getRegion(Regions.fromName(regionName));
            lambda.setRegion(region);
            InvokeRequest invokeRequest = new InvokeRequest();
            invokeRequest.setFunctionName(functionName);
            invokeRequest.setPayload("\" AWS Lambda Test - internal call\"");
            System.out.println(
                    lambda.invoke(invokeRequest).getPayload());
        } catch (Exception e) {
            System.out.println(e.getMessage());

        }

        // TODO: implement your handler
        return null;
    }

}

这是我尝试使用序列号和分区 ID 获取记录的代码。

public class LambdaFunctionHandler implements RequestHandler<KinesisEvent, Object> {
private static final String streamName = "Test";
private static final String partitionKey = "123456676454";
private static final String sequenceNumber = "12345"

public Object handleRequest(KinesisEvent input, Context context) {
    context.getLogger().log("Input: " + input);
    GetRecordsRequest getRecordsRequest = new GetRecordsRequest();
    getRecordsRequest.setStreamName(streamName);
    getRecordsRequest.setPartitionKey(partitionKey);
    getRecordsRequest.setsequenceNumber(sequenceNumber);
    KinesisEvent.getRecord(getRecord);

}
        }

请告诉我一种使用序列号和分区 ID 从 kinesis 流中获取记录的方法。

【问题讨论】:

    标签: java amazon-web-services lambda amazon-kinesis


    【解决方案1】:

    在 Lambda 函数内,您无法决定要读取的流或位置。 在运行 Lambda 函数之前,您需要在您的 kinesis 流和此 lambda 函数之间创建映射。 这将在每次新记录到达时触发 lambda。

    基本上,Lambda 函数无法决定自己的输入。

    【讨论】:

      猜你喜欢
      • 2020-07-26
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多