【问题标题】:Pyspark filtering items in column of listsPyspark过滤列表列中的项目
【发布时间】:2020-07-08 02:21:39
【问题描述】:

我正在尝试过滤数据框中的数据。数据框 df 有 2 列 - query + href。在一行中:query 是随机字符串,href 是字符串列表。我有另一个名为urls 的列表,带有字符串。

href 列列表中的列表urls 中查找URL + 在href 列表中的URL 位置。我正在尝试df.filter(col("href")).isin(urls),但 pyspark 抱怨该列表。 + 我无法对数据量进行 .collect() bcs。

提前致谢!

基本上它应该看起来像这样,但我不太确定如何在 pyspark 中做到这一点:

for url in urls:
    if url in "href item list":
        print(query + url + "href item list".index(url)) # doesn't matter if index or position
    else:
        pass

例子:

urls = [url1, url2, url3, url4, url5, url6, url7, url8]

query | href
------------
q1    | [url7, url11, url12, url13, url14]
q2    | [url1, url3, url5, url6]
q3    | [url1, url2, url8]

Output should look like 

q2 - url1 - 0
q3 - url1 - 0
q3 - url2 - 1
q2 - url3 - 1
q2 - url5 - 2
q2 - url6 - 3
q1 - url7 - 0
q3 - url8 - 2

【问题讨论】:

  • 我认为你不需要在这里收集。你在用 spark2.4 吗?

标签: python dataframe filter pyspark


【解决方案1】:

我建议 1) 使用 explode 制作 urls 的单列 DataFrame 和 2) 使用 posexplode 制作查询、href 和 href 的索引位置的 3 列 DataFrame,然后 3)内联两个

  1. 创建urls的DataFrame
from pyspark.sql.functions import explode, posexplode

urls = [
    (['url1', 'url2', 'url3', 'url4', 'url5', 'url6', 'url7', 'url8'],),
]
refs = (
    spark.createDataFrame(urls, ['ref']).
        select(
            explode('ref')
        )
)
refs.show(truncate=False)
# +----+
# |col |
# +----+
# |url1|
# |url2|
# |url3|
# |url4|
# |url5|
# |url6|
# |url7|
# |url8|
# +----+
  1. 创建您提供的示例数据
data = [
    ("q1", ["url7", "url11", "url12", "url13", "url14"]),
    ("q2", ["url1", "url3", "url5", "url6"]),
    ("q3", ["url1", "url2", "url8"]),
]
df = spark.createDataFrame(data, ["query", "href"])
df.show(truncate=False)
# +-----+----------------------------------+
# |query|href                              |
# +-----+----------------------------------+
# |q1   |[url7, url11, url12, url13, url14]|
# |q2   |[url1, url3, url5, url6]          |
# |q3   |[url1, url2, url8]                |
# +-----+----------------------------------+
  1. 解决方案
(
    df.
        select(
            'query',
            posexplode('href')
        ).
        join(
            refs,
            'col',
            'inner'
        ).
        orderBy('col', 'query').
        show(truncate=False)
)
# +----+-----+---+                                                                
# |col |query|pos|
# +----+-----+---+
# |url1|q2   |0  |
# |url1|q3   |0  |
# |url2|q3   |1  |
# |url3|q2   |1  |
# |url5|q2   |2  |
# |url6|q2   |3  |
# |url7|q1   |0  |
# |url8|q3   |2  |
# +----+-----+---+

【讨论】:

    【解决方案2】:

    文字步骤:

    1. explode专栏href
    2. filter 具有已知 URL 的那些行
    3. collect 结果并在urls 中查找每个 URL

    下面的代码被分成小步骤,以便更容易检查中间数据帧。

    假设您已经有一个名为 ssSparkSession 对象,我们可以像这样重新创建您的原始 DataFrame:

    df = ss.createDataFrame(
        [
            ("q1", ["url7", "url11", "url12", "url13", "url14"]),
            ("q2", ["url1", "url3", "url5", "url6"]),
            ("q3", ["url1", "url2", "url8"]),
        ],
        ["query", "href"],
    )
    urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]
    

    现在我们应用前面描述的步骤:

    import pyspark.sql.functions as sf
    
    # Exploding the column "href".
    exp_df = df.select("query", sf.explode(sf.col("href")).alias("href_sing"))
    # Checking if the URL in the DataFrame exists in "urls".
    # I suggest to convert "urls" into a "set" before this step: "set(urls)". It might 
    # improve the performance of "isin", but this is just an optional optimization.
    known_df = exp_df.select("*", sf.col("href_sing").isin(urls).alias("is_known"))
    # Discard unknown URLs.
    true_df = true_df = known_df.filter("is_known = True")
    # The final results.
    res = [
        (r["query"], r["href_sing"], urls.index(r["href_sing"]))
        for r in true_df.collect()
    ]
    

    检查一些值:

    In [18]: df.show()      
    +-----+--------------------+
    |query|                href|
    +-----+--------------------+
    |   q1|[url7, url11, url...|
    |   q2|[url1, url3, url5...|
    |   q3|  [url1, url2, url8]|
    +-----+--------------------+
    
    In [19]: exp_df.show()                                                                    
    +-----+---------+
    |query|href_sing|
    +-----+---------+
    |   q1|     url7|
    |   q1|    url11|
    |   q1|    url12|
    |   q1|    url13|
    |   q1|    url14|
    |   q2|     url1|
    |   q2|     url3|
    |   q2|     url5|
    |   q2|     url6|
    |   q3|     url1|
    |   q3|     url2|
    |   q3|     url8|
    +-----+---------+
    
    In [20]: true_df.show()                                                                   
    +-----+---------+--------+
    |query|href_sing|is_known|
    +-----+---------+--------+
    |   q1|     url7|    true|
    |   q2|     url1|    true|
    |   q2|     url3|    true|
    |   q2|     url5|    true|
    |   q2|     url6|    true|
    |   q3|     url1|    true|
    |   q3|     url2|    true|
    |   q3|     url8|    true|
    +-----+---------+--------+
    
    In [23]: res                                                                              
    Out[23]: 
    [('q1', 'url7', 6),
     ('q2', 'url1', 0),
     ('q2', 'url3', 2),
     ('q2', 'url5', 4),
     ('q2', 'url6', 5),
     ('q3', 'url1', 0),
     ('q3', 'url2', 1),
     ('q3', 'url8', 7)]
    

    【讨论】:

      猜你喜欢
      • 2018-03-24
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2013-01-29
      • 1970-01-01
      • 1970-01-01
      • 2020-08-19
      • 1970-01-01
      相关资源
      最近更新 更多