Steps in words:
- explode the column href
- filter the rows whose URL is known
- collect the results and look up each URL in urls
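The following is a minimal plain-Python sketch of what the three steps compute. It uses the same sample data as the Spark example and needs no Spark. The names rows, exploded, known and filtered are illustrative only. In Spark the same steps are done with explode, filter and collect.

```python
# Plain-Python analogue of the three Spark steps (no Spark required).
# The sample data mirrors the DataFrame used in this answer.
rows = [
    ("q1", ["url7", "url11", "url12", "url13", "url14"]),
    ("q2", ["url1", "url3", "url5", "url6"]),
    ("q3", ["url1", "url2", "url8"]),
]
urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]

# Step 1, "explode": one (query, url) pair per array element.
exploded = [(query, href) for query, hrefs in rows for href in hrefs]

# Step 2, "filter": keep only the pairs whose URL appears in urls.
known = set(urls)
filtered = [(query, href) for query, href in exploded if href in known]

# Step 3, "collect" and look up: attach each URL's position in urls.
res = [(query, href, urls.index(href)) for query, href in filtered]

print(res)
```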
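The PySpark code below is split into small steps so that the intermediate DataFrames are easier to inspect.
Assuming you already have a SparkSession object named ss, we can recreate your original DataFrame like this: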
df = ss.createDataFrame(
[
("q1", ["url7", "url11", "url12", "url13", "url14"]),
("q2", ["url1", "url3", "url5", "url6"]),
("q3", ["url1", "url2", "url8"]),
],
["query", "href"],
)
urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]
Now we apply the steps described above:
import pyspark.sql.functions as sf
# Exploding the column "href".
exp_df = df.select("query", sf.explode(sf.col("href")).alias("href_sing"))
# Check whether each URL in the DataFrame exists in "urls".
# I suggest converting "urls" into a set for this step: "set(urls)". It might
# improve the performance of "isin", but this is just an optional optimization.
# Keep the original list for the urls.index() lookup below: a set has no index().
known_df = exp_df.select("*", sf.col("href_sing").isin(urls).alias("is_known"))
# Discard unknown URLs.
true_df = known_df.filter("is_known = True")
# The final results.
res = [
(r["query"], r["href_sing"], urls.index(r["href_sing"]))
for r in true_df.collect()
]
Checking some values:
In [18]: df.show()
+-----+--------------------+
|query| href|
+-----+--------------------+
| q1|[url7, url11, url...|
| q2|[url1, url3, url5...|
| q3| [url1, url2, url8]|
+-----+--------------------+
In [19]: exp_df.show()
+-----+---------+
|query|href_sing|
+-----+---------+
| q1| url7|
| q1| url11|
| q1| url12|
| q1| url13|
| q1| url14|
| q2| url1|
| q2| url3|
| q2| url5|
| q2| url6|
| q3| url1|
| q3| url2|
| q3| url8|
+-----+---------+
In [20]: true_df.show()
+-----+---------+--------+
|query|href_sing|is_known|
+-----+---------+--------+
| q1| url7| true|
| q2| url1| true|
| q2| url3| true|
| q2| url5| true|
| q2| url6| true|
| q3| url1| true|
| q3| url2| true|
| q3| url8| true|
+-----+---------+--------+
In [23]: res
Out[23]:
[('q1', 'url7', 6),
('q2', 'url1', 0),
('q2', 'url3', 2),
('q2', 'url5', 4),
('q2', 'url6', 5),
('q3', 'url1', 0),
('q3', 'url2', 1),
('q3', 'url8', 7)]
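The final step calls urls.index() once per collected row, and each call scans the list linearly. If urls is large, a minimal alternative is to build a URL-to-position dict once. The sketch below makes a few assumptions. The collected rows are hypothetical dicts standing in for the Row objects from true_df.collect(). Row objects also support r["column"] access, so the loop itself is unchanged. The name url_pos is illustrative.

```python
# Hypothetical stand-ins for the rows returned by true_df.collect();
# pyspark Row objects also support r["column"] access, so the loop is the same.
collected = [
    {"query": "q1", "href_sing": "url7"},
    {"query": "q2", "href_sing": "url1"},
    {"query": "q3", "href_sing": "url8"},
]
urls = ["url1", "url2", "url3", "url4", "url5", "url6", "url7", "url8"]

# Build the URL -> position map once. setdefault keeps the FIRST position of a
# duplicated URL, which matches what urls.index() would return.
url_pos = {}
for i, u in enumerate(urls):
    url_pos.setdefault(u, i)

# Same result tuples as before, but each lookup is a dict access.
res = [(r["query"], r["href_sing"], url_pos[r["href_sing"]]) for r in collected]
print(res)
```

With a dict, each lookup costs O(1) on average instead of a scan of urls. This only matters when urls is long.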