【问题标题】:How do I send asynchronous http requests in python one at a time?如何在 python 中一次发送一个异步 http 请求?
【发布时间】:2013-04-03 07:23:34
【问题描述】:

我们有一个作业队列,工作人员一次处理一个作业。每个作业都需要我们格式化一些数据并发出 HTTP POST 请求,并将数据作为请求负载。

我们如何让每个工作人员以单线程、非阻塞的方式异步发出这些 HTTP POST 请求?我们不关心请求的响应——我们只想让请求尽快执行,然后让工作人员立即进入下一个工作。

我们已经探索了使用geventgrequests 库(请参阅Why does gevent.spawn not execute the parameterized function until a call to Greenlet.join?)。我们的工作代码如下所示:

def execute_task(worker, job):

    print "About to spawn request"
    greenlet = gevent.spawn(requests.post, url, params=params)

    print "Request spawned, about to call sleep"
    gevent.sleep()

    print "Greenlet status: ", greenlet.ready()

第一个 print 语句执行,但第二个和第三个 print 语句永远不会被打印,并且 url 永远不会被命中。

我们怎样才能让这些异步请求执行?

【问题讨论】:

  • 有一个名为 asyncore 的标准库,但对于您的用例而言,它可能太低级了。
  • 我必须同意@georgesl 在这一点上的看法,asyncore 将是一个迁移的好地方,因为它将为您的应用程序提供更好的灵活性以供以后开发。此外,http://stackoverflow.com/questions/15753901/python-asyncore-client-socket-can-not-determaine-connection-status/15754244#15754244 这是一个好的开始和如何使用它的示例(请参阅我的问题的答案)。如果没有,您实际上必须在多个进程中执行此操作,如果可以并行发送请求,即使 python 的“子”库也很可能会为您线程化,这就是多进程
  • 你的 gevent 代码看起来不错(快速测试告诉我它工作得很好;我使用 gevent 1.0b3)。我想这取决于调用execute_task 的上下文。
  • 请问您是否真的需要gevent?使用非标准库总是有计算风险,因为它们可能依赖于版本,需要在下一个版本中进行更多开发或稍后缺少功能,而标准库不会改变:) 当我阅读您关于版本等的评论时,现在只是一个想法

标签: python asynchronous gevent http-request


【解决方案1】:

1) 创建一个 Queue.Queue 对象

2) 创建任意数量的“工作”线程,循环并从 Queue.Queue 中读取数据

3) 将作业提供给 Queue.Queue

工作线程将按照放置顺序读取 Queue.Queue

从文件中读取行并将它们放入 Queue.Queue 的示例

import sys
import urllib2
import urllib
from Queue import Queue
import threading
import re

THEEND = "TERMINATION-NOW-THE-END"


#read from file into Queue.Queue asynchronously
class QueueFile(threading.Thread):
    def run(self):
        if not(isinstance(self.myq, Queue)):
            print "Queue not set to a Queue"
            sys.exit(1)
        h = open(self.f, 'r')
        for l in h:
            self.myq.put(l.strip())  # this will block if the queue is full
        self.myq.put(THEEND)

    def set_queue(self, q):
        self.myq = q

    def set_file(self, f):
        self.f = f

工作线程可能是什么样的概念(仅示例)

class myWorker(threading.Thread):
    def run(self):
        while(running):           
            try:
                data = self.q.get()  # read from fifo

                req = urllib2.Request("http://192.168.1.10/url/path")
                req.add_data(urllib.urlencode(data))
                h1 = urllib2.urlopen(req, timeout=10)
                res = h1.read()
                assert(len(res) > 80)

            except urllib2.HTTPError, e:
                print e

            except urllib2.URLError, e:
                print "done %d reqs " % n
                print e
                sys.exit()

要使基于threading.Thread的对象运行,创建对象然后在实例上调用“start”

