【问题标题】:Efficiently batching Spark dataframes to call an API高效批处理 Spark 数据帧以调用 API
【发布时间】:2020-05-30 19:25:30
【问题描述】:

我是 Spark 的新手,我正在尝试使用 Spotipy 调用 Spotify API。我有一个艺术家 ID 列表,可用于获取艺术家信息。 Spotify API 允许一次最多 50 个 id 的批量调用。我从 MySQL 数据库加载艺术家 ID 并将它们存储在数据框中。

我现在的问题是我不知道如何有效地将该数据帧批处理为 50 行或更少的行。

在下面的示例中,我将数据框转换为常规 Python 列表,我可以从中批量调用 50 个 API。

有什么想法可以在不返回 Python 列表的情况下做到这一点吗?

import spotipy
from spotipy.oauth2 import SpotifyClientCredentials
from pyspark.sql import SparkSession
import os

spark = SparkSession\
        .builder\
        .appName("GetArtists")\
        .getOrCreate()

df = spark.read.format('jdbc') \
    .option("url", "jdbc:mysql://"+os.getenv("DB_SERVER")+":"+os.getenv("DB_PORT")+"/spotify_metadata")\
    .option("user", os.getenv("DB_USER"))\
    .option("password", os.getenv("DB_PW"))\
    .option("query", "SELECT artist_id FROM artists")\
    .load()

sp = spotipy.Spotify(client_credentials_manager=SpotifyClientCredentials())

ids = [row['artist_id'] for row in df.collect()]

batch_size = 50
for i in range(0,len(ids), batch_size):
    artists = sp.artists( ids[i:i+batch_size] )

    # process the JSON response

我考虑过使用foreach 并为每个id 调用API,但这会导致不必要的请求。结果也存储回数据库中,这意味着我将许多单行写入数据库。

【问题讨论】:

    标签: apache-spark pyspark apache-spark-sql spotipy


    【解决方案1】:

    如果你想根据行号划分数据框,那么你可以这样做:

    from pyspark.sql import functions as f
    from pyspark.sql import Window
    
    df = df.withColumn('row_num', f.row_number().over(Window.orderBy(f.lit(1))))
    len = df.count()
    
    for i in range(0,len, 50):
        df = df.filter(f.col('row_num')>=i & f.col('row_num')<=i+50)
        #api logic goes here
    

    但是如果您可以直接将 df 传递给 api,那么传递 df 或收集 df 每次只有 50 个值。

    【讨论】:

    • 这个解决方案效果很好。我在过滤方法中改变了两件事:
    • 1) 过滤语句需要在括号中 2) 从第二个过滤语句中删除等号,所以我们得到 50 行。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 2017-01-24
    • 2023-03-04
    • 1970-01-01
    • 2018-04-30
    • 2018-09-14
    • 1970-01-01
    • 2020-07-26
    相关资源
    最近更新 更多