通常要同时运行多个函数,您需要使用threading 或multiprocessing 来运行单独的thread/process 中的每个函数。
但是client.loop_start() 已经运行thread 所以你可以先创建很多客户端,
all_clients = []
for i in range(nclients):
t = int(time.time())
client_id= "Client_{}_{}_".format(i, t)
print('create:', client_id)
client = mqtt.Client(client_id)
client.connect(broker_address)
client.loop_start()
all_clients.append([client_id, client])
然后在发送消息的循环中使用它们
for count in range(nmessages):
for client_id, client in all_clients:
b = random.randrange(10, 99, 1)
mensaje = "Hello World: {} -- ".format(b)
client.publish(topic, mensaje, 2, retain=False)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
print('client:', client_id, '| count:', count, "=", now)
time.sleep(delay)
它将几乎在同一时间 (+- 0.01 秒) 发送,但运行每个 client 以分隔 thread/process 将更难以如此小的延迟运行消息。
import time
import random
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from datetime import datetime
broker_address = '127.0.0.1'
#broker_address = '192.168.1.91'
topic = "casa/hab1"
delay = 0.2
nclients = 3
nmessages = 5
# --- first create all clients ----
all_clients = []
for i in range(nclients):
t = int(time.time())
client_id= "Client_{}_{}_".format(i, t)
print('create:', client_id)
client = mqtt.Client(client_id)
client.connect(broker_address)
client.loop_start()
all_clients.append([client_id, client])
# ---
print()
# --- loop ---
for count in range(nmessages):
for client_id, client in all_clients:
b = random.randrange(10, 99, 1)
mensaje = "Hello World: {} -- ".format(b)
client.publish(topic, mensaje, 2, retain=False)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
print('client:', client_id, '| count:', count, "=", now)
time.sleep(delay)
结果:
create: Client_0_1600159539_
create: Client_1_1600159539_
create: Client_2_1600159539_
client: Client_0_1600159539_ | count: 0 = 2020-09-15 10:45:39.125615
client: Client_1_1600159539_ | count: 0 = 2020-09-15 10:45:39.126737
client: Client_2_1600159539_ | count: 0 = 2020-09-15 10:45:39.128049
client: Client_0_1600159539_ | count: 1 = 2020-09-15 10:45:39.329731
client: Client_1_1600159539_ | count: 1 = 2020-09-15 10:45:39.330702
client: Client_2_1600159539_ | count: 1 = 2020-09-15 10:45:39.332028
client: Client_0_1600159539_ | count: 2 = 2020-09-15 10:45:39.533360
client: Client_1_1600159539_ | count: 2 = 2020-09-15 10:45:39.534323
client: Client_2_1600159539_ | count: 2 = 2020-09-15 10:45:39.535380
client: Client_0_1600159539_ | count: 3 = 2020-09-15 10:45:39.737049
client: Client_1_1600159539_ | count: 3 = 2020-09-15 10:45:39.738118
client: Client_2_1600159539_ | count: 3 = 2020-09-15 10:45:39.739249
client: Client_0_1600159539_ | count: 4 = 2020-09-15 10:45:39.941419
client: Client_1_1600159539_ | count: 4 = 2020-09-15 10:45:39.943207
client: Client_2_1600159539_ | count: 4 = 2020-09-15 10:45:39.944785
编辑:
同样使用threading。
在此版本中,您可以在每个client 中使用随机delay,以使所有流量更加随机。
import time
import random
import paho.mqtt.publish as publish
import paho.mqtt.client as mqtt
from datetime import datetime
import threading
broker_address = '127.0.0.1'
#broker_address = '192.168.1.91'
topic = "casa/hab1"
delay = 0.2
nclients = 3
nmessages = 5
# --- functions ---
def sending(client, client_id):
for count in range(nmessages):
b = random.randrange(10, 99, 1)
mensaje = "Hello World: {} -- ".format(b)
client.publish(topic, mensaje, 2, retain=False)
now = datetime.now().strftime("%Y-%m-%d %H:%M:%S.%f")
print('client: {} | count: {} = {}'.format(client_id, count, now))
#delay = random.randint(1, 5) / 10
time.sleep(delay)
# --- first create all clients ----
all_clients = []
for i in range(nclients):
t = int(time.time())
client_id= "Client_{}_{}_".format(i, t)
print('create:', client_id)
client = mqtt.Client(client_id)
client.connect(broker_address)
client.loop_start()
all_clients.append([client_id, client])
# ---
print()
# --- threads ---
all_threads = []
# start threads
for client_id, client in all_clients:
t = threading.Thread(target=sending, args=(client, client_id))
t.start()
all_threads.append(t)
# ... other code ...
# at the end wait for end of threads
for t in all_threads:
t.join()