【问题标题】:Run HTTP requests with PySpark in parallel and asynchronously使用 PySpark 并行和异步运行 HTTP 请求
【发布时间】:2019-04-08 21:20:33
【问题描述】:

我有一个包含数百万个 URL 的文本文件,我必须为每个 URL 运行一个 POST 请求。 我尝试在我的机器上执行此操作,但它需要很长时间,所以我想改用我的 Spark 集群。

我编写了这个 PySpark 代码:

from pyspark.sql.types import StringType
import requests

url = ["http://myurltoping.com"]
list_urls = url * 1000 # The final code will just import my text file
list_urls_df = spark.createDataFrame(list_urls, StringType())

print 'number of partitions: {}'.format(list_urls_df.rdd.getNumPartitions())

def execute_requests(list_of_url):
    final_iterator = []
    for url in list_of_url:
        r = requests.post(url.value)
        final_iterator.append((r.status_code, r.text))
    return iter(final_iterator)

processed_urls_df = list_urls_df.rdd.mapPartitions(execute_requests)

但是仍然需要很多时间,例如如何使函数 execute_requests 更高效地在每个分区中异步启动请求?

谢谢!

【问题讨论】:

  • 你关心请求是否成功吗?它是你只需要运行一次还是经常运行? Spark 可能不是最好的工具。也许一个简单的 java 程序就可以解决问题。
  • 我只是想看看响应状态是否为 400 否则会重试请求。我只需要运行一次。

标签: http parallel-processing pyspark python-requests


【解决方案1】:

使用 python 包 grequests(可与 pip install grequests 一起安装)可能是一个简单的解决方案,无需使用 spark。

文档(可以在这里找到https://github.com/kennethreitz/grequests)给出了一个简单的例子:

import grequests

urls = [
    'http://www.heroku.com',
    'http://python-tablib.org',
    'http://httpbin.org',
    'http://python-requests.org',
    'http://fakedomain/',
    'http://kennethreitz.com'
]

创建一组未发送的请求:

>>> rs = (grequests.get(u) for u in urls)

同时发送它们:

>>> grequests.map(rs)
[<Response [200]>, <Response [200]>, <Response [200]>, <Response [200]>, None, <Response [200]>]

我发现,在 spark Dataframe 上使用 gevent 和 foreach 会导致一些奇怪的错误并且不起作用。好像spark也依赖gevent,被grequests使用...

【讨论】:

    猜你喜欢
    • 2015-10-24
    • 1970-01-01
    • 2018-12-05
    • 1970-01-01
    • 1970-01-01
    • 2017-12-26
    • 2011-08-06
    • 2010-11-06
    • 1970-01-01
    相关资源
    最近更新 更多