【问题标题】:How to run a blocking process on a background thread in python如何在python中的后台线程上运行阻塞进程
【发布时间】:2014-05-30 02:25:04
【问题描述】:

我有一个单元测试脚本,需要测试所有其余的 api。同时我也有一个生成消息的xmpp服务器。

我需要在我的单元测试中运行一个 xmpp 实例来接收这些消息。但问题是xmpp是一个阻塞进程。

---> self.process(block=True)

这会导致单元测试停止。

有什么方法可以在后台线程上运行这个 xmpp 并继续接收 msgs 并在主线程上运行单元测试。如果是的话,我可以有一个我可以实现的代码 sn-p 吗?

提前致谢。

【问题讨论】:

    标签: python multithreading


    【解决方案1】:
    1. 一种解决方案是在您的setUp() 例程中在后台启动服务器——即os.system('myserver &'),然后在测试结束时在tearDown() 中终止它

    2. 如果您想直接控制服务器,请使用fork() 并遵循与#1 大致相同的模式。

    例子:

    import os, sys, subprocess, time, unittest
    
    def server():
        try:
            for _ in xrange(5, 0, -1):
                print 'ding'
                time.sleep(1)
        except KeyboardInterrupt:
            pass
    
    
    class TestClient(unittest.TestCase):
        def setUp(self):
            self.server_pid = None
            pid = os.fork()
            if not pid:             # child
                return server()
            # parent
            self.server_pid = pid
    
        def test1(self):
            print 'test server, PID',self.server_pid
            time.sleep(2)
    
        def tearDown(self):
            if not self.server_pid:
                return
            import signal
            os.kill(self.server_pid, signal.SIGINT)
    

    运行:

    python -m unittest ptest
    

    输出:

    测试服务器,PID 16490 ding ding .test 服务器,PID 无

    ----------------------------------- ----------------------- 在 2.003 秒内运行 1 次测试

    好的

    【讨论】:

    • 但是如果在设置服务器之前开始执行测试,则会出现竞争条件。
    猜你喜欢
    • 1970-01-01
    • 2013-10-05
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多