【问题标题】:AWS Kinesis KCL skips records added before startupAWS Kinesis KCL 跳过启动前添加的记录
【发布时间】:2020-08-04 11:26:20
【问题描述】:

我开始使用KPLKCL 在服务之间交换数据。但是每当consumer service 离线时,KPL 发送的所有数据都将永远丢失。所以我只得到那些在consumer service 启动并且它的shardConsumer 准备好时发送的数据块。我需要从上次消费点或以其他方式处理留下的数据开始。

这是我的ShardProcessor 代码:

@Override
    public void initialize(InitializationInput initializationInput) {

    }

    @Override
    public void processRecords(ProcessRecordsInput processRecordsInput) {
        processRecordsInput.records()
                .forEach(record -> {
                    //my logic
                });
    }

    @Override
    public void leaseLost(LeaseLostInput leaseLostInput) {

    }

    @Override
    public void shardEnded(ShardEndedInput shardEndedInput) {
        try {
            shardEndedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shard Ended", e);
        }
    }

    @Override
    public void shutdownRequested(ShutdownRequestedInput shutdownRequestedInput) {
        try {
            shutdownRequestedInput.checkpointer().checkpoint();
        } catch (ShutdownException | InvalidStateException e) {
            LOG.error("Kinesis error on Shutdown Requested", e);

        }
    }

及配置代码:

public void configure(String streamName, ShardRecordProcessorFactory factory) {

        Region region = Region.of(awsRegion);

        KinesisAsyncClient kinesisAsyncClient =
                KinesisClientUtil.createKinesisAsyncClient(KinesisAsyncClient.builder().region(region));

        DynamoDbAsyncClient dynamoClient = DynamoDbAsyncClient.builder().region(region).build();
        CloudWatchAsyncClient cloudWatchClient = CloudWatchAsyncClient.builder().region(region).build();
        ConfigsBuilder configsBuilder =
                new ConfigsBuilder(streamName, appName, kinesisAsyncClient, dynamoClient, cloudWatchClient,
                        UUID.randomUUID().toString(), factory);

        Scheduler scheduler = new Scheduler(
                configsBuilder.checkpointConfig(),
                configsBuilder.coordinatorConfig(),
                configsBuilder.leaseManagementConfig(),
                configsBuilder.lifecycleConfig(),
                configsBuilder.metricsConfig(),
                configsBuilder.processorConfig(),
                configsBuilder.retrievalConfig()
                        .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
        );

        Thread schedulerThread = new Thread(scheduler);
        schedulerThread.setDaemon(true);
        schedulerThread.start();
    }

【问题讨论】:

    标签: amazon-web-services amazon-kinesis amazon-kcl amazon-kinesis-kpl


    【解决方案1】:

    有两种方法可以解决这个问题。首先,问题。

    默认情况下,KCL 配置为从LATEST 开始读取流。此设置告诉流读取器在“当前”时间戳处获取流。

    在您的情况下,您在“现在”之前放置在该流中的数据。为了读取该数据,您可能需要考虑读取流中最早的数据。如果您设置了默认流,则该流将存储数据 24 小时。

    要从该流的“开始”或启动 KCL 应用程序前 24 小时读取数据,您需要将流读取器设置为 TRIM_HORIZON。此设置称为initialPositionInStream。你可以阅读它hereAPI 中记录了三种不同的设置。

    如第一个链接中所述,要解决您的问题,首选方法是向属性文件添加一个条目。如果您不使用属性文件,您可以简单地将其添加到您的 Scheduler ctor:

    Scheduler scheduler = new Scheduler(
        configsBuilder.checkpointConfig(),
        configsBuilder.coordinatorConfig(),
        configsBuilder.leaseManagementConfig(),
        configsBuilder.lifecycleConfig(),
        configsBuilder.metricsConfig(),
        configsBuilder.processorConfig(),
        configsBuilder.retrievalConfig()
            .initialPositionInStreamExtended(InitialPositionInStreamExtended.newInitialPosition(TRIM_HORIZON))
            .retrievalSpecificConfig(new PollingConfig(streamName, kinesisAsyncClient))
    );
    

    使用此设置要记住的一件事是,当您在流中有数据并且您从 TRIM_HORIZON 开始时启动功能。在这种情况下,RecordProcessor 将尽可能快地遍历记录。这可能会在 Kinesis API 甚至下游系统(无论您在 RecordProcessor 拥有数据后将数据发送到何处)产生性能问题,

    【讨论】:

    • 听起来很合理,但对配置进行更改并不能解决问题。我已经停止了消费者服务,由生产者推送了几个事件,启动了消费者,然后......什么也没收到。
    • 不幸的是,这里有几件事可能会出错。您是否确认消费者使用TRIM_HORIZON 启动?
    • 抱歉,还有一个想法:您实际上是在为您的流设置检查点吗?一旦processRecords 完成处理,它需要检查点流。我不确定您上面的代码是否包含您的完整实现。如果是这样,您需要确保您正在检查点。请参阅AWS sample code 中的示例。您需要使用TRIM_HORIZON,您需要使用checkpoint()
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-04-15
    • 2017-10-09
    • 2017-10-16
    • 2021-02-13
    • 2017-10-16
    • 1970-01-01
    • 2017-08-03
    相关资源
    最近更新 更多