【问题标题】:Kafka stream consumer for JSON object : How to mapJSON对象的Kafka流消费者:如何映射
【发布时间】:2019-10-03 18:53:45
【问题描述】:

我是 Kafka/Kafka Stream 的新手。我正在使用 latest Kafka/kafka-stream 和 kafka-client 以及 openjdk11。我的制作人正在生成看起来像

的 json 对象(其中 keyname
{"Name":"John", "amount":123, "time":2019-10-03T05:24:52" }

生产者代码以便更好地理解:

public static ProducerRecord<String, String> newRandomTransaction(String name) {
    // creates an empty json {}
    ObjectNode transaction = JsonNodeFactory.instance.objectNode();

    Integer amount = ThreadLocalRandom.current().nextInt(0, 100);

    // Instant.now() is to get the current time
    Instant now = Instant.now();

    // we write the data to the json document
    transaction.put("name", name);
    transaction.put("amount", amount);
    transaction.put("time", now.toString());
    return new ProducerRecord<>("bank-transactions", name, transaction.toString());
}

现在我正在尝试编写使用交易并计算该人余额中的总金额的应用程序。

仅供参考:我正在使用旧代码并试图使其工作)。

使用 GroupBYKey 作为主题已经有正确的键。然后聚合以计算总余额我正在努力的地方。

此时的应用程序(注释掉的部分是我试图使其在下一行中工作的旧代码):

public class BankBalanceExactlyOnceApp {
    private static ObjectMapper mapper = new ObjectMapper();

    public static void main(String[] args) {
        Properties config = new Properties();

        config.put(StreamsConfig.APPLICATION_ID_CONFIG, "bank-balance-application");
        config.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092");
        config.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest");

        // we disable the cache to demonstrate all the "steps" involved in the transformation - not recommended in prod
        config.put(StreamsConfig.CACHE_MAX_BYTES_BUFFERING_CONFIG, "0");

        // Exactly once processing!!
        config.put(StreamsConfig.PROCESSING_GUARANTEE_CONFIG, StreamsConfig.EXACTLY_ONCE);

        // json Serde
        final Serializer<JsonNode> jsonSerializer = new JsonSerializer();
        final Deserializer<JsonNode> jsonDeserializer = new JsonDeserializer();
        final Serde<JsonNode> jsonSerde = Serdes.serdeFrom(jsonSerializer, jsonDeserializer);


        StreamsBuilder builder = new StreamsBuilder();

        KStream<String, JsonNode> bankTransactions =
                builder.stream( "bank-transactions", Materialized.with(Serdes.String(), jsonSerde);


        // create the initial json object for balances
        ObjectNode initialBalance = JsonNodeFactory.instance.objectNode();
        initialBalance.put("count", 0);
        initialBalance.put("balance", 0);
        initialBalance.put("time", Instant.ofEpochMilli(0L).toString());

        /*KTable<String, JsonNode> bankBalance = bankTransactions
                .groupByKey(Serdes.String(), jsonSerde)
                .aggregate(
                        () -> initialBalance,
                        (key, transaction, balance) -> newBalance(transaction, balance),
                        jsonSerde,
                        "bank-balance-agg"
                );*/

        KTable<String, JsonNode> bankBalance = bankTransactions
                .groupByKey(Serialized.with(Serdes.String(), jsonSerde))
                .aggregate(
                        () -> initialBalance,
                        (key, transaction, balance) -> {
                            //String t = transaction.toString();
                            newBalance(transaction, balance);
                        },
                        Materialized.with(Serdes.String(), jsonSerde),
                        "bank-balance-agg"
                );

        bankBalance.toStream().to("bank-balance-exactly-once", Produced.with(Serdes.String(), jsonSerde));

        KafkaStreams streams = new KafkaStreams(builder.build(), config);
        streams.cleanUp();
        streams.start();

        // print the topology
        System.out.println(streams.toString());

        // shutdown hook to correctly close the streams application
        Runtime.getRuntime().addShutdownHook(new Thread(streams::close));
    }

    private static JsonNode newBalance(JsonNode transaction, JsonNode balance) {
        // create a new balance json object
        ObjectNode newBalance = JsonNodeFactory.instance.objectNode();
        newBalance.put("count", balance.get("count").asInt() + 1);
        newBalance.put("balance", balance.get("balance").asInt() + transaction.get("amount").asInt());

        Long balanceEpoch = Instant.parse(balance.get("time").asText()).toEpochMilli();
        Long transactionEpoch = Instant.parse(transaction.get("time").asText()).toEpochMilli();
        Instant newBalanceInstant = Instant.ofEpochMilli(Math.max(balanceEpoch, transactionEpoch));
        newBalance.put("time", newBalanceInstant.toString());
        return newBalance;
    }
}

问题是:当我尝试在该行中调用 newBalance(transaction, balance) 时:

aggregate(
                    () -> initialBalance,
                    (key, transaction, balance) -> newBalance(transaction, balance),
                    jsonSerde,
                    "bank-balance-agg"
            )

并看到编译器 error 带有 msg:

newBalance(JsonNode, JsonNode) can not be applied to (<lambda parameter>,<lambda parameter>)

我尝试将其读取为字符串,将参数类型从 JsonNode 更改为 Object。但是,无法修复它。

我可以就如何修复它获得任何建议吗?

【问题讨论】:

    标签: java lambda apache-kafka apache-kafka-streams


    【解决方案1】:

    Kafka Streams 2.3 中的KGroupedStream 没有具有以下签名的方法:

    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                 final Aggregator<? super K, ? super V, VR> aggregator,
                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized,
                                 String aggregateName);
    

    有两个重载方法aggregate

    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                 final Aggregator<? super K, ? super V, VR> aggregator);
    
    <VR> KTable<K, VR> aggregate(final Initializer<VR> initializer,
                                 final Aggregator<? super K, ? super V, VR> aggregator,
                                 final Materialized<K, VR, KeyValueStore<Bytes, byte[]>> materialized);
    

    您应该使用第二个,您的代码应该类似于:

    KTable<String, JsonNode> bankBalance = input
            .groupByKey(Grouped.with(Serdes.String(), jsonSerde))
            .aggregate(
                    () -> initialBalance,
                    (key, transaction, balance) -> newBalance(transaction, balance),
                    Materialized.with(Serdes.String(), jsonSerde)
            );
    

    【讨论】:

      猜你喜欢
      • 2017-01-04
      • 1970-01-01
      • 2017-09-07
      • 2021-08-07
      • 1970-01-01
      • 2017-10-16
      • 2017-05-31
      • 2019-11-18
      • 1970-01-01
      相关资源
      最近更新 更多