【问题标题】:Why does Kafka Consumer keep receiving the same messages (offset)为什么 Kafka Consumer 不断收到相同的消息(偏移量)
【发布时间】:2020-08-28 15:27:45
【问题描述】:

我有一个 SOAP Web 服务,它发送一个 kafka 请求消息并等待一个 kafka 响应消息(例如 consumer.poll(10000))。

每次调用 Web 服务时,它都会创建一个新的 Kafka 生产者和一个新的 Kafka 消费者。

每次我调用 Web 服务时,消费者都会收到相同的消息(例如,具有相同偏移量的消息)。

我使用的是 Kafka 0.9 并启用了自动提交,自动提交频率为 100 毫秒。

对于由 poll() 方法返回的每个 ConsumerRecord,我在其自己的 Callable 中处理,例如

ConsumerRecords<String, String> records = consumer.poll(200);

for (ConsumerRecord<String, String> record : records) {

final Handler handler = new Handler(consumerRecord);
            executor.submit(handler);

}

为什么我总是一遍又一遍地收到相同的消息?

更新 0001

metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class com.kafka.MDCDeserializer
group.id = group-A.group
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [machine1:6667, machine2:6667, machine3:6667, machine0:6667]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id = 
ssl.endpoint.identification.algorithm = null
key.deserializer = class com.kafka.UUIDDerializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXTSASL
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = IbmX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest

【问题讨论】:

  • 您是否可以将您的 kafka 消费者配置为始终以最小偏移量开始?

标签: apache-kafka kafka-consumer-api


【解决方案1】:

基于您显示的代码。我认为你的问题是新的消费者是单线程的。如果您进行一次投票,然后不进行另一次投票,那么auto.commit.offset 将无法正常工作。

尝试将您的代码放在一个while循环中,看看您何时再次轮询偏移量将被提交。

【讨论】:

  • 好地方!一旦我们循环调用 poll 多次,它就开始按预期工作了。非常感谢您的宝贵时间
  • 没问题 hector,如果stackoverflow.com/questions/35991849/… 对您也有帮助,请考虑接受。
猜你喜欢
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 2019-11-20
  • 2017-02-15
  • 2019-09-01
  • 1970-01-01
  • 2018-02-09
  • 2014-04-12
相关资源
最近更新 更多