【问题标题】:How to consume Amazon S3 objects from Flink?如何从 Flink 使用 Amazon S3 对象?
【发布时间】:2021-06-04 12:39:45
【问题描述】:

问题

我们有一个 Apache Flink 应用程序,旨在从 Kafka 读取事件并将计算结果发送到 ElasticSearch。由于一些资源问题,我们不得不从 Kafka 回退到 Amazon S3。

消息以ndjson 格式小批量发布到 Amazon S3 存储桶。

文件按以下方式组织:/{year}/{month}/{day}/{hour}.
因此,我们每小时都会创建一个新文件夹,在该文件夹下存储最新事件。

设计

正如我们所见,每当创建新对象时,Amazon S3 都会发出 notifications
我们可以将这些通知推送到 SQS 或 Lambda。

  • 正如this topic 中所述,Flink 不支持 SQS。
  • 对于 Lambda,我们可以获取 S3 对象并将其推送到 Kinesis Data Stream

我们还找到了避免编写自定义 Lambda 函数的替代解决方案:

问题

但在所有情况下,我们最终都使用了 KDS。在创建对象时,是否可以将数据从 Amazon S3 推送到 Flink?

【问题讨论】:

  • Flink 源代码只是一个必须实现的接口。您可以开发自己的来源,知道如何从 Amazon SQS 获取数据。
  • @YuvalItzchakov 你能指出一个示例实现吗?
  • @YuvalItzchakov 有什么选项不需要实现我们自己的源函数吗?
  • 我不相信今天有活跃的来源,所以很遗憾。

标签: amazon-web-services amazon-s3 architecture apache-flink amazon-kinesis


【解决方案1】:

一种解决方案是使用 readFile 方法扫描 s3 存储桶以查找新对象。当配置FileProcessingMode.PROCESS_CONTINUOUSLY 和适当的轮询间隔时,这可以很好地工作。关键是用自定义的FilePathFilter 定义TextInputFormat。文件路径过滤器将递归通过您的 s3 存储桶中的“目录”,并且由于您已使用日期部分对它们进行结构化,因此递归可以找到新文件,而无需扫描存储桶中的大量对象。

这是自定义 FilePathFilter 的外观。我一直在使用类似的代码每隔几分钟就发现数百个新文件,并且它可以正常工作。

public class S3FilePathFilter extends FilePathFilter {

    Pattern datePartsFromPath = Pattern.compile("\\/(?<year>\\d{4})\\/?(?<month>\\d{2})?\\/?(?<day>\\d{2})?\\/?(?<hour>\\d{2})?");

    private final Duration ageLimit;

    public S3FilePathFilter(Duration ageLimit) {
        this.ageLimit=ageLimit;
    }
    
    @Override
    public boolean filterPath(Path filePath) {

        Matcher matcher = datePartsFromPath.matcher(filePath.toString());

        if (matcher.find()) {
            ZonedDateTime limit = ZonedDateTime.now(ZoneId.of("UTC")).minus(ageLimit);

            int year = NumberUtils.toInt(matcher.group("year"));
            int month = NumberUtils.toInt(matcher.group("month"), limit.getMonthValue());
            int day = NumberUtils.toInt(matcher.group("day"), limit.getDayOfMonth());
            int hour = NumberUtils.toInt(matcher.group("hour"), limit.getHour());

            if (year != limit.getYear()) {
                return year < limit.getYear();
            }

            if (month != limit.getMonthValue()) {
                return month < limit.getMonthValue();
            }

            if (day != limit.getDayOfMonth()) {
                return day < limit.getDayOfMonth();
            }

            return hour < limit.getHour();
        }

        return true;
    }
}

【讨论】:

  • 为什么我们需要month1month2month3 组?
  • 我清理了正则表达式,现在每个日期部分只有一个捕获组
  • 谢谢,现在对我来说更有意义了:D
猜你喜欢
  • 2018-03-17
  • 1970-01-01
  • 2014-02-17
  • 1970-01-01
  • 1970-01-01
  • 2018-10-14
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多