【发布时间】:2018-03-04 07:16:06
【问题描述】:
我在线程内部创建了一个 Kafka 消费者实例,作为构造函数的一部分,并且在线程内部的 run 方法中,我确实调用了不同的 Web 服务并保持调用非阻塞,我正在使用可完成的未来。我的问题是我无法通过调用 thenApply 方法并传递 Kafka 消费者实例来发出提交,因为它给了我一个错误,即 Kafka 消费者不是线程安全的。虽然在我的提交方法中我已经编写了代码
synchronized(consumer) {
commitResponse();
}
我仍然得到ConcurrentModificationException。
class KafkaConsumerThread implements Runnable {
KafkaConsumer<String, String> consumer;
public KafkaConsumerThread(Properties properties) {
consumer = new KafkaConsumer<String, String>(properties);
...
}
@Override
public void run() {
try {
// synchronized (consumer) {
consumer.subscribe(topics);
while (true) {
if (closed.get()) {
consumer.close();
}
ConsumerRecords<String, String> records = consumer.poll(120000);
for (ConsumerRecord<String, String> record : records) {
getAsyncClient().prepareGet(webServiceUrl)
.execute()
.toCompletableFuture()
.thenApply(resp -> callAnotherService1(resp))
.thenApply(resp -> callAnotherService2(resp))
.thenApply(resp -> commitResponse(resp, consumer));
}
}
}
} catch (Exception ex) {
...
}
在上面的代码中,我在 commitResponse 方法中遇到异常,即“KafkaConsumer 对于多线程访问不安全”。虽然在我的提交响应中,如果我将提交包含在 synchronized(consumer) 中,我仍然会收到错误。
【问题讨论】:
标签: java multithreading java-8 apache-kafka kafka-consumer-api