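[Posted]: 2022-08-24 19:05:07
[Question]:
I recently started working with Kafka Streams and joins. I am wondering whether there is a simple way to implement a foreign-key join in kafka-streams when a record carries multiple foreign keys.
Employee topic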
{
  "ID": 1,
  "Name": "Jay",
  "DepartmentIds": [2, 3, 4]
}
Department topic
{
  "DepartmentId": 2,
  "Name": "Computers"
}
{
  "DepartmentId": 3,
  "Name": "Electronics"
}
{
  "DepartmentId": 4,
  "Name": "Mechanical"
}
should be transformed into
Joined output
{
  "ID": 1,
  "Name": "Jay",
  "DepartmentIds": [2, 3, 4],
  "Departments": [{
    "Department ID": 2,
    "Name": "Computers"
  },
  {
    "Department ID": 3,
    "Name": "Electronics"
  },
  {
    "Department ID": 4,
    "Name": "Mechanical"
  }]
}
Edit 1:
Attempted solution 1:
KTable<String, EmployeeDepartments> employeeWithDepartments = employees
    // Emit one record per department ID of each employee.
    .flatMapValues(value -> value.getDepartmentIds())
    // Re-key by department ID so the stream can be joined with the departments table.
    .map((employeeId, departmentId) -> new KeyValue<>(departmentId, employeeId))
    .join(departments,
        (employeeId, department) -> new DepartmentWrapper(employeeId, department),
        Joined.with(Serdes.String(), Serdes.String(), departmentSerde))
    // Re-key by employee ID and collect the joined departments per employee.
    .groupBy((departmentId, departmentWrapper) -> departmentWrapper.getEmployeeId(),
        Grouped.with(Serdes.String(), departmentWrapperSerde))
    .aggregate(EmployeeDepartments::new,
        (employeeId, departmentWrapper, employeeDepartments) -> {
            employeeDepartments.setEmployeeId(employeeId);
            employeeDepartments.addDepartment(departmentWrapper.getDepartment());
            return employeeDepartments;
        },
        Materialized.<String, EmployeeDepartments, KeyValueStore<Bytes, byte[]>>as("EMPLOYEE_DEPARTMENTS")
            .withKeySerde(Serdes.String())
            .withValueSerde(employeeDepartmentSerde));
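However, the code above aggregates every department that has ever appeared in the events. If an employee's departments change, I still see the old departments in the aggregated state store. I only want to see the employee's latest departments at any point in time.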
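One way to see why the old departments linger: the aggregator only ever adds to the per-employee value, and nothing removes an entry when a later employee record drops a department ID. The following is a plain-Java model of that behaviour, not Kafka Streams code; `StaleAggregateDemo`, `appendOnly`, and `replaceLatest` are hypothetical names used only for illustration. It contrasts the add-only aggregation with an update that rebuilds the department set from the latest employee record.

```java
import java.util.LinkedHashSet;
import java.util.List;
import java.util.Set;

public class StaleAggregateDemo {

    // Models an add-only aggregator: every department ID seen so far is kept,
    // and nothing is ever removed.
    public static Set<Integer> appendOnly(List<List<Integer>> employeeUpdates) {
        Set<Integer> aggregate = new LinkedHashSet<>();
        for (List<Integer> departmentIds : employeeUpdates) {
            aggregate.addAll(departmentIds);
        }
        return aggregate;
    }

    // Models the desired behaviour: each employee update replaces the previous set.
    public static Set<Integer> replaceLatest(List<List<Integer>> employeeUpdates) {
        Set<Integer> latest = new LinkedHashSet<>();
        for (List<Integer> departmentIds : employeeUpdates) {
            latest = new LinkedHashSet<>(departmentIds);
        }
        return latest;
    }

    public static void main(String[] args) {
        // The employee first belongs to departments 2, 3, 4 and is later moved to 3, 5.
        List<List<Integer>> updates = List.of(List.of(2, 3, 4), List.of(3, 5));
        System.out.println(appendOnly(updates));    // [2, 3, 4, 5]
        System.out.println(replaceLatest(updates)); // [3, 5]
    }
}
```

In the add-only model, departments 2 and 4 survive the second update, which matches the stale entries described above.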
Attempted solution 2:
The one below works perfectly! But I feel it is not an efficient solution, because it does not use any Kafka Streams join.
KStream<String, Employee> enrichedEmployeeObject = employees.transformValues(() -> new ValueTransformer<Employee, Employee>() {
    private ProcessorContext context;

    @Override
    public void init(ProcessorContext processorContext) {
        this.context = processorContext;
    }

    @Override
    public Employee transform(Employee inEmployee) {
        // Fetch the queryable department store once per record instead of once per department ID.
        ReadOnlyKeyValueStore<String, Department> departmentIdDepartmentMap = getTopologyStream()
            .store("DEPARTMENTS_TABLE", QueryableStoreTypes.<String, Department>keyValueStore());
        // Look up each of the employee's department IDs in the store.
        Set<Department> employeeDepartments = inEmployee.getDepartmentIds().stream()
            .map(departmentIdDepartmentMap::get)
            .collect(Collectors.toSet());
        inEmployee.setDepartments(employeeDepartments);
        return inEmployee;
    }

    @Override
    public void close() {
    }
});
-
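I guess you want a stream-table join? If so, you could define a GlobalKTable on the department topic, and then use a stateful transformer to look up all the departments from the KTable's state store while processing the employee stream.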
-
@user152468 Thanks for the reply. I tried both of the solutions above. I am still not entirely happy with either approach. Could you take a look at them?
-
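Your second solution looks fine to me. I don't think it is inefficient. For each department of each employee, you do a lookup by department ID in the KTable. Since the KTable is indexed on that ID, this is fast. Besides, a join would have to do some kind of lookup as well.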
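To make the lookup cost concrete: the enrichment amounts to one keyed read per department ID. The sketch below models that with a `HashMap` standing in for the department store. It is plain Java rather than Kafka Streams code; `DepartmentLookupDemo` and `enrich` are hypothetical names, and skipping IDs that have no matching department is an assumption of this sketch, not something the code in the question does.

```java
import java.util.ArrayList;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

public class DepartmentLookupDemo {

    // One keyed read per department ID; IDs with no entry are skipped (assumption).
    public static List<String> enrich(List<Integer> departmentIds, Map<Integer, String> departmentStore) {
        List<String> names = new ArrayList<>();
        for (Integer id : departmentIds) {
            String name = departmentStore.get(id);
            if (name != null) {
                names.add(name);
            }
        }
        return names;
    }

    public static void main(String[] args) {
        Map<Integer, String> store = new HashMap<>();
        store.put(2, "Computers");
        store.put(3, "Electronics");
        store.put(4, "Mechanical");
        System.out.println(enrich(List.of(2, 3, 4), store)); // [Computers, Electronics, Mechanical]
        System.out.println(enrich(List.of(3, 9), store));    // [Electronics]
    }
}
```

Note that a lookup driven by the employee stream only re-enriches an employee when a new employee record arrives; a later change to a department record does not by itself update employees that were already emitted.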
Tags: apache-kafka apache-kafka-streams confluent-platform confluent-cloud