【问题标题】:how to achieve kafka-streams foreign key join如何实现kafka-streams外键加入
【发布时间】:2022-08-24 19:05:07
【问题描述】:

我最近开始工作卡夫卡流并加入。 我只是想知道是否有任何简单的方法可以在 kafka-streams 中实现多个外键的外键连接

员工话题

{
\"ID\" : 1,
\"Name\" : \"Jay\",
\"DepartmentIds\": [2,3,4]
}

部门主题

{
\"DepartmentId\": 2,
\"Name\": \"Computers\"
}
{
\"DepartmentId\": 3,
\"Name\": \"Electronics\"
}
{
\"DepartmentId\": 4,
\"Name\": \"Mechanical\"
}

应该转化为

联合输出

{
\"ID\" : 1,
\"Name\" : \"Jay\",
\"DepartmentIds\": [2,3,4]
\"Departments\": [{
      \"Department ID\" : 2,
      \"Name\" : \"Electronics\"
   },
{
      \"Department ID\" : 3,
      \"Name\" : \"Computers\"
   },
{
      \"Department ID\" : 4,
      \"Name\" : \"Mechanical\"
   }]
}

编辑1:

尝试解决方案1:

 KTable<String, EmployeeDepartments> employeeWithDepartments = employees.flatMapValues(value -> value.getDepartmentIds())
      .map((employeeId, departmentId) -> new KeyValue<>(departmentId, employeeId))
      .join(departments, (employeeId, department) -> {
        return new DepartmentWrapper(employeeId, department);
      }, Joined.with(Serdes.String(), Serdes.String(), departmentSerde))
      .groupBy((departmentId, departmentWrapper) -> departmentWrapper.getEmployeeId(), Grouped.with(Serdes.String(), departmentWrapperSerde))
      .aggregate(EmployeeDepartments::new, (employeeId, departmentWrapper, employeeDepartments) -> {
        employeeDepartments.setEmployeeId(employeeId);
        employeeDepartments.addDepartment(employeeWrapper.getDepartment());
        return employeeDepartments;
      }, Materialized.<String, employeeDepartments, KeyValueStore<Bytes, byte[]>>
        as(\"EMPLOYEE_DEPARTMENTS\")
        .withKeySerde(Serdes.String())
        .withValueSerde(employeeDepartmentSerde));

但是上面的代码聚合了事件中的所有部门。 如果任何员工的部门发生变化,我仍然会在汇总的国有商店中看到旧部门。 我只想随时查看员工的最新部门。

尝试解决方案2:

下面一个完美的作品!但我觉得下面一个不是一个有效的解决方案,因为它不使用任何 kafka 流连接。

KStream<String, Employee> enrichedEmployeeObject = employees.transformValues(() -> new ValueTransformer<Employee, Employee>() {
      private ProcessorContext context;

      @Override
      public void init(ProcessorContext processorContext) {
        this.context = processorContext;
      }

      @Override
      public Employee transform(Employee inEmployee) {
        Set<Department> employeeDepartments = inEmployee.getDepartmentIds().stream().map(departmentId -> {
          ReadOnlyKeyValueStore<String, Department> departmentIdDepartmentMap = getTopologyStream()
            .store(\"DEPARTMENTS_TABLE\",
              QueryableStoreTypes.<String, Department>keyValueStore());
          return departmentIdDepartmentMap.get(departmentId);
        }).collect(Collectors.toSet());
        inEmployee.setDepartments(employeeDepartments);
        return inEmployee;
      }

      @Override
      public void close() {
      }
    });
  • 我猜你想加入流表?如果是这样,您可以在部门主题上定义一个 GlobalKTable,然后使用有状态转换器在处理员工流时从 KTable 的状态存储中查找所有部门。
  • @user152468 感谢您的回复。我尝试了以上两种解决方案。我仍然对这种方法并不完全满意。你能看看上面吗?
  • 您的第二个解决方案对我来说看起来不错。我不认为它是低效的。对于每个员工的每个部门,您正在按部门 ID 在 KTable 中进行查找。由于 KTable 在该 id 上有一个索引,所以这很快。此外,在使用连接时,您必须进行某种查找。

标签: apache-kafka apache-kafka-streams confluent-platform confluent-cloud


【解决方案1】:

你能提供更大的工作代码的 sn-p 吗?

我正在做一些非常相似的事情,你上面描述的方法看起来正是我需要的。

如果您可以共享创建上述连接所需的所有代码,请这样做。谢谢!

【讨论】:

    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2019-12-15
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2019-03-17
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多