【发布时间】:2019-07-18 15:55:15
【问题描述】:
我正在构建一个 python 应用程序来运行一些测试,这些测试涉及使用 Null Modem USB 到 USB(目前在 PC 上使用模拟器)将串行数据从一个 USB 端口发送到另一个。 我写了一个串行监听器如下:
import serial
import threading
from queue import Queue
class SerialPort(object):
def ___init__(self, timeout=None):
self.ser = serial.Serial(baud=_, stopbits=_, ... timeout=timeout)
self.out_q = Queue()
self.in_q = Queue()
self.THREAD = None
def setup(self, com):
self.ser.port = com
self.ser.open()
def run(self):
self.THREAD = threading.Thread(target=self.listen, args=(self.out_q, self.in_q,))
self.THREAD.start()
def listen(self, in_q, out_q):
while True:
if not in_q.empty():
# This code is never reached, even though it should be
message = in_q.get()
if message == 'DIE':
break
else:
self.ser.write(message)
def send_command(self, command):
self.in_q.put(command)
class GUI(object):
def __init__(self):
self.POWER = False
self.server_port = SerialPort(timeout=0.1)
self.client_port = SerialPort(timeout=0.1)
#etc etc Tkinter stuff and things
def on_power_button_click(self):
# Tkinter Button already made, know the button works as expected
self.POWER = not self.POWER
if self.POWER:
self.server_port.setup('COM5')
self.client_port.setup('COM6')
self.server_port.run()
self.client_port.run()
else:
self.server_port.send_command('DIE')
self.client_port.send_command('DIE')
time.sleep(0.3)
self.server_port.ser.close()
self.client_port.ser.close()
my_example_problem = GUI()
# Creates threads and 'turns on' the application
my_example_problem.on_power_button_click()
# Should Turn off the application but doesn't
my_example_problem.on_power_button_click()
一切都很好,但每当它们被关闭时,'DIE' 命令永远不会在 listen() 中注册,并且 in_q.empty() 被卡住为 False,我不知道为什么。我想知道这是否可能是一个范围问题,但是我有另一个应用程序是用完全相同的范围编写的,但只使用一个线程并且工作得很好,据我了解,Python 队列的工作方式类似于带有一个 C 队列指向起始内存块的指针,因此范围无关紧要。
【问题讨论】:
标签: python multithreading serial-port