【问题标题】:Python, send a stop notification to a blocking loop within a threadPython,向线程内的阻塞循环发送停止通知
【发布时间】:2014-02-04 18:48:06
【问题描述】:

我已经阅读了很多答案,但是我还没有找到合适的解决方案。

问题,我正在阅读mixed/replace HTTP streams,默认情况下不会过期或结束。

你可以使用 curl 自己尝试一下:

curl http://agent.mtconnect.org/sample\?interval\=0

所以,现在我使用 Python 线程和 requests 从多个流中读取数据。

import requests
import uuid
from threading import Thread

tasks = ['http://agent.mtconnect.org/sample?interval=5000',
         'http://agent.mtconnect.org/sample?interval=10000']
thread_id = []


def http_handler(thread_id, url, flag):
    print 'Starting task %s' % thread_id
    try:
        requests_stream = requests.get(url, stream=True, timeout=2)
        for line in requests_stream.iter_lines():
            if line:
                print line
            if flag and line.endswith('</MTConnectStreams>'):
                # Wait until XML message end is reached to receive the full message
                break

    except requests.exceptions.RequestException as e:
        print('error: ', e)

    except BaseException as e:
        print e


if __name__ == '__main__':
    for task in tasks:
        uid = str(uuid.uuid4())
        thread_id.append(uid)
        t = Thread(target=http_handler, args=(uid, task, False), name=uid)
        t.start()

    print thread_id

    # Wait Time X or until user is doing something
    # Send flag = to desired thread to indicate the loop should stop after reaching the end.

有什么建议吗?什么是最好的解决方案?我不想杀死线程,因为我想阅读结尾以获得完整的 XML 消息。

【问题讨论】:

    标签: python multithreading python-2.7


    【解决方案1】:

    我通过使用threading module 和threading.events 找到了解决方案。也许不是最好的解决方案,但它目前运行良好。

    import logging
    import threading
    import time
    import uuid
    import requests
    
    logging.basicConfig(level=logging.DEBUG, format='(%(threadName)-10s) %(message)s', )
    
    tasks = ['http://agent.mtconnect.org/sample?interval=5000',
             'http://agent.mtconnect.org/sample?interval=10000']
    
    d = dict()
    
    
    def http_handler(e, url):
        logging.debug('wait_for_event starting')
        message_buffer = []
        filter_namespace = True
    
        try:
            requests_stream = requests.get(url, stream=True, timeout=2)
            for line in requests_stream.iter_lines():
                if line:
                    message_buffer.append(line)
    
                if e.isSet() and line.endswith('</MTConnectStreams>'):
                    logging.debug(len(message_buffer))
                    break
    
        except requests.exceptions.RequestException as e:
            print('error: ', e)
    
        except BaseException as e:
            print e
    
    
    if __name__ == '__main__':
        logging.debug('Waiting before calling Event.set()')
        for task in tasks:
            uid = str(uuid.uuid4())
            e = threading.Event()
            d[uid] = {"stop_event": e}
            t = threading.Event(uid)
            t = threading.Thread(name=uid,
                                 target=http_handler,
                                 args=(e, task))
            t.start()
    
        logging.debug('Waiting 3 seconds before calling Event.set()')
    
        for key in d:
            time.sleep(3)
            logging.debug(threading.enumerate())
            logging.debug(d[key])
            d[key]['stop_event'].set()
        logging.debug('bye')
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-07-05
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多