【问题标题】:Invoking updateStateByKey twice on the same RDD在同一个 RDD 上调用 updateStateByKey 两次
【发布时间】:2015-04-13 04:03:42
【问题描述】:

我可以在同一个 RDD 上调用两次 UpdateStateByKey。我的要求如下。

  1. 从 Kafka 获取事件流
  2. UpdateStateByKey 根据时间戳聚合和过滤事件集
  3. 进行一些处理并保存到 Cassandra DB
  4. UpdateStateByKey 根据 eventType 移除键

我尝试将步骤 2 中的结果分配给 VAR,然后将其重新分配给步骤 4 中的更新值。但似乎它不起作用。我是新来的火花,不确定这种行为是如何可能的。

感谢任何帮助。

【问题讨论】:

  • 这个答案应该解释如何通过键更新状态,stackoverflow.com/questions/24771823/…,是的,你应该能够在同一个 RDD 上多次调用 updateStateByKey,如果你这样做,你可能想要缓存,要获得更多帮助,您应该发布您所做的尝试
  • 你的意思是,我应该使用广播变量,或者 RDD 持久性,比如 cache()、persist(),不确定在上述情况下哪一个会有所帮助

标签: scala cassandra apache-spark apache-kafka


【解决方案1】:

我通过在状态更新方法开始时第二次调用 updateStateByKey() 时实际执行我想要执行的清理来解决了这个问题。一个小例子:

private static Optional<State> updateState(
        final List<Events> allEvents,
        final Optional<State> state) {
    State state = state.or(State::new);
    state.clearAccumulatedValues();

    // Do some work...
    state.addValue("Purple Elephants!");

    return Optional.fromNullable(state.isEmpty() ? null : state);
}

【讨论】:

    猜你喜欢
    • 2022-11-22
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2010-09-19
    相关资源
    最近更新 更多