【问题标题】:Threading and Queues in Python3: sentinel value in input_queue fails to break out of while loopPython中的线程和队列:输入队列中的哨兵值未能跳出while循环
【发布时间】:2013-08-05 13:08:47
【问题描述】:

一般概述:

我正在尝试构建一个简单的应用程序示例,该应用程序使用线程从网络上抓取信息。我知道有专门的模块(例如,scrapy),但我想做更多的工作来学习和理解线程的工作原理并理解其中的陷阱。此外,我还看到了各种教程(IBM 教程和其他教程)、关于 SO 的问题,甚至在 Python Cookbook,第 3 版中描述了如何做到这一点的一些食谱,但我使用线程/队列时仍然会挂断。

首先,我在stackoverflow(以及Cookbook)上读到子类threading.Thread 是一种浪费,所以我一直在尝试使用Thread(target=..., args=(...))。大多数教程似乎都使用(旧的?)子类化方法,我可能会因为采用这种新的(?)方式而感到困惑。

其次,在 Python Cookbook 中,他们提供了将 _sentinel 值放入工作队列的示例,您的方法会注意并在发现时退出。我认为这是我遇到麻烦的地方。

最后,我确信我一定是在搞各种矫枉过正的事情,所以如果我把事情复杂化了,我会很感激提示或指示。感谢您的建议。


具体的编码问题和尝试:

我有一个(公司名称,网址)元组的工作队列。我的方法是解析股票代码名称的 url,并将一个元组(代码名称、公司名称)发送到输出队列。当所有值都已被消耗时,我正在尝试使用哨兵来突破该方法:

def grab_ticker_name(input_q, output_q, _sentinel):
    '''Function to be threaded later: grabs company ticker from url and sends it to an output queue.'''

  while True:
      item = input_q.get()
      if item is _sentinel:
          print("We encountered the sentinel!!")
          input_q.put(_sentinel) # See page 492 of Python Cookbook, 3rd Ed.
          break
      else:
          company_name, company_url = item
          with urlopen(company_url) as f:
              newsoup = BeautifulSoup(f)
              if len(newsoup.select('.inlink_chart > a')) > 0:
                  ticker_name = newsoup.select('.inlink_chart > a')[0].string
                  output_q.put((ticker_name, company_name))
                  print("grab_ticker_name grabbed the following: {ticker_name}, {company_name}".format_map(vars()))
          input_q.task_done()

我正在填充队列并像这样构建线程(使用print 语句只是为了帮助我了解发生了什么):

def main():
    work_queue = queue.Queue()
    out_queue = queue.Queue()
    _sentinel = object() # This recommendation comes from the *Python Cookbook*
    stock_list = StockBuilder()
    print("Populating the work_queue now")
    for item in stock_list.yield_url():
        work_queue.put(item)
    work_queue.put(_sentinel)

    t1 = threading.Thread(target=grab_ticker_name, args=(work_queue, out_queue, _sentinel))
    t2 = threading.Thread(target=grab_ticker_name, args=(work_queue, out_queue, _sentinel))
    print("Now we're going to start our threads!")
    t1.start() ; t2.start()
    print("Now we're going to attempt to join our work queue.")
    work_queue.join()

它似乎可以工作并给我一行一行的值,但随后似乎无限循环,直到我不得不取消它(CTRL+C):

...
grab_ticker_name grabbed the following: MRK, Merk
grab_ticker_name grabbed the following: HUM, Humana
We encountered the sentinel!!
grab_ticker_name grabbed the following: WLP, WellPoint
We encountered the sentinel!!
^CTraceback (most recent call last).
...

问题:为什么break 表达式没有使函数返回main()?

最后,我还希望能够从out_queue 中获取所有数据并将其放入另一个数据结构(列表、字典等)中。

问题 2: 我知道我可能会为此使用一个带锁的列表,但我在这里也感到困惑,因为我认为我已经读过队列是线程修改一个更简单的方法可变对象?

问题 2a:假设我使用 out_queue,一旦填充了 out_queue,是否有一种简单的方法可以简单地从中获取所有值?我想也许我会在work_queue.join() 之后将另一个_sentinel 放入out_queue,然后再做一个while 循环并处理out_queue 中的每个项目,如果它不是_sentinel?这是一种天真的方法吗?

感谢阅读。

【问题讨论】:

    标签: python multithreading python-3.x queue


    【解决方案1】:

    问题是你正在加入work_queue,但work_queue 不是空的——它仍然有哨兵。试试这个:

    t1.join()
    t2.join()
    

    【讨论】:

    • 哇。那完全奏效了!但现在我很困惑:我以为我必须joinQueues 而不是线程。另外,看起来我可以用tryexcept queue.Empty 代替哨兵?我想我在毫无意义地使用哨兵。好的,谢谢你的帮助。
    • @erewok:你可以加入任何一个,你只需要记住每个的退出条件是什么。而您的_sentinel 绝对是有目的的:线程知道何时退出。
    • 知道了。谢谢您的帮助。我想我需要做更多的阅读。
    猜你喜欢
    • 2016-04-11
    • 2011-02-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2011-01-11
    • 1970-01-01
    • 2013-06-18
    相关资源
    最近更新 更多