【问题标题】:Write parquet from AWS Kinesis firehose to AWS S3将镶木地板从 AWS Kinesis firehose 写入 AWS S3
【发布时间】:2018-01-07 21:22:16
【问题描述】:

我想将 kinesis firehose 中的数据提取到 s3 中,格式为镶木地板。到目前为止,我刚刚找到了一个暗示创建 EMR 的解决方案,但我正在寻找更便宜、更快的方法,比如直接从 firehose 将接收到的 json 存储为镶木地板或使用 Lambda 函数。

非常感谢, 哈维。

【问题讨论】:

    标签: json amazon-web-services amazon-s3 parquet amazon-kinesis-firehose


    【解决方案1】:

    Amazon Kinesis Firehose 接收流式记录并可以将它们存储在 Amazon S3(或 Amazon Redshift 或 Amazon Elasticsearch Service)中。

    每条记录最大可达 1000KB。

    但是,记录会一起附加到文本文件中,并根据时间或大小进行批处理。传统上,记录是 JSON 格式。

    您将无法发送镶木地板文件,因为它不符合此文件格式。

    可以触发 Lambda 数据转换函数,但这也不能输出 parquet 文件。

    事实上,鉴于 parquet 文件的性质,您不太可能一次构建一条记录。作为一种列式存储格式,我怀疑它们确实需要批量创建,而不是为每条记录附加数据。

    底线:不。

    【讨论】:

    • 嗨@Javi,如果这个或任何答案解决了您的问题,请点击复选标记考虑accepting it。这向更广泛的社区表明您已经找到了解决方案,并为回答者和您自己提供了一些声誉。没有义务这样做。
    • @JohnRotenstein 您能否让 lambda 对 Firehose 中的每个缓冲时间/大小批次进行转换,然后每隔几个小时左右将 Parquet 文件连接到更大的大小?这使您可以通过 Firehose 将 JSON 流式传输到 Parquet,以在 Athena 中获取近乎实时的数据,同时仍然获得 Parquet 的性能优势。
    • @cmclen,Parquet 是一种柱状文件格式。我认为你不能一次只追加一行——这会破坏使用 Parquet 的目的。
    • @JohnRotenstein 您不能(直到 12 天前:cf Vlad 的回答)依赖 Firehose 将转换后的数据为您转储到 S3,但您可以使用 S3FS 或就像布拉卡纳指出的那样。如果您希望 Firehose 显示为已成功,则只需为 Firehose 返回格式正确的行(通常只需添加一个 processes_at 时间戳并按原样返回输入行)。如果您不依赖pandas,也可以直接在lambda中执行此操作,因为pandas的库太大而无法将其打包到Lambda中(最大50MB)。
    【解决方案2】:

    在处理了 AWS 支持服务和一百种不同的实施之后,我想解释一下我所取得的成就。

    最后,我创建了一个 Lambda 函数,它处理 Kinesis Firehose 生成的每个文件,根据负载对我的事件进行分类,并将结果存储在 S3 中的 Parquet 文件中。

    做到这一点并不容易:

    1. 首先,您应该创建一个 Python 虚拟环境,包括所有必需的库(在我的例子中是 Pandas、NumPy、Fastparquet 等)。 由于生成的文件(包括所有库和我的 Lambda 函数很重,需要启动一个 EC2 实例,我使用了免费套餐中包含的那个)。要创建虚拟环境,请按照以下步骤操作:

      • 在 EC2 中登录
      • 创建一个名为 lambda(或任何其他名称)的文件夹
      • 须藤 yum -y 更新
      • 须藤 yum -y 升级
      • sudo yum -y groupinstall "开发工具"
      • sudo yum -y install blas
      • sudo yum -y install lapack
      • sudo yum -y install atlas-sse3-devel
      • sudo yum install python27-devel python27-pip gcc
      • 虚拟环境
      • 源环境/bin/激活
      • pip install boto3
      • pip install fastparquet
      • 点安装熊猫
      • pip install thriftpy
      • 点安装 s3fs
      • pip install(任何其他需要的库)
      • 找到 ~/lambda/env/lib*/python2.7/site-packages/ -name "*.so" | xargs 条
      • 推送 env/lib/python2.7/site-packages/
      • zip -r -9 -q ~/lambda.zip *
      • 流行音乐
      • 推送 env/lib64/python2.7/site-packages/
      • zip -r -9 -q ~/lambda.zip *
      • 流行音乐
    2. 正确创建 lambda_function:

      import json
      import boto3
      import datetime as dt
      import urllib
      import zlib
      import s3fs
      from fastparquet import write
      import pandas as pd
      import numpy as np
      import time
      
      def _send_to_s3_parquet(df):
          s3_fs = s3fs.S3FileSystem()
          s3_fs_open = s3_fs.open
          # FIXME add something else to the key or it will overwrite the file 
          key = 'mybeautifullfile.parquet.gzip'
          # Include partitions! key1 and key2
          write( 'ExampleS3Bucket'+ '/key1=value/key2=othervalue/' + key, df,
                  compression='GZIP',open_with=s3_fs_open)            
      
      def lambda_handler(event, context):
          # Get the object from the event and show its content type
          bucket = event['Records'][0]['s3']['bucket']['name']
          key = urllib.unquote_plus(event['Records'][0]['s3']['object']['key'])
          try:
              s3 = boto3.client('s3')
              response = s3.get_object(Bucket=bucket, Key=key)
              data = response['Body'].read()
              decoded = data.decode('utf-8')
              lines = decoded.split('\n')
              # Do anything you like with the dataframe (Here what I do is to classify them 
              # and write to different folders in S3 according to the values of
              # the columns that I want
              df = pd.DataFrame(lines)
              _send_to_s3_parquet(df)
          except Exception as e:
              print('Error getting object {} from bucket {}.'.format(key, bucket))
              raise e
      
    3. 将 lambda 函数复制到 lambda.zip 并部署 lambda_function:

      • 返回您的 EC2 实例并将所需的 lambda 函数添​​加到 zip:zip -9 lambda.zip lambda_function.py(lambda_function.py 是第 2 步中生成的文件)
      • 将生成的 zip 文件复制到 S3,因为不通过 S3 进行部署就非常繁重。 aws s3 cp lambda.zip s3://support-bucket/lambda_packages/
      • 部署 lambda 函数:aws lambda update-function-code --function-name --s3-bucket support-bucket --s3-key lambda_packages/lambda.zip
    4. 在你喜欢的时候触发要执行的,例如,每次在 S3 中创建一个新文件时,甚至你可以将 lambda 函数关联到 Firehose。 (我没有选择这个选项,因为 'lambda' 限制低于 Firehose 限制,您可以将 Firehose 配置为每 128Mb 或 15 分钟写入一个文件,但如果您将此 lambda 函数关联到 Firehose,则会执行 lambda 函数每 3 分钟或 5MB,在我的情况下,我遇到了生成很多小拼花文件的问题,因为每次启动 lambda 函数时,我都会生成至少 10 个文件)。

    【讨论】:

    • 我是否正确理解此管道每条记录创建一个镶木地板文件? Parquet 是一种列式存储,那么是否需要某种单独的压缩作业来将这些小 Parquet 文件协调为一个更大的文件?
    • 嗨@Tagar,每次调用lamba_handler时它都会写入一个parquet文件并且可以配置,例如,您可以将其配置为每15分钟启动一次,每个文件都会创建一个文件15 分钟内收到的所有事件。
    【解决方案3】:

    好消息,这个功能今天发布了!

    Amazon Kinesis Data Firehose 可以转换您输入数据的格式 从 JSON 到 Apache Parquet 或 Apache ORC,然后将数据存储在 亚马逊 S3。 Parquet 和 ORC 是节省空间的柱状数据格式 并启用更快的查询

    要启用,请转到您的 Firehose 流并单击 编辑。您应该会看到 记录格式转换 部分,如下图所示:

    详见文档:https://docs.aws.amazon.com/firehose/latest/dev/record-format-conversion.html

    【讨论】:

      猜你喜欢
      • 2020-03-23
      • 2018-09-09
      • 2019-12-07
      • 1970-01-01
      • 1970-01-01
      • 2021-05-26
      • 2020-04-22
      • 2021-07-07
      • 2020-08-07
      相关资源
      最近更新 更多