【发布时间】:2020-04-24 02:41:18
【问题描述】:
我有一个基于 Spring Boot 的 KStreams 应用程序,我在其中连接多个主题的数据。当某个主题出现延迟时,处理某种情况的最佳做法是什么?我已经阅读了How to manage Kafka KStream to Kstream windowed join? 等链接。
这是我的示例代码(Spring Boot 应用程序),用于为 2 个主题(员工和财务)生成模拟数据。员工主题代码如下:
private void sendEmpData() {
IntStream.range(0, 1).forEach(index -> {
EmployeeKey key = new EmployeeKey();
key.setEmployeeId(1);
Employee employee = new Employee();
employee.setDepartmentId(1000);
employee.setEmployeeFirstName("John);
employee.setEmployeeId(1);
employee.setEmployeeLastName("Doe");
kafkaTemplateForEmp.send(EMP_TOPIC, key, employee);
});
}
金融主题也是如此:
private void sendFinanceData() {
IntStream.range(0, 1).forEach(index -> {
FinanceKey key = new FinanceKey();
key.setEmployeeId(1);
key.setDepartmentId(1000);
Finance finance = new Finance();
finance.setDepartmentId(1000);
finance.setEmployeeId(1);
finance.setSalary(2000);
kafkaTemplateForFinance.send(FINANCE_TOPIC, key, finance);
});
}
与这些记录关联的时间戳类型是 TimeStampType.CREATE_TIME,我假设它与 事件时间 在 Streams 中。
我有一个简单的 KStreams 应用程序,它重新设置财务主题的密钥以使财务流密钥与员工流密钥匹配,然后按如下方式进行连接:
employeeKStream.join(reKeyedStream,
(employee, finance) -> new EmployeeFinance(employee.getEmployeeId(),
employee.getEmployeeFirstName(),
employee.getEmployeeLastName(),
employee.getDepartmentId(),
finance.getSalary(),
finance.getSalaryGrade()),
JoinWindows.of(windowRetentionTimeMs), //30 seconds
Joined.with(
employeeKeySerde,
employeeSerde,
financeSerde)).to(outputTopic, Produced.with(employeeKeySerde, employeeFinanceSerde));
如果具有匹配键的记录在金融主题中晚于 30 秒到达,则不会发生连接。关于如何解决这个问题的任何见解都会有所帮助。提前谢谢你。
P.S.:此数据为虚构作品。如果它与您的部门 ID/薪水相匹配,那只是巧合。 :)
【问题讨论】:
标签: apache-kafka apache-kafka-streams