【问题标题】:Process and check event using kafka-streams during some period在一段时间内使用 kafka-streams 处理和检查事件
【发布时间】:2020-02-20 07:34:09
【问题描述】:

我有一个 KStream eventsStream,它从主题“事件”中获取数据。 有两种类型的事件,它们的键:
1.{user_id = X, event_id = 1} {..value, include time_event...}
2.{user_id = X, event_id = 2} {..value, include time_event...}

如果在 10 分钟内没有用户提供带有 event_id = 2 的事件,我需要将带有 event_id = 1 的事件迁移到主题“结果”。

例如,
1. 第一种情况:我们得到数据{user_id = 100, event_id = 1} {.. time_event = xxxx ...},10分钟内没有事件{user_id = 100, event_id = 2} {.. time_event = xxxx + 10 minutes...},所以我们将它写入结果主题
2.第二种情况:我们得到数据{user_id = 100, event_id = 1} {.. time_event = xxxx ...}和一个10分钟内的事件{user_id = 100, event_id = 2} {.. time_event = xxxx + 5 minutes...},所以我们不会把它写到results-topic中

如何在 java 代码中使用 kafka-streams 实现这种行为?

我的代码:

公共类 ResultStream {

public static KafkaStreams newStream() {

    Properties properties = Config.getProperties("ResultStream");

    Serde<String> stringSerde = Serdes.String();

    StreamsBuilder builder = new StreamsBuilder();

    StoreBuilder<KeyValueStore<String, String>> store =
            Stores.keyValueStoreBuilder(
                    Stores.inMemoryKeyValueStore("inmemory"),
                    stringSerde,
                    stringSerde
            );
    builder.addStateStore(store);

    KStream<String, String> resourceEventStream = builder.stream(EVENTS.topicName(), Consumed.with(stringSerde, stringSerde));
    resourceEventStream.print(Printed.toSysOut());

    resourceEventStream.process(() -> new CashProcessor("inmemory"), "inmemory");
    resourceEventStream.process(() -> new FilterProcessor("inmemory", resourceEventStream), "inmemory");

    Topology topology = builder.build();

    return new KafkaStreams(topology, properties);

}

}

公共类 FilterProcessor 实现处理器{

private ProcessorContext context;
private String eventStoreName;
private KeyValueStore<String, String> eventStore;
private KStream<String, String> stream;

public FilterProcessor(String eventStoreName, KStream<String, String> stream) {
    this.eventStoreName = eventStoreName;
    this.stream = stream;
}

@Override
public void init(ProcessorContext processorContext) {
    this.context = processorContext;
    eventStore = (KeyValueStore) processorContext.getStateStore(eventStoreName);
}

@Override
public void process(Object key, Object value) {

    this.context.schedule(Duration.ofMinutes(1), PunctuationType.WALL_CLOCK_TIME, timestamp -> {

        System.out.println("Scheduler is working");

        stream.filter((k, v) -> {

            JsonObject events = new Gson().fromJson(k, JsonObject.class);
            if (***condition***) {
                return true;
            }

            return false;
        }).to("results");
    });
}

@Override
public void close() {

}

}

CashProcessor 的作用只是将事件放入本地存储,如果同一用户的 event_id = 2,则用户删除 event_id = 1 的记录。

FilterProcess 应该每分钟使用本地存储过滤事件。但是我不能正确调用这个处理(事实上我是这样做的)......

我真的需要帮助。

【问题讨论】:

    标签: java apache-kafka-streams


    【解决方案1】:

    为什么要将KStream 传递到您的处理器中?这不是 DSL 的工作方式。

    当您已经通过 resourceEventStream.process()“连接”您的处理器时,您的 FilterProcessor#process(key, value) 方法将自动为流中的每条记录调用 - 但是,KStream#process() 是终端操作,因此不允许您向下游发送任何数据。相反,您可能想使用transform()(这与process() 基本相同,加上一个输出KStream)。

    要在标点符号中实际将数据转发到下游,您应该使用context.forward() 使用ProcessorContext,它是通过init() 方法提供的。

    【讨论】:

      猜你喜欢
      • 2018-08-21
      • 1970-01-01
      • 2017-01-07
      • 1970-01-01
      • 1970-01-01
      • 2020-01-07
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多