【发布时间】:2021-07-12 01:27:30
【问题描述】:
我们有一个要求,我们需要通过将 3 个代理集群的流量分配(例如 80% 给代理 1、15% 给代理 2、5% 给代理 3)分配给 Kafka 集群,并为根据broker的流量分布给broker的topic。
为了使用 kafka-python 在 python 编程中实现这个逻辑,我们从 main 函数中调用产生不平衡消息函数。下面提供了实现逻辑的代码示例:-
主要功能
def mf():
.
.
.
# create a topic if the topic doesn't exists. Tps_crtn will create new topic if no existing topics found else, will send messages to the existing topics, as usual.
tpc_list = tps_crtn(base_topic_name=bt, no_of_topics=int(ntp),
topic_partn=int(ptp),
repicas_per_partn=int(rpp))
#traffic distribution list
dl = [80,15,5]
while True:
for ix, topic in enumerate (tpc_list):
produce_unbalanced_message(topic_name=topic,
no_of_msgs=int(round((int(nm) * (float(dl[ix])/100.0)))),
max_wait_time=float(mwt)
if __name__ == "__main__":
mf()
主函数调用下面提到的生产者发送函数,以便向主题列表中的每个主题发送消息。
不平衡产生消息功能
def produce_unbalanced_message(topic_name='test-topic',
no_of_msgs=-1,
max_wait_time=2):
kafka_admin_client: KafkaAdminClient = KafkaAdminClient(
bootstrap_servers='10.22.151.16:9100'
)
.
.
# List of all node ids in the cluster
LOG.info("Fetch the existing Kafka node list")
nodeids: List[int] = [node.nodeId for node in kafka_admin_client._client.cluster.brokers()]
for n in nodeids:
print(n)
.
.
.
# sending unbalanced messages to Kafka
producer.send(topic_name,
key=key,
value=message)
.
.
根据要求,消息应该根据broker号和相应的流量分布列表发送,而不是主题列表。我们从 producer_unbalanced_message 函数中的 nodeids 列表中获取的代理编号。
但是,在按照流量分布列表参数测试超过三个主题的代码时,我们遇到了-index out of bound 错误。这是因为我们一增加它们的值就在主题列表中,流量列表分布值不匹配,因为它们是根据代理设置的。
谁能建议应该尝试哪些更改,以便根据从nodeids列表和相应的流量分配列表获得的代理编号而不是根据主题列表发送消息?
【问题讨论】:
-
您将要定义自己的分区函数。否则,您无法通过其 ID 来定位特定代理以获取记录(密钥通过哈希函数,并且您不能保证此处不会发生哈希冲突)
-
@OneCricketeer 假设我们定义了一个分区函数,我们在其中获取代理的节点 ID,然后在遍历节点 ID 时在 for 循环中调用静态 if 块,如您在下面提到的,相应地发送消息?
-
代理 ID 不映射到分区,所以我不明白这对你有什么帮助。您需要描述主题,然后找到领导分区,并确保没有重叠,并确保代理不会在您生产(或仅使用一个副本)时重新平衡分区
标签: python apache-kafka kafka-python