【问题标题】:How to run multiple publishers at the same time paho-mqtt?如何同时运行多个发布者 paho-mqtt?
【发布时间】:2020-09-14 19:24:50
【问题描述】:

我正在运行一个脚本,多个客户端发布到同一主题,每个客户端 5 条消息,但他们一个接一个地执行。我想知道是否有任何方法可以执行多个发布者,但同时执行,而不是像我编程的那样执行循环“for”。

我想到运行各种 python 脚本,但如果我想拥有例如 100 个发布者,它就不起作用。有人知道我该怎么做吗?提前致谢

import ssl
import time 
import random
import sys  
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from datetime import datetime

broker_address = '127.0.0.1'
topic = "casa/hab1"
port=1883
delay=0.2
count=0
i=0
j=0
nclients=1
nmessages=5


for i in range(nclients):
  cname="Client"+str(i) 
  j=int(time.time()) #eliminar la parte decimal
  j=str(j)
  client_id=cname+str(j)+"_" #generar client_id
  client=mqtt.Client(client_id) 
  client.connect(broker_address)  
  print("")

  print(str(client_id))
  client.loop_start() 

  for count in range(nmessages):
     b=random.randrange(10, 99, 1)
     mensaje="Hello World:"+ str(b)+" -- "
     client.publish(topic,mensaje, 2, retain=False) 
     now=datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
     print("timestamp "+str(count)+" = "+now)
     
     count+=1
     time.sleep(delay)

  i+=1

【问题讨论】:

  • 请不要发布代码图片,对于那些使用屏幕阅读器的人来说,这真的很难阅读而且不可能。请edit问题包含实际文本并使用工具栏正确格式化。
  • 如果你想同时做一些事情,你需要学习如何使用线程
  • 对不起,我刚刚编辑了帖子并发布了代码。
  • 如果您必须在一个脚本中运行它,您将不得不使用模块threadingmultiprocessing 在单独的线程/进程中运行每个发布者。如果您可以创建自己的 loop 而不是 loop_start() 并在一个循环中运行所有客户端,您还可以查看文档 - 然后您必须首先创建所有客户端并保留在列表中,然后您可以使用此列表在同时 - 使用自己的 loop 而不是 loop_start()

标签: python mqtt mosquitto paho


【解决方案1】:

通常要同时运行多个函数,您需要使用threadingmultiprocessing 来运行单独的thread/process 中的每个函数。

但是client.loop_start() 已经运行thread 所以你可以先创建很多客户端,

all_clients = []

for i in range(nclients):
    t = int(time.time())

    client_id= "Client_{}_{}_".format(i, t) 
    print('create:', client_id)
    
    client = mqtt.Client(client_id) 
    client.connect(broker_address)  
    client.loop_start() 

    all_clients.append([client_id, client])

然后在发送消息的循环中使用它们

for count in range(nmessages):

    for client_id, client in all_clients: 
        b = random.randrange(10, 99, 1)
        mensaje = "Hello World: {} -- ".format(b)
        client.publish(topic, mensaje, 2, retain=False) 
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        print('client:', client_id, '| count:', count, "=", now)

    time.sleep(delay)

它将几乎在同一时间 (+- 0.01 秒) 发送,但运行每个 client 以分隔 thread/process 将更难以如此小的延迟运行消息。


import time 
import random
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from datetime import datetime

broker_address = '127.0.0.1'
#broker_address = '192.168.1.91'

topic = "casa/hab1"
delay = 0.2

nclients = 3
nmessages = 5

# --- first create all clients ----

all_clients = []
for i in range(nclients):
    t = int(time.time())

    client_id= "Client_{}_{}_".format(i, t) 
    print('create:', client_id)
    
    client = mqtt.Client(client_id) 
    client.connect(broker_address)  
    client.loop_start() 

    all_clients.append([client_id, client])

# ---

print()

# --- loop ---

for count in range(nmessages):

    for client_id, client in all_clients: 
        b = random.randrange(10, 99, 1)
        mensaje = "Hello World: {} -- ".format(b)
        client.publish(topic, mensaje, 2, retain=False) 
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        print('client:', client_id, '| count:', count, "=", now)

    time.sleep(delay)

结果:

create: Client_0_1600159539_
create: Client_1_1600159539_
create: Client_2_1600159539_

client: Client_0_1600159539_ | count: 0 = 2020-09-15 10:45:39.125615
client: Client_1_1600159539_ | count: 0 = 2020-09-15 10:45:39.126737
client: Client_2_1600159539_ | count: 0 = 2020-09-15 10:45:39.128049
client: Client_0_1600159539_ | count: 1 = 2020-09-15 10:45:39.329731
client: Client_1_1600159539_ | count: 1 = 2020-09-15 10:45:39.330702
client: Client_2_1600159539_ | count: 1 = 2020-09-15 10:45:39.332028
client: Client_0_1600159539_ | count: 2 = 2020-09-15 10:45:39.533360
client: Client_1_1600159539_ | count: 2 = 2020-09-15 10:45:39.534323
client: Client_2_1600159539_ | count: 2 = 2020-09-15 10:45:39.535380
client: Client_0_1600159539_ | count: 3 = 2020-09-15 10:45:39.737049
client: Client_1_1600159539_ | count: 3 = 2020-09-15 10:45:39.738118
client: Client_2_1600159539_ | count: 3 = 2020-09-15 10:45:39.739249
client: Client_0_1600159539_ | count: 4 = 2020-09-15 10:45:39.941419
client: Client_1_1600159539_ | count: 4 = 2020-09-15 10:45:39.943207
client: Client_2_1600159539_ | count: 4 = 2020-09-15 10:45:39.944785

编辑:

同样使用threading

在此版本中,您可以在每个client 中使用随机delay,以使所有流量更加随机。

import time 
import random
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from datetime import datetime
import threading

broker_address = '127.0.0.1'
#broker_address = '192.168.1.91'

topic = "casa/hab1"
delay = 0.2

nclients = 3
nmessages = 5

# --- functions ---

def sending(client, client_id):

    for count in range(nmessages):

        b = random.randrange(10, 99, 1)
        mensaje = "Hello World: {} -- ".format(b)
        client.publish(topic, mensaje, 2, retain=False) 
        now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
        print('client: {} | count: {} = {}'.format(client_id, count, now))
        #delay = random.randint(1, 5) / 10

        time.sleep(delay)

# --- first create all clients ----

all_clients = []
for i in range(nclients):
    t = int(time.time())

    client_id= "Client_{}_{}_".format(i, t) 
    print('create:', client_id)
    
    client = mqtt.Client(client_id) 
    client.connect(broker_address)  
    client.loop_start() 

    all_clients.append([client_id, client])

# ---

print()

# --- threads ---

all_threads = []

# start threads
for client_id, client in all_clients:
    t = threading.Thread(target=sending, args=(client, client_id))
    t.start()
    all_threads.append(t)
    
# ... other code ...

# at the end wait for end of threads
for t in all_threads:
    t.join()

【讨论】:

  • 非常感谢!我开始阅读如何在 paho-clients 中使用线程和进程,但我需要示例。感谢您的贡献!
猜你喜欢
  • 2022-07-06
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
  • 1970-01-01
相关资源
最近更新 更多