When running pyspider, it fails with: SyntaxError: invalid syntax
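
On Python 3.7+ the failure is easy to reproduce in isolation. Here is a minimal sketch (independent of pyspider) showing that the parser rejects async as a parameter name, while a renamed parameter such as async_mode is accepted:

    # Python 3.7 made `async` a hard keyword, so it is rejected wherever
    # an identifier is expected, e.g. as a parameter name.
    import ast

    try:
        ast.parse("def fetch(async=True): pass")
    except SyntaxError as e:
        print("SyntaxError:", e.msg)  # invalid syntax

    # Renaming the parameter is all the fix requires:
    ast.parse("def fetch(async_mode=True): pass")
    print("async_mode is accepted")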

Step 1: rename the variable

  • Since Python 3.7, async has been a reserved keyword, so it can no longer be used as a function parameter name.
  • In the Python files under /Library/anaconda3/lib/python3.7/site-packages/pyspider, rename async to async_mode (any other non-keyword name works too). The patched code is given below.
    • 1. run.py (a straightforward global replace of async with async_mode)
    • #!/usr/bin/env python
      # -*- encoding: utf-8 -*-
      # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
      # Author: Binux<i@binux.me>
      #         http://binux.me
      # Created on 2014-03-05 00:11:49
      
      
      import os
      import sys
      import six
      import copy
      import time
      import shutil
      import logging
      import logging.config
      
      import click
      import pyspider
      from pyspider.message_queue import connect_message_queue
      from pyspider.database import connect_database
      from pyspider.libs import utils
      
      
      def read_config(ctx, param, value):
          if not value:
              return {}
          import json
      
          def underline_dict(d):
              if not isinstance(d, dict):
                  return d
              return dict((k.replace('-', '_'), underline_dict(v)) for k, v in six.iteritems(d))
      
          config = underline_dict(json.load(value))
          ctx.default_map = config
          return config
      
      
      def connect_db(ctx, param, value):
          if not value:
              return
          return utils.Get(lambda: connect_database(value))
      
      
      def load_cls(ctx, param, value):
          if isinstance(value, six.string_types):
              return utils.load_object(value)
          return value
      
      
      def connect_rpc(ctx, param, value):
          if not value:
              return
          try:
              from six.moves import xmlrpc_client
          except ImportError:
              import xmlrpclib as xmlrpc_client
          return xmlrpc_client.ServerProxy(value, allow_none=True)
      
      
      @click.group(invoke_without_command=True)
      @click.option('-c', '--config', callback=read_config, type=click.File('r'),
                    help='a json file with default values for subcommands. {"webui": {"port":5001}}')
      @click.option('--logging-config', default=os.path.join(os.path.dirname(__file__), "logging.conf"),
                    help="logging config file for built-in python logging module", show_default=True)
      @click.option('--debug', envvar='DEBUG', default=False, is_flag=True, help='debug mode')
      @click.option('--queue-maxsize', envvar='QUEUE_MAXSIZE', default=100,
                    help='maxsize of queue')
      @click.option('--taskdb', envvar='TASKDB', callback=connect_db,
                    help='database url for taskdb, default: sqlite')
      @click.option('--projectdb', envvar='PROJECTDB', callback=connect_db,
                    help='database url for projectdb, default: sqlite')
      @click.option('--resultdb', envvar='RESULTDB', callback=connect_db,
                    help='database url for resultdb, default: sqlite')
      @click.option('--message-queue', envvar='AMQP_URL',
                    help='connection url to message queue, '
                    'default: builtin multiprocessing.Queue')
      @click.option('--amqp-url', help='[deprecated] amqp url for rabbitmq. '
                    'please use --message-queue instead.')
      @click.option('--beanstalk', envvar='BEANSTALK_HOST',
                    help='[deprecated] beanstalk config for beanstalk queue. '
                    'please use --message-queue instead.')
      @click.option('--phantomjs-proxy', envvar='PHANTOMJS_PROXY', help="phantomjs proxy ip:port")
      @click.option('--data-path', default='./data', help='data dir path')
      @click.option('--add-sys-path/--not-add-sys-path', default=True, is_flag=True,
                    help='add current working directory to python lib search path')
      @click.version_option(version=pyspider.__version__, prog_name=pyspider.__name__)
      @click.pass_context
      def cli(ctx, **kwargs):
          """
          A powerful spider system in python.
          """
          if kwargs['add_sys_path']:
              sys.path.append(os.getcwd())
      
          logging.config.fileConfig(kwargs['logging_config'])
      
          # get db from env
          for db in ('taskdb', 'projectdb', 'resultdb'):
              if kwargs[db] is not None:
                  continue
              if os.environ.get('MYSQL_NAME'):
                  kwargs[db] = utils.Get(lambda db=db: connect_database(
                      'sqlalchemy+mysql+%s://%s:%s/%s' % (
                          db, os.environ['MYSQL_PORT_3306_TCP_ADDR'],
                          os.environ['MYSQL_PORT_3306_TCP_PORT'], db)))
              elif os.environ.get('MONGODB_NAME'):
                  kwargs[db] = utils.Get(lambda db=db: connect_database(
                      'mongodb+%s://%s:%s/%s' % (
                          db, os.environ['MONGODB_PORT_27017_TCP_ADDR'],
                          os.environ['MONGODB_PORT_27017_TCP_PORT'], db)))
              elif ctx.invoked_subcommand == 'bench':
                  if kwargs['data_path'] == './data':
                      kwargs['data_path'] += '/bench'
                      shutil.rmtree(kwargs['data_path'], ignore_errors=True)
                      os.mkdir(kwargs['data_path'])
                  if db in ('taskdb', 'resultdb'):
                      kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s://' % (db)))
                  elif db in ('projectdb', ):
                      kwargs[db] = utils.Get(lambda db=db: connect_database('local+%s://%s' % (
                          db, os.path.join(os.path.dirname(__file__), 'libs/bench.py'))))
              else:
                  if not os.path.exists(kwargs['data_path']):
                      os.mkdir(kwargs['data_path'])
                  kwargs[db] = utils.Get(lambda db=db: connect_database('sqlite+%s:///%s/%s.db' % (
                      db, kwargs['data_path'], db[:-2])))
                  kwargs['is_%s_default' % db] = True
      
          # create folder for counter.dump
          if not os.path.exists(kwargs['data_path']):
              os.mkdir(kwargs['data_path'])
      
          # message queue, compatible with old version
          if kwargs.get('message_queue'):
              pass
          elif kwargs.get('amqp_url'):
              kwargs['message_queue'] = kwargs['amqp_url']
          elif os.environ.get('RABBITMQ_NAME'):
              kwargs['message_queue'] = ("amqp://guest:guest@%(RABBITMQ_PORT_5672_TCP_ADDR)s"
                                         ":%(RABBITMQ_PORT_5672_TCP_PORT)s/%%2F" % os.environ)
          elif kwargs.get('beanstalk'):
              kwargs['message_queue'] = "beanstalk://%s/" % kwargs['beanstalk']
      
          for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                       'fetcher2processor', 'processor2result'):
              if kwargs.get('message_queue'):
                  kwargs[name] = utils.Get(lambda name=name: connect_message_queue(
                      name, kwargs.get('message_queue'), kwargs['queue_maxsize']))
              else:
                  kwargs[name] = connect_message_queue(name, kwargs.get('message_queue'),
                                                       kwargs['queue_maxsize'])
      
          # phantomjs-proxy
          if kwargs.get('phantomjs_proxy'):
              pass
          elif os.environ.get('PHANTOMJS_NAME'):
              kwargs['phantomjs_proxy'] = os.environ['PHANTOMJS_PORT_25555_TCP'][len('tcp://'):]
      
          ctx.obj = utils.ObjectDict(ctx.obj or {})
          ctx.obj['instances'] = []
          ctx.obj.update(kwargs)
      
          if ctx.invoked_subcommand is None and not ctx.obj.get('testing_mode'):
              ctx.invoke(all)
          return ctx
      
      
      @cli.command()
      @click.option('--xmlrpc/--no-xmlrpc', default=True)
      @click.option('--xmlrpc-host', default='0.0.0.0')
      @click.option('--xmlrpc-port', envvar='SCHEDULER_XMLRPC_PORT', default=23333)
      @click.option('--inqueue-limit', default=0,
                    help='size limit of task queue for each project, '
                    'tasks will been ignored when overflow')
      @click.option('--delete-time', default=24 * 60 * 60,
                    help='delete time before marked as delete')
      @click.option('--active-tasks', default=100, help='active log size')
      @click.option('--loop-limit', default=1000, help='maximum number of tasks due with in a loop')
      @click.option('--fail-pause-num', default=10, help='auto pause the project when last FAIL_PAUSE_NUM task failed, set 0 to disable')
      @click.option('--scheduler-cls', default='pyspider.scheduler.ThreadBaseScheduler', callback=load_cls,
                    help='scheduler class to be used.')
      @click.option('--threads', default=None, help='thread number for ThreadBaseScheduler, default: 4')
      @click.pass_context
      def scheduler(ctx, xmlrpc, xmlrpc_host, xmlrpc_port,
                    inqueue_limit, delete_time, active_tasks, loop_limit, fail_pause_num,
                    scheduler_cls, threads, get_object=False):
          """
          Run Scheduler, only one scheduler is allowed.
          """
          g = ctx.obj
          Scheduler = load_cls(None, None, scheduler_cls)
      
          kwargs = dict(taskdb=g.taskdb, projectdb=g.projectdb, resultdb=g.resultdb,
                        newtask_queue=g.newtask_queue, status_queue=g.status_queue,
                        out_queue=g.scheduler2fetcher, data_path=g.get('data_path', 'data'))
          if threads:
              kwargs['threads'] = int(threads)
      
          scheduler = Scheduler(**kwargs)
          scheduler.INQUEUE_LIMIT = inqueue_limit
          scheduler.DELETE_TIME = delete_time
          scheduler.ACTIVE_TASKS = active_tasks
          scheduler.LOOP_LIMIT = loop_limit
          scheduler.FAIL_PAUSE_NUM = fail_pause_num
      
          g.instances.append(scheduler)
          if g.get('testing_mode') or get_object:
              return scheduler
      
          if xmlrpc:
              utils.run_in_thread(scheduler.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
          scheduler.run()
      
      
      @cli.command()
      @click.option('--xmlrpc/--no-xmlrpc', default=False)
      @click.option('--xmlrpc-host', default='0.0.0.0')
      @click.option('--xmlrpc-port', envvar='FETCHER_XMLRPC_PORT', default=24444)
      @click.option('--poolsize', default=100, help="max simultaneous fetches")
      @click.option('--proxy', help="proxy host:port")
      @click.option('--user-agent', help='user agent')
      @click.option('--timeout', help='default fetch timeout')
      @click.option('--phantomjs-endpoint', help="endpoint of phantomjs, start via pyspider phantomjs")
      @click.option('--splash-endpoint', help="execute endpoint of splash: http://splash.readthedocs.io/en/stable/api.html#execute")
      @click.option('--fetcher-cls', default='pyspider.fetcher.Fetcher', callback=load_cls,
                    help='Fetcher class to be used.')
      @click.pass_context
      def fetcher(ctx, xmlrpc, xmlrpc_host, xmlrpc_port, poolsize, proxy, user_agent,
                  timeout, phantomjs_endpoint, splash_endpoint, fetcher_cls,
                  async_mode=True, get_object=False, no_input=False):
          """
          Run Fetcher.
          """
          g = ctx.obj
          Fetcher = load_cls(None, None, fetcher_cls)
      
          if no_input:
              inqueue = None
              outqueue = None
          else:
              inqueue = g.scheduler2fetcher
              outqueue = g.fetcher2processor
          fetcher = Fetcher(inqueue=inqueue, outqueue=outqueue,
                            poolsize=poolsize, proxy=proxy, async_mode=async_mode)
          fetcher.phantomjs_proxy = phantomjs_endpoint or g.phantomjs_proxy
          fetcher.splash_endpoint = splash_endpoint
          if user_agent:
              fetcher.user_agent = user_agent
          if timeout:
              fetcher.default_options = copy.deepcopy(fetcher.default_options)
              fetcher.default_options['timeout'] = timeout
      
          g.instances.append(fetcher)
          if g.get('testing_mode') or get_object:
              return fetcher
      
          if xmlrpc:
              utils.run_in_thread(fetcher.xmlrpc_run, port=xmlrpc_port, bind=xmlrpc_host)
          fetcher.run()
      
      
      @cli.command()
      @click.option('--processor-cls', default='pyspider.processor.Processor',
                    callback=load_cls, help='Processor class to be used.')
      @click.option('--process-time-limit', default=30, help='script process time limit')
      @click.pass_context
      def processor(ctx, processor_cls, process_time_limit, enable_stdout_capture=True, get_object=False):
          """
          Run Processor.
          """
          g = ctx.obj
          Processor = load_cls(None, None, processor_cls)
      
          processor = Processor(projectdb=g.projectdb,
                                inqueue=g.fetcher2processor, status_queue=g.status_queue,
                                newtask_queue=g.newtask_queue, result_queue=g.processor2result,
                                enable_stdout_capture=enable_stdout_capture,
                                process_time_limit=process_time_limit)
      
          g.instances.append(processor)
          if g.get('testing_mode') or get_object:
              return processor
      
          processor.run()
      
      
      @cli.command()
      @click.option('--result-cls', default='pyspider.result.ResultWorker', callback=load_cls,
                    help='ResultWorker class to be used.')
      @click.pass_context
      def result_worker(ctx, result_cls, get_object=False):
          """
          Run result worker.
          """
          g = ctx.obj
          ResultWorker = load_cls(None, None, result_cls)
      
          result_worker = ResultWorker(resultdb=g.resultdb, inqueue=g.processor2result)
      
          g.instances.append(result_worker)
          if g.get('testing_mode') or get_object:
              return result_worker
      
          result_worker.run()
      
      
      @cli.command()
      @click.option('--host', default='0.0.0.0', envvar='WEBUI_HOST',
                    help='webui bind to host')
      @click.option('--port', default=5000, envvar='WEBUI_PORT',
                    help='webui bind to port')
      @click.option('--cdn', default='//cdnjs.cloudflare.com/ajax/libs/',
                    help='js/css cdn server')
      @click.option('--scheduler-rpc', help='xmlrpc path of scheduler')
      @click.option('--fetcher-rpc', help='xmlrpc path of fetcher')
      @click.option('--max-rate', type=float, help='max rate for each project')
      @click.option('--max-burst', type=float, help='max burst for each project')
      @click.option('--username', envvar='WEBUI_USERNAME',
                    help='username of lock -ed projects')
      @click.option('--password', envvar='WEBUI_PASSWORD',
                    help='password of lock -ed projects')
      @click.option('--need-auth', is_flag=True, default=False, help='need username and password')
      @click.option('--webui-instance', default='pyspider.webui.app.app', callback=load_cls,
                    help='webui Flask Application instance to be used.')
      @click.option('--process-time-limit', default=30, help='script process time limit in debug')
      @click.pass_context
      def webui(ctx, host, port, cdn, scheduler_rpc, fetcher_rpc, max_rate, max_burst,
                username, password, need_auth, webui_instance, process_time_limit, get_object=False):
          """
          Run WebUI
          """
          app = load_cls(None, None, webui_instance)
      
          g = ctx.obj
          app.config['taskdb'] = g.taskdb
          app.config['projectdb'] = g.projectdb
          app.config['resultdb'] = g.resultdb
          app.config['cdn'] = cdn
      
          if max_rate:
              app.config['max_rate'] = max_rate
          if max_burst:
              app.config['max_burst'] = max_burst
          if username:
              app.config['webui_username'] = username
          if password:
              app.config['webui_password'] = password
          app.config['need_auth'] = need_auth
          app.config['process_time_limit'] = process_time_limit
      
          # inject queues for webui
          for name in ('newtask_queue', 'status_queue', 'scheduler2fetcher',
                       'fetcher2processor', 'processor2result'):
              app.config['queues'][name] = getattr(g, name, None)
      
          # fetcher rpc
          if isinstance(fetcher_rpc, six.string_types):
              import umsgpack
              fetcher_rpc = connect_rpc(ctx, None, fetcher_rpc)
              app.config['fetch'] = lambda x: umsgpack.unpackb(fetcher_rpc.fetch(x).data)
          else:
              # get fetcher instance for webui
              fetcher_config = g.config.get('fetcher', {})
              webui_fetcher = ctx.invoke(fetcher, async_mode=False, get_object=True, no_input=True, **fetcher_config)
      
              app.config['fetch'] = lambda x: webui_fetcher.fetch(x)
      
          if isinstance(scheduler_rpc, six.string_types):
              scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
          if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
              app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://%s/' % (
                  os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
          elif scheduler_rpc is None:
              app.config['scheduler_rpc'] = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')
          else:
              app.config['scheduler_rpc'] = scheduler_rpc
      
          app.debug = g.debug
          g.instances.append(app)
          if g.get('testing_mode') or get_object:
              return app
      
          app.run(host=host, port=port)
      
      
      @cli.command()
      @click.option('--phantomjs-path', default='phantomjs', help='phantomjs path')
      @click.option('--port', default=25555, help='phantomjs port')
      @click.option('--auto-restart', default=False, help='auto restart phantomjs if crashed')
      @click.argument('args', nargs=-1)
      @click.pass_context
      def phantomjs(ctx, phantomjs_path, port, auto_restart, args):
          """
          Run phantomjs fetcher if phantomjs is installed.
          """
          args = args or ctx.default_map and ctx.default_map.get('args', [])
      
          import subprocess
          g = ctx.obj
          _quit = []
          phantomjs_fetcher = os.path.join(
              os.path.dirname(pyspider.__file__), 'fetcher/phantomjs_fetcher.js')
          cmd = [phantomjs_path,
                 # this may cause memory leak: https://github.com/ariya/phantomjs/issues/12903
                 #'--load-images=false',
                 '--ssl-protocol=any',
                 '--disk-cache=true'] + list(args or []) + [phantomjs_fetcher, str(port)]
      
          try:
              _phantomjs = subprocess.Popen(cmd)
          except OSError:
              logging.warning('phantomjs not found, continue running without it.')
              return None
      
          def quit(*args, **kwargs):
              _quit.append(1)
              _phantomjs.kill()
              _phantomjs.wait()
              logging.info('phantomjs exited.')
      
          if not g.get('phantomjs_proxy'):
              g['phantomjs_proxy'] = '127.0.0.1:%s' % port
      
          phantomjs = utils.ObjectDict(port=port, quit=quit)
          g.instances.append(phantomjs)
          if g.get('testing_mode'):
              return phantomjs
      
          while True:
              _phantomjs.wait()
              if _quit or not auto_restart:
                  break
              _phantomjs = subprocess.Popen(cmd)
      
      
      @cli.command()
      @click.option('--fetcher-num', default=1, help='instance num of fetcher')
      @click.option('--processor-num', default=1, help='instance num of processor')
      @click.option('--result-worker-num', default=1,
                    help='instance num of result worker')
      @click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
                    help='run each components in thread or subprocess. '
                    'always using thread for windows.')
      @click.pass_context
      def all(ctx, fetcher_num, processor_num, result_worker_num, run_in):
          """
          Run all the components in subprocess or thread
          """
      
          ctx.obj['debug'] = False
          g = ctx.obj
      
          # FIXME: py34 cannot run components with threads
          if run_in == 'subprocess' and os.name != 'nt':
              run_in = utils.run_in_subprocess
          else:
              run_in = utils.run_in_thread
      
          threads = []
      
          try:
              # phantomjs
              if not g.get('phantomjs_proxy'):
                  phantomjs_config = g.config.get('phantomjs', {})
                  phantomjs_config.setdefault('auto_restart', True)
                  threads.append(run_in(ctx.invoke, phantomjs, **phantomjs_config))
                  time.sleep(2)
                  if threads[-1].is_alive() and not g.get('phantomjs_proxy'):
                      g['phantomjs_proxy'] = '127.0.0.1:%s' % phantomjs_config.get('port', 25555)
      
              # result worker
              result_worker_config = g.config.get('result_worker', {})
              for i in range(result_worker_num):
                  threads.append(run_in(ctx.invoke, result_worker, **result_worker_config))
      
              # processor
              processor_config = g.config.get('processor', {})
              for i in range(processor_num):
                  threads.append(run_in(ctx.invoke, processor, **processor_config))
      
              # fetcher
              fetcher_config = g.config.get('fetcher', {})
              fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
              for i in range(fetcher_num):
                  threads.append(run_in(ctx.invoke, fetcher, **fetcher_config))
      
              # scheduler
              scheduler_config = g.config.get('scheduler', {})
              scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
              threads.append(run_in(ctx.invoke, scheduler, **scheduler_config))
      
              # running webui in main thread to make it exitable
              webui_config = g.config.get('webui', {})
              webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                      % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
              ctx.invoke(webui, **webui_config)
          finally:
              # exit components run in threading
              for each in g.instances:
                  each.quit()
      
              # exit components run in subprocess
              for each in threads:
                  if not each.is_alive():
                      continue
                  if hasattr(each, 'terminate'):
                      each.terminate()
                  each.join()
      
      
      @cli.command()
      @click.option('--fetcher-num', default=1, help='instance num of fetcher')
      @click.option('--processor-num', default=2, help='instance num of processor')
      @click.option('--result-worker-num', default=1, help='instance num of result worker')
      @click.option('--run-in', default='subprocess', type=click.Choice(['subprocess', 'thread']),
                    help='run each components in thread or subprocess. '
                    'always using thread for windows.')
      @click.option('--total', default=10000, help="total url in test page")
      @click.option('--show', default=20, help="show how many urls in a page")
      @click.option('--taskdb-bench', default=False, is_flag=True,
                    help="only run taskdb bench test")
      @click.option('--message-queue-bench', default=False, is_flag=True,
                    help="only run message queue bench test")
      @click.option('--all-bench', default=False, is_flag=True,
                    help="only run all bench test")
      @click.pass_context
      def bench(ctx, fetcher_num, processor_num, result_worker_num, run_in, total, show,
                taskdb_bench, message_queue_bench, all_bench):
          """
          Run Benchmark test.
          In bench mode, in-memory sqlite database is used instead of on-disk sqlite database.
          """
          from pyspider.libs import bench
          from pyspider.webui import bench_test  # flake8: noqa
      
          ctx.obj['debug'] = False
          g = ctx.obj
          if result_worker_num == 0:
              g['processor2result'] = None
      
          if run_in == 'subprocess' and os.name != 'nt':
              run_in = utils.run_in_subprocess
          else:
              run_in = utils.run_in_thread
      
          all_test = not taskdb_bench and not message_queue_bench and not all_bench
      
          # test taskdb
          if all_test or taskdb_bench:
              bench.bench_test_taskdb(g.taskdb)
          # test message queue
          if all_test or message_queue_bench:
              bench.bench_test_message_queue(g.scheduler2fetcher)
          # test all
          if not all_test and not all_bench:
              return
      
          project_name = 'bench'
      
          def clear_project():
              g.taskdb.drop(project_name)
              g.resultdb.drop(project_name)
      
          clear_project()
      
          # disable log
          logging.getLogger().setLevel(logging.ERROR)
          logging.getLogger('scheduler').setLevel(logging.ERROR)
          logging.getLogger('fetcher').setLevel(logging.ERROR)
          logging.getLogger('processor').setLevel(logging.ERROR)
          logging.getLogger('result').setLevel(logging.ERROR)
          logging.getLogger('webui').setLevel(logging.ERROR)
          logging.getLogger('werkzeug').setLevel(logging.ERROR)
      
          try:
              threads = []
      
              # result worker
              result_worker_config = g.config.get('result_worker', {})
              for i in range(result_worker_num):
                  threads.append(run_in(ctx.invoke, result_worker,
                                        result_cls='pyspider.libs.bench.BenchResultWorker',
                                        **result_worker_config))
      
              # processor
              processor_config = g.config.get('processor', {})
              for i in range(processor_num):
                  threads.append(run_in(ctx.invoke, processor,
                                        processor_cls='pyspider.libs.bench.BenchProcessor',
                                        **processor_config))
      
              # fetcher
              fetcher_config = g.config.get('fetcher', {})
              fetcher_config.setdefault('xmlrpc_host', '127.0.0.1')
              for i in range(fetcher_num):
                  threads.append(run_in(ctx.invoke, fetcher,
                                        fetcher_cls='pyspider.libs.bench.BenchFetcher',
                                        **fetcher_config))
      
              # webui
              webui_config = g.config.get('webui', {})
              webui_config.setdefault('scheduler_rpc', 'http://127.0.0.1:%s/'
                                      % g.config.get('scheduler', {}).get('xmlrpc_port', 23333))
              threads.append(run_in(ctx.invoke, webui, **webui_config))
      
              # scheduler
              scheduler_config = g.config.get('scheduler', {})
              scheduler_config.setdefault('xmlrpc_host', '127.0.0.1')
              scheduler_config.setdefault('xmlrpc_port', 23333)
              threads.append(run_in(ctx.invoke, scheduler,
                                    scheduler_cls='pyspider.libs.bench.BenchScheduler',
                                    **scheduler_config))
              scheduler_rpc = connect_rpc(ctx, None,
                                          'http://%(xmlrpc_host)s:%(xmlrpc_port)s/' % scheduler_config)
      
              for _ in range(20):
                  if utils.check_port_open(23333):
                      break
                  time.sleep(1)
      
              scheduler_rpc.newtask({
                  "project": project_name,
                  "taskid": "on_start",
                  "url": "data:,on_start",
                  "fetch": {
                      "save": {"total": total, "show": show}
                  },
                  "process": {
                      "callback": "on_start",
                  },
              })
      
              # wait bench test finished
              while True:
                  time.sleep(1)
                  if scheduler_rpc.size() == 0:
                      break
          finally:
              # exit components run in threading
              for each in g.instances:
                  each.quit()
      
              # exit components run in subprocess
              for each in threads:
                  if hasattr(each, 'terminate'):
                      each.terminate()
                  each.join(1)
      
              clear_project()
      
      
      @cli.command()
      @click.option('-i', '--interactive', default=False, is_flag=True,
                    help='enable interactive mode, you can choose crawl url.')
      @click.option('--phantomjs', 'enable_phantomjs', default=False, is_flag=True,
                    help='enable phantomjs, will spawn a subprocess for phantomjs')
      @click.argument('scripts', nargs=-1)
      @click.pass_context
      def one(ctx, interactive, enable_phantomjs, scripts):
          """
          One mode not only means all-in-one, it runs every thing in one process over
          tornado.ioloop, for debug purpose
          """
      
          ctx.obj['debug'] = False
          g = ctx.obj
          g['testing_mode'] = True
      
          if scripts:
              from pyspider.database.local.projectdb import ProjectDB
              g['projectdb'] = ProjectDB(scripts)
              if g.get('is_taskdb_default'):
                  g['taskdb'] = connect_database('sqlite+taskdb://')
              if g.get('is_resultdb_default'):
                  g['resultdb'] = None
      
          if enable_phantomjs:
              phantomjs_config = g.config.get('phantomjs', {})
              phantomjs_obj = ctx.invoke(phantomjs, **phantomjs_config)
              if phantomjs_obj:
                  g.setdefault('phantomjs_proxy', '127.0.0.1:%s' % phantomjs_obj.port)
          else:
              phantomjs_obj = None
      
          result_worker_config = g.config.get('result_worker', {})
          if g.resultdb is None:
              result_worker_config.setdefault('result_cls',
                                              'pyspider.result.OneResultWorker')
          result_worker_obj = ctx.invoke(result_worker, **result_worker_config)
      
          processor_config = g.config.get('processor', {})
          processor_config.setdefault('enable_stdout_capture', False)
          processor_obj = ctx.invoke(processor, **processor_config)
      
          fetcher_config = g.config.get('fetcher', {})
          fetcher_config.setdefault('xmlrpc', False)
          fetcher_obj = ctx.invoke(fetcher, **fetcher_config)
      
          scheduler_config = g.config.get('scheduler', {})
          scheduler_config.setdefault('xmlrpc', False)
          scheduler_config.setdefault('scheduler_cls',
                                      'pyspider.scheduler.OneScheduler')
          scheduler_obj = ctx.invoke(scheduler, **scheduler_config)
      
          scheduler_obj.init_one(ioloop=fetcher_obj.ioloop,
                                 fetcher=fetcher_obj,
                                 processor=processor_obj,
                                 result_worker=result_worker_obj,
                                 interactive=interactive)
          if scripts:
              for project in g.projectdb.projects:
                  scheduler_obj.trigger_on_start(project)
      
          try:
              scheduler_obj.run()
          finally:
              scheduler_obj.quit()
              if phantomjs_obj:
                  phantomjs_obj.quit()
      
      
      @cli.command()
      @click.option('--scheduler-rpc', callback=connect_rpc, help='xmlrpc path of scheduler')
      @click.argument('project', nargs=1)
      @click.argument('message', nargs=1)
      @click.pass_context
      def send_message(ctx, scheduler_rpc, project, message):
          """
          Send Message to project from command line
          """
          if isinstance(scheduler_rpc, six.string_types):
              scheduler_rpc = connect_rpc(ctx, None, scheduler_rpc)
          if scheduler_rpc is None and os.environ.get('SCHEDULER_NAME'):
              scheduler_rpc = connect_rpc(ctx, None, 'http://%s/' % (
                  os.environ['SCHEDULER_PORT_23333_TCP'][len('tcp://'):]))
          if scheduler_rpc is None:
              scheduler_rpc = connect_rpc(ctx, None, 'http://127.0.0.1:23333/')
      
          return scheduler_rpc.send_task({
              'taskid': utils.md5string('data:,on_message'),
              'project': project,
              'url': 'data:,on_message',
              'fetch': {
                  'save': ('__command__', message),
              },
              'process': {
                  'callback': '_on_message',
              }
          })
      
      
      def main():
          cli()
      
      if __name__ == '__main__':
          main()
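
Patching by hand is error-prone: it is easy to miss an occurrence or to clobber a name that merely contains the text async. As an alternative, here is a minimal sketch (my own helper, not part of pyspider) that automates the rename for both files with a case-sensitive, whole-word regex; the site-packages path is the one from this post and is an assumption to adapt to your own installation:

    import re
    import shutil
    from pathlib import Path

    # Assumption: adjust to your own site-packages location.
    PYSPIDER = Path("/Library/anaconda3/lib/python3.7/site-packages/pyspider")

    for rel in ("run.py", "fetcher/tornado_fetcher.py"):
        path = PYSPIDER / rel
        shutil.copy(str(path), str(path) + ".bak")  # keep a backup
        source = path.read_text(encoding="utf-8")
        # Whole-word and case-sensitive: renames the identifier `async`
        # but leaves CurlAsyncHTTPClient, SimpleAsyncHTTPClient and the
        # new name async_mode untouched.
        patched = re.sub(r"\basync\b", "async_mode", source)
        path.write_text(patched, encoding="utf-8")
        print("patched", path)

Note that a whole-word replace leaves compound names such as async_fetch unchanged (they remain legal identifiers), whereas the hand-patched listings in this post rename them to async_mode_fetch as well; both work as long as definitions and call sites stay consistent.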

 

  • 2. fetcher/tornado_fetcher.py (note: this file is slightly trickier to patch; replace only the async variable and parameter names, and do not touch imported names that merely contain the text Async, such as CurlAsyncHTTPClient. The whole-word sketch above avoids exactly this pitfall.)
  • #!/usr/bin/env python
    # -*- encoding: utf-8 -*-
    # vim: set et sw=4 ts=4 sts=4 ff=unix fenc=utf8:
    # Author: Binux<i@binux.me>
    #         http://binux.me
    # Created on 2012-12-17 11:07:19
    
    from __future__ import unicode_literals
    
    import os
    import sys
    import six
    import copy
    import time
    import json
    import logging
    import traceback
    import functools
    import threading
    import tornado.ioloop
    import tornado.httputil
    import tornado.httpclient
    import pyspider
    
    from six.moves import queue, http_cookies
    from six.moves.urllib.robotparser import RobotFileParser
    from requests import cookies
    from six.moves.urllib.parse import urljoin, urlsplit
    from tornado import gen
    from tornado.curl_httpclient import CurlAsyncHTTPClient
    from tornado.simple_httpclient import SimpleAsyncHTTPClient
    
    from pyspider.libs import utils, dataurl, counter
    from pyspider.libs.url import quote_chinese
    from .cookie_utils import extract_cookies_to_jar
    logger = logging.getLogger('fetcher')
    
    
    class MyCurlAsyncHTTPClient(CurlAsyncHTTPClient):
    
        def free_size(self):
            return len(self._free_list)
    
        def size(self):
            return len(self._curls) - self.free_size()
    
    
    class MySimpleAsyncHTTPClient(SimpleAsyncHTTPClient):
    
        def free_size(self):
            return self.max_clients - self.size()
    
        def size(self):
            return len(self.active)
    
    fetcher_output = {
        "status_code": int,
        "orig_url": str,
        "url": str,
        "headers": dict,
        "content": str,
        "cookies": dict,
    }
    
    
    class Fetcher(object):
        user_agent = "pyspider/%s (+http://pyspider.org/)" % pyspider.__version__
        default_options = {
            'method': 'GET',
            'headers': {
            },
            'use_gzip': True,
            'timeout': 120,
            'connect_timeout': 20,
        }
        phantomjs_proxy = None
        splash_endpoint = None
        splash_lua_source = open(os.path.join(os.path.dirname(__file__), "splash_fetcher.lua")).read()
        robot_txt_age = 60*60  # 1h
    
        def __init__(self, inqueue, outqueue, poolsize=100, proxy=None, async_mode=True):
            self.inqueue = inqueue
            self.outqueue = outqueue
    
            self.poolsize = poolsize
            self._running = False
            self._quit = False
            self.proxy = proxy
            self.async_mode = async_mode
            self.ioloop = tornado.ioloop.IOLoop()
    
            self.robots_txt_cache = {}
    
            # binding io_loop to http_client here
            if self.async_mode:
                self.http_client = MyCurlAsyncHTTPClient(max_clients=self.poolsize,
                                                         io_loop=self.ioloop)
            else:
                self.http_client = tornado.httpclient.HTTPClient(MyCurlAsyncHTTPClient, max_clients=self.poolsize)
    
            self._cnt = {
                '5m': counter.CounterManager(
                    lambda: counter.TimebaseAverageWindowCounter(30, 10)),
                '1h': counter.CounterManager(
                    lambda: counter.TimebaseAverageWindowCounter(60, 60)),
            }
    
        def send_result(self, type, task, result):
            '''Send fetch result to processor'''
            if self.outqueue:
                try:
                    self.outqueue.put((task, result))
                except Exception as e:
                    logger.exception(e)
    
        def fetch(self, task, callback=None):
            if self.async_mode:
                return self.async_mode_fetch(task, callback)
            else:
                return self.async_mode_fetch(task, callback).result()
    
        @gen.coroutine
        def async_mode_fetch(self, task, callback=None):
            '''Do one fetch'''
            url = task.get('url', 'data:,')
            if callback is None:
                callback = self.send_result
    
            type = 'None'
            start_time = time.time()
            try:
                if url.startswith('data:'):
                    type = 'data'
                    result = yield gen.maybe_future(self.data_fetch(url, task))
                elif task.get('fetch', {}).get('fetch_type') in ('js', 'phantomjs'):
                    type = 'phantomjs'
                    result = yield self.phantomjs_fetch(url, task)
                elif task.get('fetch', {}).get('fetch_type') in ('splash', ):
                    type = 'splash'
                    result = yield self.splash_fetch(url, task)
                else:
                    type = 'http'
                    result = yield self.http_fetch(url, task)
            except Exception as e:
                logger.exception(e)
                result = self.handle_error(type, url, task, start_time, e)
    
            callback(type, task, result)
            self.on_result(type, task, result)
            raise gen.Return(result)
    
        def sync_fetch(self, task):
            '''Synchronization fetch, usually used in xmlrpc thread'''
            if not self._running:
                return self.ioloop.run_sync(functools.partial(self.async_mode_fetch, task, lambda t, _, r: True))
    
            wait_result = threading.Condition()
            _result = {}
    
            def callback(type, task, result):
                wait_result.acquire()
                _result['type'] = type
                _result['task'] = task
                _result['result'] = result
                wait_result.notify()
                wait_result.release()
    
            wait_result.acquire()
            self.ioloop.add_callback(self.fetch, task, callback)
            while 'result' not in _result:
                wait_result.wait()
            wait_result.release()
            return _result['result']
    
        def data_fetch(self, url, task):
            '''A fake fetcher for dataurl'''
            self.on_fetch('data', task)
            result = {}
            result['orig_url'] = url
            result['content'] = dataurl.decode(url)
            result['headers'] = {}
            result['status_code'] = 200
            result['url'] = url
            result['cookies'] = {}
            result['time'] = 0
            result['save'] = task.get('fetch', {}).get('save')
            if len(result['content']) < 70:
                logger.info("[200] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
            else:
                logger.info(
                    "[200] %s:%s data:,%s...[content:%d] 0s",
                    task.get('project'), task.get('taskid'),
                    result['content'][:70],
                    len(result['content'])
                )
    
            return result
    
        def handle_error(self, type, url, task, start_time, error):
            result = {
                'status_code': getattr(error, 'code', 599),
                'error': utils.text(error),
                'traceback': traceback.format_exc() if sys.exc_info()[0] else None,
                'content': "",
                'time': time.time() - start_time,
                'orig_url': url,
                'url': url,
                "save": task.get('fetch', {}).get('save')
            }
            logger.error("[%d] %s:%s %s, %r %.2fs",
                         result['status_code'], task.get('project'), task.get('taskid'),
                         url, error, result['time'])
            return result
    
        allowed_options = ['method', 'data', 'connect_timeout', 'timeout', 'cookies', 'use_gzip', 'validate_cert']
    
        def pack_tornado_request_parameters(self, url, task):
            fetch = copy.deepcopy(self.default_options)
            fetch['url'] = url
            fetch['headers'] = tornado.httputil.HTTPHeaders(fetch['headers'])
            fetch['headers']['User-Agent'] = self.user_agent
            task_fetch = task.get('fetch', {})
            for each in self.allowed_options:
                if each in task_fetch:
                    fetch[each] = task_fetch[each]
            fetch['headers'].update(task_fetch.get('headers', {}))
    
            if task.get('track'):
                track_headers = tornado.httputil.HTTPHeaders(
                    task.get('track', {}).get('fetch', {}).get('headers') or {})
                track_ok = task.get('track', {}).get('process', {}).get('ok', False)
            else:
                track_headers = {}
                track_ok = False
            # proxy
            proxy_string = None
            if isinstance(task_fetch.get('proxy'), six.string_types):
                proxy_string = task_fetch['proxy']
            elif self.proxy and task_fetch.get('proxy', True):
                proxy_string = self.proxy
            if proxy_string:
                if '://' not in proxy_string:
                    proxy_string = 'http://' + proxy_string
                proxy_splited = urlsplit(proxy_string)
                fetch['proxy_host'] = proxy_splited.hostname
                if proxy_splited.username:
                    fetch['proxy_username'] = proxy_splited.username
                if proxy_splited.password:
                    fetch['proxy_password'] = proxy_splited.password
                if six.PY2:
                    for key in ('proxy_host', 'proxy_username', 'proxy_password'):
                        if key in fetch:
                            fetch[key] = fetch[key].encode('utf8')
                fetch['proxy_port'] = proxy_splited.port or 8080
    
            # etag
            if task_fetch.get('etag', True):
                _t = None
                if isinstance(task_fetch.get('etag'), six.string_types):
                    _t = task_fetch.get('etag')
                elif track_ok:
                    _t = track_headers.get('etag')
                if _t and 'If-None-Match' not in fetch['headers']:
                    fetch['headers']['If-None-Match'] = _t
            # last modifed
            if task_fetch.get('last_modified', task_fetch.get('last_modifed', True)):
                last_modified = task_fetch.get('last_modified', task_fetch.get('last_modifed', True))
                _t = None
                if isinstance(last_modified, six.string_types):
                    _t = last_modified
                elif track_ok:
                    _t = track_headers.get('last-modified')
                if _t and 'If-Modified-Since' not in fetch['headers']:
                    fetch['headers']['If-Modified-Since'] = _t
            # timeout
            if 'timeout' in fetch:
                fetch['request_timeout'] = fetch['timeout']
                del fetch['timeout']
            # data rename to body
            if 'data' in fetch:
                fetch['body'] = fetch['data']
                del fetch['data']
    
            return fetch
    
        @gen.coroutine
        def can_fetch(self, user_agent, url):
            parsed = urlsplit(url)
            domain = parsed.netloc
            if domain in self.robots_txt_cache:
                robot_txt = self.robots_txt_cache[domain]
                if time.time() - robot_txt.mtime() > self.robot_txt_age:
                    robot_txt = None
            else:
                robot_txt = None
    
            if robot_txt is None:
                robot_txt = RobotFileParser()
                try:
                    response = yield gen.maybe_future(self.http_client.fetch(
                        urljoin(url, '/robots.txt'), connect_timeout=10, request_timeout=30))
                    content = response.body
                except tornado.httpclient.HTTPError as e:
                    logger.error('load robots.txt from %s error: %r', domain, e)
                    content = ''
    
                try:
                    content = content.decode('utf8', 'ignore')
                except UnicodeDecodeError:
                    content = ''
    
                robot_txt.parse(content.splitlines())
                self.robots_txt_cache[domain] = robot_txt
    
            raise gen.Return(robot_txt.can_fetch(user_agent, url))
    
        def clear_robot_txt_cache(self):
            now = time.time()
            for domain, robot_txt in self.robots_txt_cache.items():
                if now - robot_txt.mtime() > self.robot_txt_age:
                    del self.robots_txt_cache[domain]
    
        @gen.coroutine
        def http_fetch(self, url, task):
            '''HTTP fetcher'''
            start_time = time.time()
            self.on_fetch('http', task)
            handle_error = lambda x: self.handle_error('http', url, task, start_time, x)
    
            # setup request parameters
            fetch = self.pack_tornado_request_parameters(url, task)
            task_fetch = task.get('fetch', {})
    
            session = cookies.RequestsCookieJar()
            # fix for tornado request obj
            if 'Cookie' in fetch['headers']:
                c = http_cookies.SimpleCookie()
                try:
                    c.load(fetch['headers']['Cookie'])
                except AttributeError:
                    c.load(utils.utf8(fetch['headers']['Cookie']))
                for key in c:
                    session.set(key, c[key])
                del fetch['headers']['Cookie']
            if 'cookies' in fetch:
                session.update(fetch['cookies'])
                del fetch['cookies']
    
            max_redirects = task_fetch.get('max_redirects', 5)
            # we will handle redirects by hand to capture cookies
            fetch['follow_redirects'] = False
    
            # making requests
            while True:
                # robots.txt
                if task_fetch.get('robots_txt', False):
                    can_fetch = yield self.can_fetch(fetch['headers']['User-Agent'], fetch['url'])
                    if not can_fetch:
                        error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                        raise gen.Return(handle_error(error))
    
                try:
                    request = tornado.httpclient.HTTPRequest(**fetch)
                    # if cookie already in header, get_cookie_header wouldn't work
                    old_cookie_header = request.headers.get('Cookie')
                    if old_cookie_header:
                        del request.headers['Cookie']
                    cookie_header = cookies.get_cookie_header(session, request)
                    if cookie_header:
                        request.headers['Cookie'] = cookie_header
                    elif old_cookie_header:
                        request.headers['Cookie'] = old_cookie_header
                except Exception as e:
                    logger.exception(fetch)
                    raise gen.Return(handle_error(e))
    
                try:
                    response = yield gen.maybe_future(self.http_client.fetch(request))
                except tornado.httpclient.HTTPError as e:
                    if e.response:
                        response = e.response
                    else:
                        raise gen.Return(handle_error(e))
    
                extract_cookies_to_jar(session, response.request, response.headers)
                if (response.code in (301, 302, 303, 307)
                        and response.headers.get('Location')
                        and task_fetch.get('allow_redirects', True)):
                    if max_redirects <= 0:
                        error = tornado.httpclient.HTTPError(
                            599, 'Maximum (%d) redirects followed' % task_fetch.get('max_redirects', 5),
                            response)
                        raise gen.Return(handle_error(error))
                    if response.code in (302, 303):
                        fetch['method'] = 'GET'
                        if 'body' in fetch:
                            del fetch['body']
                    fetch['url'] = quote_chinese(urljoin(fetch['url'], response.headers['Location']))
                    fetch['request_timeout'] -= time.time() - start_time
                    if fetch['request_timeout'] < 0:
                        fetch['request_timeout'] = 0.1
                    max_redirects -= 1
                    continue
    
                result = {}
                result['orig_url'] = url
                result['content'] = response.body or ''
                result['headers'] = dict(response.headers)
                result['status_code'] = response.code
                result['url'] = response.effective_url or url
                result['time'] = time.time() - start_time
                result['cookies'] = session.get_dict()
                result['save'] = task_fetch.get('save')
                if response.error:
                    result['error'] = utils.text(response.error)
                if 200 <= response.code < 300:
                    logger.info("[%d] %s:%s %s %.2fs", response.code,
                                task.get('project'), task.get('taskid'),
                                url, result['time'])
                else:
                    logger.warning("[%d] %s:%s %s %.2fs", response.code,
                                   task.get('project'), task.get('taskid'),
                                   url, result['time'])
    
                raise gen.Return(result)
    
        @gen.coroutine
        def phantomjs_fetch(self, url, task):
            '''Fetch with phantomjs proxy'''
            start_time = time.time()
            self.on_fetch('phantomjs', task)
            handle_error = lambda x: self.handle_error('phantomjs', url, task, start_time, x)
    
            # check phantomjs proxy is enabled
            if not self.phantomjs_proxy:
                result = {
                    "orig_url": url,
                    "content": "phantomjs is not enabled.",
                    "headers": {},
                    "status_code": 501,
                    "url": url,
                    "time": time.time() - start_time,
                    "cookies": {},
                    "save": task.get('fetch', {}).get('save')
                }
                logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
                raise gen.Return(result)
    
            # setup request parameters
            fetch = self.pack_tornado_request_parameters(url, task)
            task_fetch = task.get('fetch', {})
            for each in task_fetch:
                if each not in fetch:
                    fetch[each] = task_fetch[each]
    
            # robots.txt
            if task_fetch.get('robots_txt', False):
                user_agent = fetch['headers']['User-Agent']
                can_fetch = yield self.can_fetch(user_agent, url)
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))
    
            request_conf = {
                'follow_redirects': False
            }
            request_conf['connect_timeout'] = fetch.get('connect_timeout', 20)
            request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1
    
            session = cookies.RequestsCookieJar()
            if 'Cookie' in fetch['headers']:
                c = http_cookies.SimpleCookie()
                try:
                    c.load(fetch['headers']['Cookie'])
                except AttributeError:
                    c.load(utils.utf8(fetch['headers']['Cookie']))
                for key in c:
                    session.set(key, c[key])
                del fetch['headers']['Cookie']
            if 'cookies' in fetch:
                session.update(fetch['cookies'])
                del fetch['cookies']
    
            request = tornado.httpclient.HTTPRequest(url=fetch['url'])
            cookie_header = cookies.get_cookie_header(session, request)
            if cookie_header:
                fetch['headers']['Cookie'] = cookie_header
    
            # making requests
            fetch['headers'] = dict(fetch['headers'])
            try:
                request = tornado.httpclient.HTTPRequest(
                    url=self.phantomjs_proxy, method="POST",
                    body=json.dumps(fetch), **request_conf)
            except Exception as e:
                raise gen.Return(handle_error(e))
    
            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))
    
            if not response.body:
                raise gen.Return(handle_error(Exception('no response from phantomjs: %r' % response)))
    
            result = {}
            try:
                result = json.loads(utils.text(response.body))
                assert 'status_code' in result, result
            except Exception as e:
                if response.error:
                    result['error'] = utils.text(response.error)
                raise gen.Return(handle_error(e))
    
            if result.get('status_code', 200):
                logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                            task.get('project'), task.get('taskid'), url, result['time'])
            else:
                logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                             task.get('project'), task.get('taskid'),
                             url, result['content'], result['time'])
    
            raise gen.Return(result)
    
        @gen.coroutine
        def splash_fetch(self, url, task):
            '''Fetch with splash'''
            start_time = time.time()
            self.on_fetch('splash', task)
            handle_error = lambda x: self.handle_error('splash', url, task, start_time, x)
    
            # check splash endpoint is enabled
            if not self.splash_endpoint:
                result = {
                    "orig_url": url,
                    "content": "splash is not enabled.",
                    "headers": {},
                    "status_code": 501,
                    "url": url,
                    "time": time.time() - start_time,
                    "cookies": {},
                    "save": task.get('fetch', {}).get('save')
                }
                logger.warning("[501] %s:%s %s 0s", task.get('project'), task.get('taskid'), url)
                raise gen.Return(result)
    
            # setup request parameters
            fetch = self.pack_tornado_request_parameters(url, task)
            task_fetch = task.get('fetch', {})
            for each in task_fetch:
                if each not in fetch:
                    fetch[each] = task_fetch[each]
    
            # robots.txt
            if task_fetch.get('robots_txt', False):
                user_agent = fetch['headers']['User-Agent']
                can_fetch = yield self.can_fetch(user_agent, url)
                if not can_fetch:
                    error = tornado.httpclient.HTTPError(403, 'Disallowed by robots.txt')
                    raise gen.Return(handle_error(error))
    
            request_conf = {
                'follow_redirects': False,
                'headers': {
                    'Content-Type': 'application/json',
                }
            }
            request_conf['connect_timeout'] = fetch.get('connect_timeout', 20)
            request_conf['request_timeout'] = fetch.get('request_timeout', 120) + 1
    
            session = cookies.RequestsCookieJar()
            if 'Cookie' in fetch['headers']:
                c = http_cookies.SimpleCookie()
                try:
                    c.load(fetch['headers']['Cookie'])
                except AttributeError:
                    c.load(utils.utf8(fetch['headers']['Cookie']))
                for key in c:
                    session.set(key, c[key])
                del fetch['headers']['Cookie']
            if 'cookies' in fetch:
                session.update(fetch['cookies'])
                del fetch['cookies']
    
            request = tornado.httpclient.HTTPRequest(url=fetch['url'])
            cookie_header = cookies.get_cookie_header(session, request)
            if cookie_header:
                fetch['headers']['Cookie'] = cookie_header
    
            # making requests
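            # the Lua rendering script below is POSTed to the configured Splash
            # endpoint (typically /execute) together with the request parameters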
            fetch['lua_source'] = self.splash_lua_source
            fetch['headers'] = dict(fetch['headers'])
            try:
                request = tornado.httpclient.HTTPRequest(
                    url=self.splash_endpoint, method="POST",
                    body=json.dumps(fetch), **request_conf)
            except Exception as e:
                raise gen.Return(handle_error(e))
    
            try:
                response = yield gen.maybe_future(self.http_client.fetch(request))
            except tornado.httpclient.HTTPError as e:
                if e.response:
                    response = e.response
                else:
                    raise gen.Return(handle_error(e))
    
            if not response.body:
                raise gen.Return(handle_error(Exception('no response from splash: %r' % response)))
    
            result = {}
            try:
                result = json.loads(utils.text(response.body))
                assert 'status_code' in result, result
            except ValueError as e:
                logger.error("result is not json: %r", response.body[:500])
                raise gen.Return(handle_error(e))
            except Exception as e:
                if response.error:
                    result['error'] = utils.text(response.error)
                raise gen.Return(handle_error(e))
    
            if result.get('status_code', 200):
                logger.info("[%d] %s:%s %s %.2fs", result['status_code'],
                            task.get('project'), task.get('taskid'), url, result['time'])
            else:
                logger.error("[%d] %s:%s %s, %r %.2fs", result['status_code'],
                             task.get('project'), task.get('taskid'),
                             url, result['content'], result['time'])
    
            raise gen.Return(result)
    
        def run(self):
            '''Run loop'''
            logger.info("fetcher starting...")
    
            def queue_loop():
                if not self.outqueue or not self.inqueue:
                    return
                while not self._quit:
                    try:
                        if self.outqueue.full():
                            break
                        if self.http_client.free_size() <= 0:
                            break
                        task = self.inqueue.get_nowait()
                    # FIXME: decode_unicode_obj should be used after data is
                    # selected from the database; it is done here for performance
                        task = utils.decode_unicode_obj(task)
                        self.fetch(task)
                    except queue.Empty:
                        break
                    except KeyboardInterrupt:
                        break
                    except Exception as e:
                        logger.exception(e)
                        break
    
            tornado.ioloop.PeriodicCallback(queue_loop, 100, io_loop=self.ioloop).start()
            tornado.ioloop.PeriodicCallback(self.clear_robot_txt_cache, 10000, io_loop=self.ioloop).start()
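            # NOTE: the io_loop keyword argument used above (and by HTTPServer in
            # xmlrpc_run) was removed in Tornado 5.0, so this code assumes a 4.x
            # tornado -- pyspider itself pins an older tornado for this reason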
            self._running = True
    
            try:
                self.ioloop.start()
            except KeyboardInterrupt:
                pass
    
            logger.info("fetcher exiting...")
    
        def quit(self):
            '''Quit fetcher'''
            self._running = False
            self._quit = True
            self.ioloop.add_callback(self.ioloop.stop)
            if hasattr(self, 'xmlrpc_server'):
                self.xmlrpc_ioloop.add_callback(self.xmlrpc_server.stop)
                self.xmlrpc_ioloop.add_callback(self.xmlrpc_ioloop.stop)
    
        def size(self):
            return self.http_client.size()
    
        def xmlrpc_run(self, port=24444, bind='127.0.0.1', logRequests=False):
            '''Run xmlrpc server'''
            import umsgpack
            from pyspider.libs.wsgi_xmlrpc import WSGIXMLRPCApplication
            try:
                from xmlrpc.client import Binary
            except ImportError:
                from xmlrpclib import Binary
    
            application = WSGIXMLRPCApplication()
    
            application.register_function(self.quit, '_quit')
            application.register_function(self.size)
    
            def sync_fetch(task):
                result = self.sync_fetch(task)
                result = Binary(umsgpack.packb(result))
                return result
            application.register_function(sync_fetch, 'fetch')
    
            def dump_counter(_time, _type):
                return self._cnt[_time].to_dict(_type)
            application.register_function(dump_counter, 'counter')
    
            import tornado.wsgi
            import tornado.ioloop
            import tornado.httpserver
    
            container = tornado.wsgi.WSGIContainer(application)
            self.xmlrpc_ioloop = tornado.ioloop.IOLoop()
            self.xmlrpc_server = tornado.httpserver.HTTPServer(container, io_loop=self.xmlrpc_ioloop)
            self.xmlrpc_server.listen(port=port, address=bind)
            logger.info('fetcher.xmlrpc listening on %s:%s', bind, port)
            self.xmlrpc_ioloop.start()
    
        def on_fetch(self, type, task):
            '''Called before task fetch'''
            pass
    
        def on_result(self, type, task, result):
            '''Called after task fetched'''
            status_code = result.get('status_code', 599)
            if status_code != 599:
                status_code = int(status_code) // 100 * 100  # bucket into hundreds (e.g. 2xx -> 200); // keeps the key an int on Python 3
            self._cnt['5m'].event((task.get('project'), status_code), +1)
            self._cnt['1h'].event((task.get('project'), status_code), +1)
    
            if type in ('http', 'phantomjs') and result.get('time'):
                content_len = len(result.get('content', ''))
                self._cnt['5m'].event((task.get('project'), 'speed'),
                                      float(content_len) / result.get('time'))
                self._cnt['1h'].event((task.get('project'), 'speed'),
                                      float(content_len) / result.get('time'))
                self._cnt['5m'].event((task.get('project'), 'time'), result.get('time'))
                self._cnt['1h'].event((task.get('project'), 'time'), result.get('time'))
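Since the same async → async_mode rename has to be applied to several files under the pyspider package, a small script can apply the whole-word replacement in one pass instead of editing each file by hand. The sketch below is not part of the original fix: the site-packages path is the one used above, but the file list is an assumption -- verify it against your own installation (for example with grep -rnw async), and note that a whole-word replace also rewrites the word inside strings and comments, which is harmless for these files but worth knowing. Back the files up before running.

#!/usr/bin/env python
# rename_async.py - bulk-rename the reserved word `async` to `async_mode`
# in pyspider's sources (a sketch; the path and file list are assumptions,
# adjust them to your environment and back the files up first)
import re
from pathlib import Path

PYSPIDER = Path('/Library/anaconda3/lib/python3.7/site-packages/pyspider')

# files commonly reported to use `async` as an identifier (assumed list)
FILES = [
    'run.py',
    'fetcher/tornado_fetcher.py',
    'webui/app.py',
]

for name in FILES:
    path = PYSPIDER / name
    source = path.read_text(encoding='utf-8')
    # \b restricts the match to the bare word `async`, so identifiers
    # such as `asynchronous` are left untouched
    patched = re.sub(r'\basync\b', 'async_mode', source)
    if patched != source:
        path.write_text(patched, encoding='utf-8')
        print('patched', path)

After patching, launching pyspider again should get past the SyntaxError; if it does not, the traceback names the file that still contains a bare async.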