【发布时间】:2021-10-25 14:06:25
【问题描述】:
我想将 amazon-s3 中的数据读入 kafka。我找到了 camel-aws-s3-kafka-connector 源,我尝试使用它并且它可以工作,但是......我想从 s3 读取数据而不删除文件,但对每个消费者执行一次,没有重复。仅使用配置文件可以做到这一点吗?我已经创建了如下所示的文件:
name=CamelSourceConnector
connector.class=org.apache.camel.kafkaconnector.awss3.CamelAwss3SourceConnector
key.converter=org.apache.kafka.connect.storage.StringConverter
value.converter=org.apache.camel.kafkaconnector.awss3.converters.S3ObjectConverter
camel.source.maxPollDuration=10000
topics=ReadTopic
#prefix=WriteTopic
camel.source.endpoint.prefix=full/path/to/WriteTopic2
camel.source.path.bucketNameOrArn=BucketName
camel.source.endpoint.autocloseBody=false
camel.source.endpoint.deleteAfterRead=false
camel.sink.endpoint.region=xxxx
camel.component.aws-s3.accessKey=xxxx
camel.component.aws-s3.secretKey=xxxx
除了上面的配置,我不能只从“WriteTopic”中读取,而是从 s3 中的所有文件夹中读取,是否也可以配置?
S3Bucket folders with files
【问题讨论】:
-
S3 没有“文件夹”。
WriteTopic/的 S3 前缀将排除WriteTopic2/数据 -
我的例子很糟糕,即使我输入了前缀 WriteTopic2 它也会从 WriteTopic 和 WriteTopic2 中读取。
-
当我使用 camel.source.endpoint.prefix=full/path/to/WriteTopic2 而不是 prefix=WriteTopic2 它工作。 :)
-
如果这是您正在寻找的解决方案,请随时在下面回答您自己的帖子
-
这是我正在寻找的解决方案的一小部分,对我来说更重要的是在顶部找到答案:“如何从 s3 中准确地为每个消费者读取一次数据而不重复且不删除s3 中的数据”。或者换句话说:“如何强制 Camel Source 连接器在不删除的情况下读取一次数据。”
标签: amazon-s3 apache-kafka apache-camel apache-kafka-connect