【问题标题】:Flink StreamingFileSink to Amazon S3 with flink-s3-fs-hadoop token expired exception带有 flink-s3-fs-hadoop 令牌过期异常的 Flink StreamingFileSink 到 Amazon S3
【发布时间】:2022-09-28 20:01:12
【问题描述】:

我正在尝试将一些数据从 kafka 流式传输到 s3(使用 s3a 协议)。

该管道可以运行一个小时,但一个小时后(与我的 AWS 令牌到期设置相同),抛出一个(来自 StreamingFileSink):

Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The provided token has expired. (Service: Amazon S3; Status Code: 400; Error Code: ExpiredToken; Request ID: 7YFGVQ92YT51DP0K; S3 Extended Request ID: sx6UJJ548o0wpwJbkoWJ16jKRVih3ZV9XQdbThNhq5kUU7A7yCx58tcCGELVs5tqGWaMMPfZxZM=; Proxy: webproxy)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
    at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
...

我正在使用 AWSCredentialsProvider 实现,它实现了getCredentials,并使用来自 aws 的新解析的密钥每 15 分钟刷新一次令牌。

我的假设是问题在于我如何在工作本身中初始化StreamingFileSink

 StreamExecutionEnvironment env = getStreamExecutionEnvironment();

 StreamingFileSink<FELEvent> sink = StreamingFileSink
                .forBulkFormat(<my Path Settings with basePath s3a://bucket/path/to/dir>)
                .withRollingPolicy(OnCheckpointRollingPolicy.build())
                .withNewBucketAssigner(<My custom bucket assigner>)
                .build();


env.fromSource(<Kafka source>)
                .map(<Some operation>)
                .filter(<Some filtergin>)
                .addSink(sink)
                .name(\"name\").uid(\"uid\");

env.execute(\"TAG\");


如果插件为已经初始化的 StreamingFileSink 刷新令牌有什么想法吗? 如果不是,那么处理这种情况的最佳方法是什么?

(由于与 zookeeper 的兼容性问题,我使用的是 flink 14.3。)

编辑:

我检查了 hadoop-fs 插件代码,它似乎在 FileSink 的初始化中只用提供的(读取)令牌初始化了一个 S3 对象。寻找以某种方式重新初始化它的方法。

    标签: java amazon-s3 apache-kafka apache-flink


    【解决方案1】:

    环境

    fs.s3a.aws.credentials.provider:com.amazonaws.auth.profile.ProfileCredentialsProvider
    

    在作业管理器属性中,将环境变量 AWS_PROFILES 与有效的 AWS 配置文件(例如 /.aws/config)一起修复了该问题。

    确保您正在刷新您的令牌。

    更多信息:https://hadoop.apache.org/docs/stable/hadoop-aws/tools/hadoop-aws/index.html

    【讨论】:

      猜你喜欢
      • 2019-12-16
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-05-06
      • 2020-09-20
      • 1970-01-01
      • 2021-11-05
      • 1970-01-01
      相关资源
      最近更新 更多