【问题标题】:Writing data from Kinesis to S3将数据从 Kinesis 写入 S3
【发布时间】:2017-08-07 11:49:43
【问题描述】:

我正在使用 AWS 开发工具包从将数据发布到 Kinesis 流的 Java 应用程序中写入数据。使用以下代码一次分批完成 10 条记录;

// Convert to JSON object, and then to bytes...
                ObjectWriter ow = new ObjectMapper().writer().withDefaultPrettyPrinter();
                String json = ow.writeValueAsString(transaction);

                // Add byte array to PutRecordsRequestEntry
                PutRecordsRequestEntry record = new PutRecordsRequestEntry();
                record.setPartitionKey(String.valueOf(java.util.UUID.randomUUID()));
                record.setData(ByteBuffer.wrap(json.getBytes()));

                // Add to list...
                batch.add(record);

                // Check and send batches
                if(counter>=batchLimit){

                    logger.info("Sending batch of " + batchLimit + " rows.");

                    putRecordsRequest.setRecords(batch);
                    PutRecordsResult result = amazonKinesisClient.putRecords(putRecordsRequest);
                    batch = new ArrayList<>();
                    counter=0;

                }else{
                    counter++;
                }

然后我有一个 nodejs lambda 函数,它会在 Kinesis 上收到的每个事务上触发,其想法是让它写入来自 Kinesis 的事务,并将它们放入一个数据流中,以便将它们保存到 S3 .

    var AWS = require('aws-sdk');
var firehose = new AWS.Firehose();

exports.handler = function(event, context) {

    console.log(event);

    var params = {
        DeliveryStreamName: "transaction-postings",
        Record: { 
            Data:  decodeURIComponent(event)
        }
    };
    firehose.putRecord(params, function(err, data) {
        if (err) console.log(err, err.stack); // an error occurred
        else    {  
            console.log(data);           // successful response
        }

        context.done();
    });
};

但是,在查看 S3 上的数据时,我看到的只是以下内容,而不是我所期望的 JSON 对象列表...

[object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object][object Object]

谁能指出我在将数据从 Kinesis 流式传输到 s3(作为 JSON 对象)时缺少什么?

【问题讨论】:

    标签: java aws-lambda amazon-kinesis amazon-kinesis-firehose


    【解决方案1】:
    Data:  decodeURIComponent(event)
    

    您需要序列化事件,因为 Lambda 会自动反序列化参数。即:

    Data: JSON.stringify(decodeURIComponent(event))
    

    【讨论】:

    • 刚刚做了更改,仍然得到“[object Object]”“[object Object]”“[object Object]”“[object Object]”“[object Object]”“[object Object ]""[对象对象]""[对象对象]""[对象对象]""[对象对象]""[对象对象]""[对象对象]""[对象对象]""[对象对象]" "[作为结果的对象。
    • 实际上,当我将该行代码更改为“数据:事件”时,我在 S3 中得到了例如:{“Records”:[{“kinesis”:{“kinesisSchemaVersion”:“1.0”, “partitionKey”: “2a4bb9d9-a023-4c03-8616-ef3e7c567459”, “的sequenceNumber”: “49571132156681255058105982946244422009241197082071531522”, “数据”:“ewogICJyb3dJZCIgOiA3MjEzMTU0NSwKICAi ......为什么我没有拿到实际的JSON对象发送上述
    • 我想是因为设置数据的时候record.setData(ByteBuffer.wrap(json.getBytes()))...需要转回utf8。
    • 这种格式是您通常从带有 kinesis 的 lambda 收到的格式。获取每个数据元素,对其进行解码(它在 base64 中),您将得到原始数据
    • 在您引导我找到问题时接受您的答案。
    【解决方案2】:

    对于那些想知道需要更改代码的人...要将生产者发送的实际消息写入 S3,需要对 PutRecordsRequestEntry 的数据属性进行解码。换句话说,这些代码块显示了使用的依赖关系,用于解析 Kinesis 流中数据的 lambda...

    var AWS = require('aws-sdk');
    var firehose = new AWS.Firehose();
    var firehoseStreamName = "transaction-postings";
    
    exports.handler = function(event, context) {
    
        // This is the actual transaction, encapsulated with Kinesis Put properties
        var transaction = event;
    
        // Convert data object because this is all that we need
        var buf = new Buffer(transaction.data, "base64");
    
        // Convert to actual string which is readable
        var jsonString = buf.toString("utf8");
    
        // Prepare storage to postings firehose stream...
        var params = { 
            DeliveryStreamName: firehoseStreamName, 
            Record: { 
                Data:  jsonString
            }
        };
    
        // Store data!
        firehose.putRecord(params, function(err, data) {
            if (err) {
    
                // This needs to be fired to Kinesis in the future...
                console.log(err, err.stack); 
            }
            else{  
                console.log(data);            
            }
    
            context.done();
        });
    };
    

    这是因为使用下面的 AWS 生产者依赖项发送的记录

    <dependency>
            <groupId>com.amazonaws</groupId>
            <artifactId>amazon-kinesis-producer</artifactId>
            <version>0.12.3</version>
        </dependency>
    

    看起来像这样;

    {
      "kinesisSchemaVersion": "1.0",
      "partitionKey": "cb3ff3cd-769e-4d48-969d-918b5378e81b",
      "sequenceNumber": "49571132156681255058105982949134963643939775644952428546",
      "data": "[base64 string]",
      "approximateArrivalTimestamp": 1490191017.614
    }
    

    【讨论】:

      猜你喜欢
      • 2016-03-31
      • 1970-01-01
      • 1970-01-01
      • 2018-01-07
      • 2019-09-06
      • 2021-05-11
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多