【问题标题】:KStreams - How to handle delay of messages on one topicKStreams - 如何处理一个主题的消息延迟
【发布时间】:2020-04-24 02:41:18
【问题描述】:

我有一个基于 Spring Boot 的 KStreams 应用程序,我在其中连接多个主题的数据。当某个主题出现延迟时,处理某种情况的最佳做法是什么?我已经阅读了How to manage Kafka KStream to Kstream windowed join? 等链接。

这是我的示例代码(Spring Boot 应用程序),用于为 2 个主题(员工和财务)生成模拟数据。员工主题代码如下:

private void sendEmpData() {
    IntStream.range(0, 1).forEach(index -> {
        EmployeeKey key = new EmployeeKey();
        key.setEmployeeId(1);

        Employee employee = new Employee();
        employee.setDepartmentId(1000);
        employee.setEmployeeFirstName("John);
        employee.setEmployeeId(1);
        employee.setEmployeeLastName("Doe");

        kafkaTemplateForEmp.send(EMP_TOPIC, key, employee);
    });
}

金融主题也是如此:

private void sendFinanceData() {
    IntStream.range(0, 1).forEach(index -> {
        FinanceKey key = new FinanceKey();
        key.setEmployeeId(1);
        key.setDepartmentId(1000);

        Finance finance = new Finance();
        finance.setDepartmentId(1000);
        finance.setEmployeeId(1);
        finance.setSalary(2000);

        kafkaTemplateForFinance.send(FINANCE_TOPIC, key, finance);
    });
}

与这些记录关联的时间戳类型是 TimeStampType.CREATE_TIME,我假设它与 事件时间 在 Streams 中。

我有一个简单的 KStreams 应用程序,它重新设置财务主题的密钥以使财务流密钥与员工流密钥匹配,然后按如下方式进行连接:

employeeKStream.join(reKeyedStream,
            (employee, finance) -> new EmployeeFinance(employee.getEmployeeId(),
                    employee.getEmployeeFirstName(),
                    employee.getEmployeeLastName(),
                    employee.getDepartmentId(),
                    finance.getSalary(),
                    finance.getSalaryGrade()),
            JoinWindows.of(windowRetentionTimeMs), //30 seconds
            Joined.with(
                    employeeKeySerde,
                    employeeSerde,
                    financeSerde)).to(outputTopic, Produced.with(employeeKeySerde, employeeFinanceSerde));

如果具有匹配键的记录在金融主题中晚于 30 秒到达,则不会发生连接。关于如何解决这个问题的任何见解都会有所帮助。提前谢谢你。

P.S.:此数据为虚构作品。如果它与您的部门 ID/薪水相匹配,那只是巧合。 :)

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    如果具有匹配键的记录在金融主题中晚于 30 秒到达,则不会发生联接。

    默认情况下,Kafka Streams 使用 24 小时的宽限期,因此,即使存在滞后或无序数据,您的连接也应该有效。注意 Kafka 中的 lag 总是指 read 路径!

    30 多秒后到达金融主题

    但是,我认为您并不是真的意味着您有 lag (在您回退阅读的意义上),但是您的上游 write 被延迟了——对于这种情况,事件时间可能只是分配不正确:

    注意,当写入 Kafka 主题并且您没有明确指定时间戳时,生产者将在调用 send() 时分配时间戳,而不是在创建 ProducerRecord 实例时。如果要在创建ProducerRecord 时分配时间戳,则需要手动将要分配的时间戳传入构造函数(不确定 Spring boot 是否允许这样做)。

    作为替代方案(如果您无法明确设置记录时间戳),您可以将时间戳嵌入值中(这当然需要您更改 EmployeeFinance 类。使用 Kafka 处理此数据时流,您可以在自定义 TimestampExtractor 中使用来获取您的自定义时间戳,而不是记录时间戳。

    【讨论】:

    • 感谢您的回复。这真的很有帮助。
    猜你喜欢
    • 1970-01-01
    • 2020-03-28
    • 2015-01-12
    • 1970-01-01
    • 1970-01-01
    • 2017-02-06
    • 2012-01-24
    • 1970-01-01
    • 2018-08-17
    相关资源
    最近更新 更多