【问题标题】:How to process dynamically in kafka streams and send to different topic如何在kafka流中动态处理并发送到不同的主题
【发布时间】:2020-01-15 07:12:05
【问题描述】:

我正在创建一个流处理应用程序。它应该创建一个 kafka 流连接。当消息到达时,我需要做以下事情:

  • 检查消息类型
  • 通过调用特定的TypeProcessing Service 处理消息
  • 结束于根据消息类型决定的特定主题

    public java.util.function.Consumer<KStream<String, String>> process() {
        String topic;
        return input ->
                input.map((key, value) -> {
                    //check type and ask object from factory
                    try {
                        JSONObject msg = Util.getObjectMapper().readValue(value, JSONObject.class);
                        String type = msg.get("type").toString();
                        if(type.equalsIgnoreCase("test")){
                            //processing started
                            msgTypeHandlerFactory
                                    .createInstance(type)
                                    .enrichAndRelay(msg);
                            System.out.println("IN");
                        }
                        else{
                            input.to("notStream");
                            System.out.println("OUT");
                        }
    
                    } catch (JsonProcessingException e) {
                        e.printStackTrace();
                    } catch (Exception e) {
                        e.printStackTrace();
                    }
                    return KeyValue.pair(key, value);
    
                })
                .to("output_topic");
    
    }
    

    上述代码的问题是我正在使用 map 函数,它使我能够使用 .to() 函数来发送流。 我想要的是检查每条消息的类型,然后进行处理,并相应地发送到另一个流。 为此,我应该使用不给我 .to() 函数的 foreach 函数,所以我必须创建另一个 Kafka Producer 来完成这项工作。

要求:

  1. 在处理下一个 msg 之前,每个 msg 都应该在流函数的帮助下处理和发送,而不是使用另一个 kafka 生产者
  2. 如果满足要求,那么我应该能够将消息发送到将根据类型动态决定的主题。

【问题讨论】:

    标签: apache-kafka apache-kafka-streams spring-kafka


    【解决方案1】:
    1. 在处理下一个 msg 之前,每个 msg 都应该在流函数的帮助下处理和发送,而不是使用另一个 kafka 生产者

    默认情况下无论如何都会发生这种情况。

    1. 如果满足要求,那么我应该能够将消息发送到将根据类型动态决定的主题。

    首先,为了简化根据事件类型处理事件的步骤,请查看branch()branch() 函数允许您提供固定数量的谓词来将消息路由到不同的子流中。然后,您可以独立处理这些子流,例如使用map() 函数。最后,您可以使用to() 将每个子流发送到单独的主题。

    KStream<String, Event>[] branches = events.branch(
        (id, event) -> event.getTransactionValue() >= FRAUD_LIMIT,
        (id, event) -> event.getTransactionValue() < FRAUD_LIMIT);
    branches[0].map(...).to(suspiciousTransactionsTopicName);
    branches[1].map(...).to(validatedTransactionsTopicName);
    

    您还可以根据事件有效负载中的任何内容在to() 中做出真正动态的路由决策。此处,输出主题的名称来源于事件数据。

    myStream.to(
      (eventId, event, record) -> "topic-prefix-" + event.methodOfYourEventLikeGetTypeName()
    );
    

    此外,如果动态路由决策需要事件中不直接可用的信息,您可以选择使用路由相关信息动态丰富原始事件(例如,通过将原始事件流与带有路由相关信息),然后通过to()进行动态路由。详情请见https://www.confluent.io/blog/putting-events-in-their-place-with-dynamic-routing/

    【讨论】:

      【解决方案2】:

      如果你想检查类型,你实际上是filtering 那些匹配这些类型的事件。

      因此,您不需要 map 或 foreach,filter(...).to(topic} 会更好

          final ObjectMapper mapper = Util.getObjectMapper();
          KStream notTestEvents = input.filter((key, value) -> {
              //check type and ask object from factory
              try {
                  JSONObject msg = mapper.readValue(value, JSONObject.class); // You should probably use JSONDeserializer instead, which does this for you
                  String type = msg.get("type").toString();
                  System.out.println("OUT");
                  return !type.equalsIgnoreCase("test");     
              } catch (JsonProcessingException e) {
                  e.printStackTrace();
              } catch (Exception e) {
                  e.printStackTrace();
              }
          );
          notTestEvents.to("notStream");
      

      另一个选项是分支

      KStream<String, String>[] branches = events.branch(
          (k, v) -> { 
             return !mapper
                .readValue(value, JSONObject.class)
                .get("type").toString();
                .equalsIgnoreCase("test")
          },
          (k, v) -> true
      );
      branches[0].map(...).to("notStream");
      branches[1].map(...).to("output_topic");
      

      【讨论】:

      • 过滤器解决了选择类型问题。但是 map 函数是一次处理一条记录并将其发送到输出流,还是将所有记录一一处理,然后一次将所有记录发送到输出流?我将使用它作为输入流->按类型过滤->.map 函数->.to() 函数
      • map和filter函数都是一一对应的。您需要 groupBy 或其他聚合来将数据收集到更大的消息中
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2019-10-04
      • 1970-01-01
      • 2019-12-07
      • 1970-01-01
      • 2022-06-16
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多