【Question title】: Delete all subdomains and get only the root domain in PySpark
【Posted】: 2020-10-24 22:50:24
【Question description】:

I have a large list; here is a sample of the JSON file:

{"timestamp":"1600840155","name":"0.0.0.1","value":"subdomain.test.net","type":"hd"}
{"timestamp":"1600840155","name":"0.0.0.2","value":"test.net","type":"hd"}
{"timestamp":"1600846210","name":"0.0.0.3","value":"node-fwx.pool-1-0.dynamic.exmple4.net","type":"hd"}
{"timestamp":"1600846210","name":"0.0.0.4","value":"exmple4.net","type":"hd"}
{"timestamp":"1600848078","name":"0.0.0.5","value":"node-fwy.pool-1-0.dynamic.exmple5.net","type":"hd"}
{"timestamp":"1600848078","name":"0.0.0.6","value":"exmple5.net","type":"hd"}
{"timestamp":"1600838189","name":"0.0.0.7","value":"node-fwz.pool-1-0.dynamic.exmple6.net","type":"hd"}
{"timestamp":"1600838189","name":"0.0.0.8","value":"exmple6.net","type":"hd"}
{"timestamp":"1600879127","name":"0.0.0.9","value":"node-fx0.pool-1-0.dynamic.exmple7.net","type":"hd"}
{"timestamp":"1600838189","name":"0.0.0.10","value":"exmple7.net","type":"hd"}
{"timestamp":"1600874834","name":"0.0.0.11","value":"node-fx1.pool-1-0.dynamic.exmple8.net","type":"hd"}
{"timestamp":"1600838189","name":"0.0.0.12","value":"exmple8.net","type":"hd"}
{"timestamp":"1600825122","name":"0.0.0.13","value":"node-ftb.pool-1-0.dynamic.exmple9.net","type":"hd"}
{"timestamp":"1600838189","name":"0.0.0.14","value":"exmple9.net","type":"hd"}
{"timestamp":"1600849239","name":"0.0.0.15","value":"node-fx2.pool-1-0.dynamic.exmple10.net","type":"hd"}
{"timestamp":"1600838189","name":"0.0.0.16","value":"exmple10.net","type":"hd"}
{"timestamp":"1600820784","name":"0.0.0.17","value":"node-fx3.pool-1-0.dynamic.other11.net","type":"hd"}
{"timestamp":"1600838189","name":"0.0.0.18","value":"exmple11.net","type":"hd"}
{"timestamp":"1600840955","name":"0.0.0.19","value":"node-fx4.pool-1-0.dynamic.other12.net","type":"hd"}
{"timestamp":"1600838189","name":"0.0.0.20","value":"exmple12.net","type":"hd"}
{"timestamp":"1600860091","name":"0.0.0.21","value":"another -one.pool-1-0.dynamic.other13.net","type":"hd"}
{"timestamp":"1600838189","name":"0.0.0.22","value":"exmple13.net","type":"hd"}

I want to keep only the root domains and drop the subdomain entries using PySpark, so I tried this select:

df.select("name","value","type").distinct() \
.write \
.save("mycleanlist",format="json")

I want this result:

{"name":"0.0.0.22","value":"exmple13.net","type":"hd"}
{"name":"0.0.0.2","value":"test.net","type":"hd"}
{"name":"0.0.0.4","value":"exmple4.net","type":"hd"}
{"name":"0.0.0.6","value":"exmple5.net","type":"hd"}
{"name":"0.0.0.8","value":"exmple6.net","type":"hd"}
{"name":"0.0.0.10","value":"exmple7.net","type":"hd"}
{"name":"0.0.0.12","value":"exmple8.net","type":"hd"}
{"name":"0.0.0.14","value":"exmple9.net","type":"hd"}
{"name":"0.0.0.16","value":"exmple10.net","type":"hd"}
{"name":"0.0.0.18","value":"exmple11.net","type":"hd"}
{"name":"0.0.0.20","value":"exmple12.net","type":"hd"}
{"name":"0.0.0.22","value":"exmple13.net","type":"hd"}

【Discussion】:

    Tags: python apache-spark pyspark apache-spark-sql spark-streaming


    【Solution 1】:

    You can wrap a function that extracts the root domain in a UDF:

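    from pyspark.sql.functions import udf
    from pyspark.sql.types import StringType

    def extract_domain(url):
        # Keep only the last two dot-separated labels, e.g. "a.b.test.net" -> "test.net".
        # Null or empty values are returned unchanged.
        if url:
            parts = url.split('.')
            url = '.'.join(parts[-2:])
        return url

    extract_domain_udf = udf(extract_domain, StringType())

    # Alias the UDF output so the column is still called "value" in the result.
    df.select("name", extract_domain_udf("value").alias("value"), "type").distinct() \
        .write \
        .save("mycleanlist", format="json")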
    
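The expected output in the question keeps only the rows whose `value` is already a bare root domain. Because every row has a different `name`, mapping subdomains to their root and calling `distinct()` would not collapse them. The following is a minimal plain-Python sketch of that filtering idea, not the answer author's method. The helper names `root_domain` and `is_root_domain` are hypothetical, and the records are copied from the question's sample.

```python
import json

def root_domain(host):
    """Return the last two dot-separated labels of host, or None if host is empty."""
    if not host:
        return None
    return ".".join(host.split(".")[-2:])

def is_root_domain(host):
    """True when host is already a bare root domain such as 'test.net'."""
    return bool(host) and root_domain(host) == host

# A few records copied from the question's sample input.
lines = [
    '{"timestamp":"1600840155","name":"0.0.0.1","value":"subdomain.test.net","type":"hd"}',
    '{"timestamp":"1600840155","name":"0.0.0.2","value":"test.net","type":"hd"}',
    '{"timestamp":"1600846210","name":"0.0.0.3","value":"node-fwx.pool-1-0.dynamic.exmple4.net","type":"hd"}',
    '{"timestamp":"1600846210","name":"0.0.0.4","value":"exmple4.net","type":"hd"}',
]

records = [json.loads(line) for line in lines]

# Keep name/value/type only for rows whose value is already the root domain.
kept = [
    {"name": r["name"], "value": r["value"], "type": r["type"]}
    for r in records
    if is_root_domain(r.get("value"))
]

for row in kept:
    print(json.dumps(row, separators=(",", ":")))
```

In Spark, the same predicate could presumably be expressed as a filter that compares the UDF output with the original column, for example `df.filter(extract_domain_udf("value") == df["value"])`. Note that the last-two-labels heuristic assumes a single-label TLD: a host such as `example.co.uk` would be reduced to `co.uk`.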

    【Comments】:

    • spark-3.0.1-bin-hadoop2.7/python/lib/pyspark.zip/pyspark/worker.py", line 605, in main process()
    • Do you have the full exception traceback? Is the "value" column always populated?
    • du -sh mycleanlist/ 3.3G mycleanlist/ root@vmi318702:/home/Downloads# du -sh mycleanlist/ 4.2G mycleanlist/ root@vmi318702:/home/Downloads# du -sh mycleanlist/ 4.0K mycleanlist/ and then it fails with an error
    • "'NoneType' object has no attribute 'split'". You need to check that url is not None before splitting.
    • import os
      import csv
      import pyspark
      from pyspark.sql.functions import col, countDistinct
      from pyspark.sql import SparkSession
      from pyspark.sql.types import StructType, StructField, StringType, IntegerType
      from pyspark.sql import functions as F
      from pyspark.sql.functions import udf

      df = spark.read.json("list.json")

      def extract_domain(url):
          parts = url.split('.')
          return '.'.join(parts[-2:])

      extract_domain_udf = udf(extract_domain, StringType())

      df.select("name", extract_domain_udf("value")).write \
          .save("mycleanlist", format="csv")
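The error quoted in the comments can be reproduced without Spark. The sketch below contrasts the unguarded function from the last comment with a guarded variant, assuming the failure comes from records whose `value` is missing and therefore arrives as `None`. The name `extract_domain_unguarded` is hypothetical and used only for illustration.

```python
def extract_domain_unguarded(url):
    # Mirrors the function in the comment above: no check for None.
    parts = url.split(".")
    return ".".join(parts[-2:])

def extract_domain(url):
    # Guarded variant: a missing value (None) is passed through unchanged.
    if url is None:
        return None
    return ".".join(url.split(".")[-2:])

error_message = None
try:
    extract_domain_unguarded(None)
except AttributeError as exc:
    # The same kind of failure the Spark worker reports for null rows.
    error_message = str(exc)

print(error_message)
print(extract_domain(None))
print(extract_domain("node-fwx.pool-1-0.dynamic.exmple4.net"))
```

The guarded function can be wrapped with `udf(extract_domain, StringType())` exactly as in the answer. As an alternative that avoids a Python UDF, Spark's built-in `pyspark.sql.functions.substring_index` with a count of -2 should give the same last-two-labels result, and built-in functions return null for null input.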