【发布时间】:2022-09-28 20:01:12
【问题描述】:
我正在尝试将一些数据从 kafka 流式传输到 s3(使用 s3a 协议)。
该管道可以运行一个小时,但一个小时后(与我的 AWS 令牌到期设置相同),抛出一个(来自 StreamingFileSink):
Caused by: com.amazonaws.services.s3.model.AmazonS3Exception: The provided token has expired. (Service: Amazon S3; Status Code: 400; Error Code: ExpiredToken; Request ID: 7YFGVQ92YT51DP0K; S3 Extended Request ID: sx6UJJ548o0wpwJbkoWJ16jKRVih3ZV9XQdbThNhq5kUU7A7yCx58tcCGELVs5tqGWaMMPfZxZM=; Proxy: webproxy)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleErrorResponse(AmazonHttpClient.java:1819)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.handleServiceErrorResponse(AmazonHttpClient.java:1403)
at com.amazonaws.http.AmazonHttpClient$RequestExecutor.executeOneRequest(AmazonHttpClient.java:1372)
...
我正在使用 AWSCredentialsProvider 实现,它实现了getCredentials,并使用来自 aws 的新解析的密钥每 15 分钟刷新一次令牌。
我的假设是问题在于我如何在工作本身中初始化StreamingFileSink:
StreamExecutionEnvironment env = getStreamExecutionEnvironment();
StreamingFileSink<FELEvent> sink = StreamingFileSink
.forBulkFormat(<my Path Settings with basePath s3a://bucket/path/to/dir>)
.withRollingPolicy(OnCheckpointRollingPolicy.build())
.withNewBucketAssigner(<My custom bucket assigner>)
.build();
env.fromSource(<Kafka source>)
.map(<Some operation>)
.filter(<Some filtergin>)
.addSink(sink)
.name(\"name\").uid(\"uid\");
env.execute(\"TAG\");
如果插件为已经初始化的 StreamingFileSink 刷新令牌有什么想法吗? 如果不是,那么处理这种情况的最佳方法是什么?
(由于与 zookeeper 的兼容性问题,我使用的是 flink 14.3。)
编辑:
我检查了 hadoop-fs 插件代码,它似乎在 FileSink 的初始化中只用提供的(读取)令牌初始化了一个 S3 对象。寻找以某种方式重新初始化它的方法。
标签: java amazon-s3 apache-kafka apache-flink