【发布时间】:2020-12-06 20:32:01
【问题描述】:
最近我正在为 Kafka Streams DSL 应用程序实现 Kryo 序列化程序。 Kryo 默认情况下不是线程安全的,序列化方法抛出异常很可能是由多个线程的不同步访问引起的。添加同步解决了这个问题,但提出了一些问题。
Kafka Streams 应用程序对于不同处理对象的线程模型是什么?哪些对象在线程之间共享,哪些对象仅由单个线程使用?在这些对象中存在不同步的本地状态(字段,而不是状态存储)是否安全?
我对 Processor / Transformer 和 Serializer / Deserializer 对象特别感兴趣。
我见过this answer,但我仍然不清楚。请注意,我不是试图在线程之间共享任何状态,而是为了避免出现这种共享状态。
DSL 和 PAPI 都需要 Processors / Transformers 的供应商(即工厂),Serde 接口也是工厂,所以我假设每个线程或每个任务创建一个实例。这个假设似乎是错误的,但同时接受一个工厂,创建多个实例然后同时从多个线程访问它们是非常奇怪的。
我的Serde 实现基本上是这样的(每个Serializer / Deserializer 实例创建一个新的Kryo 实例):
public class MySerde<T> implements Serde<T> {
@Override
public Serializer<T> serializer() {
final Kryo kryo = new Kryo();
return (topic, data) -> { /* use kryo instance */ };
}
@Override
public Deserializer<T> deserializer() {
final Kryo kryo = new Kryo();
return (topic, data) -> { /* use kryo instance */ };
}
}
调用了 Serdes 以读取/写入重新分区主题:
stream
.groupByKey(Grouped.with(/* set serdes here */))
.windowedBy(...)
.aggregate(...)
【问题讨论】:
标签: multithreading thread-safety apache-kafka-streams