【发布时间】:2020-04-12 13:20:31
【问题描述】:
我正在使用 Kafka 消费者,但在运行它并尝试获取 1000 条消息时,我收到以下错误:
kafka.consumer.fetcher.RecordTooLargeError: RecordTooLargeError: ("There are some messages at [Partition=Offset]: {TopicPartition(topic='stag-client-topic', partition=0): 177} 其大小较大大于获取大小 247483647,因此永远无法返回。增加获取大小,或减小代理将允许的最大消息大小。",{TopicPartition(topic='stag-client-topic', partition=0): 177} )
我已经在我的消费者配置中使用了最大获取大小。这是定义消费者的函数
def kafka_decoder(x, context=dict()):
try:
return json.loads(x.decode('utf-8'))
except json.JSONDecodeError as e:
return None
def build_consumer(topic, servers, auto_commit, context=dict()):
try:
return KafkaConsumer(
topic,
bootstrap_servers=servers,
value_deserializer=lambda value: kafka_decoder(value, context={
'event_string': value.decode('utf-8')}),
key_deserializer=lambda key: key.decode('utf-8'),
group_id='client-',
api_version=(0, 10, 1),
enable_auto_commit=auto_commit,
auto_offset_reset='earliest',
request_timeout_ms=30000,
security_protocol='SASL_SSL',
max_partition_fetch_bytes=247483647,
max_poll_records=10000,
fetch_max_wait_ms=4000,
fetch_max_bytes=247483647,
sasl_mechanism='PLAIN',
ssl_check_hostname = False,
sasl_plain_username='usrname',
sasl_plain_password='somepsswrd')
except Exception:
print('Error in Kafka consumer creation')
有人对如何在此处进行操作有任何建议吗?
【问题讨论】:
标签: python apache-kafka kafka-consumer-api kafka-python