【问题标题】:Kafka Streams: thread-safety of processors, serdes, etcKafka Streams:处理器、serdes 等的线程安全
【发布时间】:2020-12-06 20:32:01
【问题描述】:

最近我正在为 Kafka Streams DSL 应用程序实现 Kryo 序列化程序。 Kryo 默认情况下不是线程安全的,序列化方法抛出异常很可能是由多个线程的不同步访问引起的。添加同步解决了这个问题,但提出了一些问题。

Kafka Streams 应用程序对于不同处理对象的线程模型是什么?哪些对象在线程之间共享,哪些对象仅由单个线程使用?在这些对象中存在不同步的本地状态(字段,而不是状态存储)是否安全?

我对 Processor / TransformerSerializer / Deserializer 对象特别感兴趣。

我见过this answer,但我仍然不清楚。请注意,我不是试图在线程之间共享任何状态,而是为了避免出现这种共享状态。

DSL 和 PAPI 都需要 Processors / Transformers 的供应商(即工厂),Serde 接口也是工厂,所以我假设每个线程或每个任务创建一个实例。这个假设似乎是错误的,但同时接受一个工厂,创建多个实例然后同时从多个线程访问它们是非常奇怪的。


我的Serde 实现基本上是这样的(每个Serializer / Deserializer 实例创建一个新的Kryo 实例):

public class MySerde<T> implements Serde<T> {
    @Override
    public Serializer<T> serializer() {
        final Kryo kryo = new Kryo();
        return (topic, data) -> { /* use kryo instance */ };
    }

    @Override
    public Deserializer<T> deserializer() {
        final Kryo kryo = new Kryo();
        return (topic, data) -> { /* use kryo instance */ };
    }
}

调用了 Serdes 以读取/写入重新分区主题:

stream
    .groupByKey(Grouped.with(/* set serdes here */))
    .windowedBy(...)
    .aggregate(...)

【问题讨论】:

    标签: multithreading thread-safety apache-kafka-streams


    【解决方案1】:

    我已经建立了一个小型演示应用程序来检测对 serdes 和转换器的并发访问:https://github.com/sukhinin/kafka-streams-threading

    1. 序列化器和反序列化器从多个线程同时访问,因此必须是线程安全的。

    2. 一次只能从一个线程访问转换器。

    如果有更多 Kafka Streams 经验的人来信,我仍将不胜感激。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2018-06-14
      • 2017-01-07
      • 2019-06-02
      • 1970-01-01
      相关资源
      最近更新 更多