【讨论】:

    【解决方案2】:

    您必须在不同的线程中运行它或使用内置的异步库。 大多数库会在你不知道的情况下使用线程,或者它会依赖 Python 的标准部分 asyncore。

    这是线程和异步的组合:

    #!/usr/bin/python
    # -*- coding: iso-8859-15 -*-
    import asyncore, socket
    from threading import *
    from time import sleep
    from os import _exit
    from logger import *  # <- Non-standard library containing a log function
    from config import *  # <- Non-standard library containing settings such as "server"
    
    class logDispatcher(Thread, asyncore.dispatcher):
        def __init__(self, config=None):
            self.inbuffer = ''
            self.buffer = ''
            self.lockedbuffer = False
            self.is_writable = False
    
            self.is_connected = False
    
            self.exit = False
            self.initated = False
    
            asyncore.dispatcher.__init__(self)
            Thread.__init__(self)
    
            self.create_socket(socket.AF_INET, socket.SOCK_STREAM)
            try:
                self.connect((server, server_port))
            except:
                log('Could not connect to ' + server, 'LOG_SOCK')
                return None
    
            self.start()
    
        def handle_connect_event(self):
            self.is_connected = True
    
        def handle_connect(self):
            self.is_connected = True
            log('Connected to ' + str(server), 'LOG_SOCK')
    
        def handle_close(self):
            self.is_connected = False
            self.close()
    
        def handle_read(self):
            data = self.recv(8192)
            while self.lockedbuffer:
                sleep(0.01)
    
            self.inbuffer += data
    
    
        def handle_write(self):
            while self.is_writable:
                sent = self.send(self.buffer)
                sleep(1)
    
                self.buffer = self.buffer[sent:]
                if len(self.buffer) <= 0:
                    self.is_writable = False
                sleep(0.01)
    
        def _send(self, what):
            self.buffer += what + '\r\n'
            self.is_writable = True
    
        def run(self):
            self._send('GET / HTTP/1.1\r\n')
    
    while 1:
        logDispatcher() # <- Initate one for each request.
        asyncore.loop(0.1)
        log('All threads are done, next loop in 10', 'CORE')
        sleep(10)
    

    或者你可以简单地做一个线程来完成这项工作然后死掉。

    from threading import *
    class worker(Thread):
        def __init__(self, host, postdata)
            Thread.__init__(self)
            self.host = host
            self.postdata = postdata
            self.start()
        def run(self):
            sock.send(self.postdata) #Pseudo, create the socket!
    
    for data in postDataObjects:
        worker('example.com', data)
    

    如果您需要限制线程数(如果您发送的帖子超过 5k 条左右,可能会对系统造成负担),只需执行 while len(enumerate()) &gt; 1000: sleep(0.1) 并让 looper 对象等待一些线程消失.

    【讨论】:

      【解决方案3】:

      您可能希望使用join 方法而不是sleep,然后检查状态。如果你想一次执行一个,那将解决问题。稍微修改您的代码以测试它似乎可以正常工作。

      import gevent
      import requests
      
      def execute_task(worker, job):
      
          print "About to spawn request"
          greenlet = gevent.spawn(requests.get, 'http://example.com', params={})
      
          print "Request spawned, about to call sleep"
          gevent.sleep()
      
          print "Greenlet status: ", greenlet.ready()
          print greenlet.get()
      
      execute_task(None, None)
      

      给出结果:

      About to spawn request
      Request spawned, about to call sleep
      Greenlet status:  True
      <Response [200]>
      

      在这个 Python 进程中是否还有更多可能阻止 Gevent 运行这个 greenlet?

      【讨论】:

        【解决方案4】:

        将你的 url 和参数包装在一个列表中,然后每次将一对弹出到任务池(这里的任务池要么有一个任务,要么是空的),创建线程,从任务池中读取任务,当一个线程获取任务并发送请求,然后从列表中弹出另一个(即这实际上是一个队列列表)

        【讨论】:

          猜你喜欢
          • 1970-01-01
          • 1970-01-01
          • 2016-09-04
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 1970-01-01
          • 2012-04-03
          相关资源
          最近更新 更多