【问题标题】:Building a cache with Kafka Streams使用 Kafka Streams 构建缓存
【发布时间】:2021-10-18 13:41:02
【问题描述】:

我试图了解与Kafka Streams 合作时的可能性以及如何思考。

用例:

有一个话题叫Transactions

  • key -> transactionReference(字符串)
  • 值 -> 时间戳,批准/取消(JSON 字符串)

我想创建一个缓存来保存所有最近的交易(最近 10 分钟)。

rest 客户端可以通过提供事务引用来查询缓存。

问题:

  1. Kafka 流(连同它的物化 视图)是否适合实现这样的缓存?
  2. 如果是,你会怎么做?请记住,它只需要保留最后 10 分钟的交易并丢弃较旧的交易。
  3. 如果没有,为什么不呢?

【问题讨论】:

    标签: apache-kafka apache-kafka-streams


    【解决方案1】:

    是的,在kafka-streams 中开发它是一个非常好的主意。怎么做?

    1. 首先,创建代表缓存值的类:
    class Transaction {
     Instant createTime;
     Status status;
     String transactionReference;
    }
    
    1. 第二,创建处理缓存逻辑的类——实现org.apache.kafka.streams.kstream.Transformer<K, V, R>
    public class TransactionsCache implements Transformer<String, Transaction, KeyValue<String, Transaction>> {
    
        private final long maintainDurationMs = TimeUnit.MINUTES.toMillis(10);
    
        private KeyValueStore<String, Transaction> transactions;
    
        @Override
        public void init(ProcessorContext context) {
            this.transactions = context.getStateStore("transactions-store");
            context.schedule(Duration.ofMillis(5), PunctuationType.WALL_CLOCK_TIME,
                timestamp -> transactions.all()
                    .forEachRemaining(kV -> {
                        if (hasExpired(kV.value.getCreateTime().toEpochMilli(), timestamp)) {
                            transactions.delete(kV.key);
                        }
                    }));
        }
    
        private boolean hasExpired(final long eventTimestamp, final long currentStreamTimeMs) {
            return (currentStreamTimeMs - eventTimestamp) > maintainDurationMs;
        }
    
        @Override
        public KeyValue<String, Transaction> transform(String key, Transaction transaction) {
            Transaction t = this.transactions.get(transaction.getTransactionReference());
            if (t == null) {
                transactions.put(transaction.getTransactionReference(), transaction);
            }
            return null;
        }
    
        @Override
        public void close() {
    
        }
    }
    
    1. 然后,在拓扑中注册变压器:
        static StreamsBuilder buildKafkaStreamsTopology() {
            StreamsBuilder builder = new StreamsBuilder();
            
            StoreBuilder<KeyValueStore<String, Transaction>> transferProcessKeyValueStore = Stores
                .keyValueStoreBuilder(Stores.inMemoryKeyValueStore(("transactions-store"), Serdes.String(), JsonSerdes.forA(Transaction.class));
            builder.addStateStore(transferProcessKeyValueStore);
    
            builder.stream(TRANSACTIONS, Consumed.with(Serdes.String(), JsonSerdes.forA(Transaction.class)))
                .transform(TransactionsCache::new, "transactions-store");
    
            return builder;
        }
    
    1. 下一步是读取 http 控制器中的数据:
    @RestController
    public class TransactionsController {
    
        private final KafkaStreams kafkaStreams;
    
        public TransactionsController(KafkaStreams kafkaStreams) {
            this.kafkaStreams = kafkaStreams;
        }
    
        @GetMapping(value = "/transactions/{transactionReference}", produces = MediaType.APPLICATION_JSON_VALUE)
        Transaction getTransaction(@PathVariable("transactionReference") String transactionReference) {
            ReadOnlyKeyValueStore<String, Transaction> store = kafkaStreams.store(
                StoreQueryParameters.fromNameAndType("transactions-store", QueryableStoreTypes.keyValueStore()));
    
            return store.get(transactionReference);
        }
    }
    
    
    1. 最后一件事。请记住,默认情况下,内存中的缓存是分区的,因此在运行应用程序的许多实例的情况下,您需要添加一些 RPC 方法来从另一个实例获取数据以防丢失(Kafka 交互式查询),这里有一些 very neat example .或者第二种解决方案是使用org.apache.kafka.streams.kstream.GlobalKTable&lt;K, V&gt;

    【讨论】:

    • 其余层逻辑可以用azkarrastreams.io很简单地完成
    • 看起来很有希望,我会调查的,txh!
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多