【发布时间】:2020-08-28 15:27:45
【问题描述】:
我有一个 SOAP Web 服务,它发送一个 kafka 请求消息并等待一个 kafka 响应消息(例如 consumer.poll(10000))。
每次调用 Web 服务时,它都会创建一个新的 Kafka 生产者和一个新的 Kafka 消费者。
每次我调用 Web 服务时,消费者都会收到相同的消息(例如,具有相同偏移量的消息)。
我使用的是 Kafka 0.9 并启用了自动提交,自动提交频率为 100 毫秒。
对于由 poll() 方法返回的每个 ConsumerRecord,我在其自己的 Callable 中处理,例如
ConsumerRecords<String, String> records = consumer.poll(200);
for (ConsumerRecord<String, String> record : records) {
final Handler handler = new Handler(consumerRecord);
executor.submit(handler);
}
为什么我总是一遍又一遍地收到相同的消息?
更新 0001
metric.reporters = []
metadata.max.age.ms = 300000
value.deserializer = class com.kafka.MDCDeserializer
group.id = group-A.group
partition.assignment.strategy = [org.apache.kafka.clients.consumer.RangeAssignor]
reconnect.backoff.ms = 50
sasl.kerberos.ticket.renew.window.factor = 0.8
max.partition.fetch.bytes = 1048576
bootstrap.servers = [machine1:6667, machine2:6667, machine3:6667, machine0:6667]
retry.backoff.ms = 100
sasl.kerberos.kinit.cmd = /usr/bin/kinit
sasl.kerberos.service.name = kafka
sasl.kerberos.ticket.renew.jitter = 0.05
ssl.keystore.type = JKS
ssl.trustmanager.algorithm = PKIX
enable.auto.commit = true
ssl.key.password = null
fetch.max.wait.ms = 500
sasl.kerberos.min.time.before.relogin = 60000
connections.max.idle.ms = 540000
ssl.truststore.password = null
session.timeout.ms = 30000
metrics.num.samples = 2
client.id =
ssl.endpoint.identification.algorithm = null
key.deserializer = class com.kafka.UUIDDerializer
ssl.protocol = TLS
check.crcs = true
request.timeout.ms = 40000
ssl.provider = null
ssl.enabled.protocols = [TLSv1.2, TLSv1.1, TLSv1]
ssl.keystore.location = null
heartbeat.interval.ms = 3000
auto.commit.interval.ms = 5000
receive.buffer.bytes = 32768
ssl.cipher.suites = null
ssl.truststore.type = JKS
security.protocol = PLAINTEXTSASL
ssl.truststore.location = null
ssl.keystore.password = null
ssl.keymanager.algorithm = IbmX509
metrics.sample.window.ms = 30000
fetch.min.bytes = 1024
send.buffer.bytes = 131072
auto.offset.reset = latest
【问题讨论】:
-
您是否可以将您的 kafka 消费者配置为始终以最小偏移量开始?
标签: apache-kafka kafka-consumer-api