【问题标题】:Can I customize partitioning in Kinesis Firehose before delivering to S3?我可以在交付到 S3 之前在 Kinesis Firehose 中自定义分区吗?
【发布时间】:2018-12-21 04:11:27
【问题描述】:

我有一个 Firehose 流,旨在从不同来源和不同事件类型中提取数百万个事件。流应将所有数据传送到一个 S3 存储桶,作为原始\未更改数据的存储。

我正在考虑根据嵌入在事件消息中的元数据(如事件源、事件类型和事件日期)在 S3 中对这些数据进行分区。

但是,Firehose 遵循基于记录到达时间的默认分区。是否可以自定义此分区行为以满足我的需求?

更新:已接受的答案已更新为新答案表明该功能自 2021 年 9 月起可用

【问题讨论】:

  • @JohnRotenstein 不幸的是,答案并没有解决这个问题。两者都建议附加一个 lambda 函数,该函数将基于特定 ID 将传入数据路由到不同的流。这个问题和另一个问题是解决是否可以定义消防软管的分区方法。不过还是谢谢你的参考!!

标签: amazon-s3 amazon-kinesis-firehose


【解决方案1】:

自 2021 年 9 月 1 日起,AWS Kinesis Firehose 支持此功能。阅读the announcement blog post here

来自文档:

您可以使用 Key 和 Value 字段来指定要用作动态分区键和 jq 查询的数据记录参数以生成动态分区键值。 ...

从 UI 看是这样的:

【讨论】:

  • 这太棒了!
【解决方案2】:

没有。您不能根据事件内容进行“分区”。

一些选项是:

  • 发送到单独的 Firehose 流
  • 发送到 Kinesis Data Stream(而不是 Firehose)并编写您自己的自定义 Lambda 函数来处理和保存数据(请参阅:AWS Developer Forums: Athena and Kinesis Firehose
  • 使用 Kinesis Analytics 处理消息并将其“定向”到不同的 Firehose 流

如果您打算将输出与 Amazon Athena 或 Amazon EMR 一起使用,您还可以考虑将其转换为 Parquet 格式,其中包含 much better performance。这需要将 S3 中的数据作为批处理进行后处理,而不是在数据到达流时对其进行转换。

【讨论】:

    【解决方案3】:

    根据 John 的回答,如果您没有近乎实时的流式传输要求,我们发现使用 Athena 进行批处理对我们来说是一个简单的解决方案。

    Kinesis 流到给定表 unpartitioned_event_data,它可以使用 native record arrival time partitioning

    我们定义了另一个 Athena 表 partitioned_event_table,可以使用自定义分区键进行定义,并利用 Athena 拥有的 INSERT INTO 功能。 Athena 将自动以您想要的格式重新分区您的数据,而无需任何自定义使用者或新的基础架构来管理。这可以使用 cron、SNS 或 Airflow 之类的东西来安排。

    很酷的是,您可以创建一个视图,对两个表执行UNION,以便在一个地方查询历史和实时数据。

    我们实际上在 Radar 和 talk about more trade-offs in this blog post 处理过这个问题。

    【讨论】:

      【解决方案4】:

      在撰写本文时,Vlad 提到的动态分区功能仍然很新。我需要它成为 CloudFormation 模板的一部分,该模板仍未正确记录。我必须添加DynamicPartitioningConfiguration 才能使其正常工作。 MetadataExtractionQuery 语法也没有正确记录。

        MyKinesisFirehoseStream:
          Type: AWS::KinesisFirehose::DeliveryStream
          ...
          Properties:
            ExtendedS3DestinationConfiguration:
              Prefix: "clients/client_id=!{client_id}/dt=!{timestamp:yyyy-MM-dd}/"
              ErrorOutputPrefix: "errors/!{firehose:error-output-type}/"
              DynamicPartitioningConfiguration:
                Enabled: "true"
                RetryOptions:
                  DurationInSeconds: "300"
              ProcessingConfiguration:
                Enabled: "true"
                Processors:
                  - Type: AppendDelimiterToRecord
                  - Type: MetadataExtraction
                    Parameters:
                      - ParameterName: MetadataExtractionQuery
                        ParameterValue: "{client_id:.client_id}"
                      - ParameterName: JsonParsingEngine
                        ParameterValue: JQ-1.6
      
      

      【讨论】:

        【解决方案5】:

        为了扩展 Murali 的答案,我们在 CDK 中实现了它:

        我们传入的 json 数据如下所示:

        {
            "data": 
                {
                "timestamp":1633521266990,
                "defaultTopic":"Topic",
                "data":
                {
                    "OUT1":"Inactive",
                    "Current_mA":3.92
                }
            }
        }
        

        CDK 代码如下:

        const DeliveryStream = new CfnDeliveryStream(this, 'deliverystream', {
          deliveryStreamName: 'deliverystream',
          extendedS3DestinationConfiguration: {
            cloudWatchLoggingOptions: {
              enabled: true,
            },
            bucketArn: Bucket.bucketArn,
            roleArn: deliveryStreamRole.roleArn,
            prefix: 'defaultTopic=!{partitionKeyFromQuery:defaultTopic}/!{timestamp:yyyy/MM/dd}/',
            errorOutputPrefix: 'error/!{firehose:error-output-type}/',
            bufferingHints: {
              intervalInSeconds: 60,
            },
            dynamicPartitioningConfiguration: {
              enabled: true,
            },
            processingConfiguration: {
              enabled: true,
              processors: [
                {
                  type: 'MetadataExtraction',
                  parameters: [
                    {
                      parameterName: 'MetadataExtractionQuery',
                      parameterValue: '{Topic: .data.defaultTopic}',
                    },
                    {
                      parameterName: 'JsonParsingEngine',
                      parameterValue: 'JQ-1.6',
                    },
                  ],
                },
                {
                  type: 'AppendDelimiterToRecord',
                  parameters: [
                    {
                      parameterName: 'Delimiter',
                      parameterValue: '\\n',
                    },
                  ],
                },
              ],
            },
          },
        })
        

        【讨论】:

        • 你知道如何将 2 个字段用作 2 个单独的值吗?
        猜你喜欢
        • 2019-05-06
        • 2018-06-02
        • 1970-01-01
        • 2018-01-03
        • 2019-05-09
        • 2016-02-03
        • 2019-09-05
        • 2017-06-27
        • 2018-01-07
        相关资源
        最近更新 更多