【发布时间】:2019-11-30 05:30:21
【问题描述】:
我的要求是使用 pyspark 从 HDFS 读取数据,仅过滤所需的列,删除 NULL 值,然后将处理后的数据写回 HDFS。完成这些步骤后,我们需要从 HDFS 中删除 RAW Dirty 数据。这是我的每个操作的脚本。
导入库和依赖项
#Spark Version = > version 2.4.0-cdh6.3.1
from pyspark.sql import SparkSession
sparkSession = SparkSession.builder.appName("example-pyspark-read-and-write").getOrCreate()
import pyspark.sql.functions as F
从 HDFS 读取数据
df_load_1 = sparkSession.read.csv('hdfs:///cdrs/file_path/*.csv', sep = ";")
只选择所需的列
col = [ '_c0', '_c1', '_c2', '_c3', '_c5', '_c7', '_c8', '_c9', '_c10', '_C11', '_c12', '_c13', '_c22', '_C32', '_c34', '_c38', '_c40',
'_c43', '_c46', '_c47', '_c50', '_c52', '_c53', '_c54', '_c56', '_c57', '_c59', '_c62', '_c63','_c77', '_c81','_c83']
df1=df_load_1.select(*[col])
检查 NULL 值,我们有任何删除它们
df_agg_1 = df1.agg(*[F.count(F.when(F.isnull(c), c)).alias(c) for c in df1.columns])
df_agg_1.show()
df1 = df1.na.drop()
将预处理后的数据写入HDFS,同一个集群,不同目录
df1.write.csv("hdfs://nm/pyspark_cleaned_data/py_in_gateway.csv")
从 HDFS 中删除原始原始数据
def delete_path(spark , path):
sc = spark.sparkContext
fs = (sc._jvm.org
.apache.hadoop
.fs.FileSystem
.get(sc._jsc.hadoopConfiguration())
)
fs.delete(sc._jvm.org.apache.hadoop.fs.Path(path), True)
通过传递HDFS绝对路径在下面执行
delete_path(spark , '/cdrs//cdrs/file_path/')
我可以在pyspark提示符下成功完成所有操作。
现在我想开发应用程序并使用 spark-submit 提交作业
例如
spark-submit --master yarn --deploy-mode client project.py for local
spark-submit --master yarn --deploy-mode cluster project.py for cluster
此时我被卡住了,我不确定我应该在 spark-submit 中传递什么参数。我不确定简单地复制和粘贴上述所有命令并制作.py 文件是否会有所帮助。我对这项技术非常陌生。
【问题讨论】:
-
你想在哪里运行你的代码?
-
由于我是全新的,我不确定是否应该在 clinet /cluster /local 模式下运行。我们有集群配置[1个namenode,2个数据节点]并且数据在namenode上,我还需要了解哪种模式适合这个要求
标签: apache-spark hadoop pyspark hdfs