【问题标题】:How to write the dataframe to S3 after filter过滤后如何将数据帧写入S3
【发布时间】:2020-10-17 17:27:51
【问题描述】:

我正在尝试使用以下 Scala 代码在脚本编辑中以 CVS 格式过滤到 S3 后写入数据帧。

当前状态:

  • 运行后不显示任何错误,只是不写入 S3。

  • 日志屏幕打印开始,但是看不到打印结束。

  • 没有指明问题的特定错误消息。

  • 在 temp.count 处停止。

环境条件:我拥有所有 S3 的管理员权限。

import com.amazonaws.services.glue.GlueContext
import <others>

object GlueApp {
  def main(sysArgs: Array[String]) {
    val spark: SparkContext = new SparkContext()
    val glueContext: GlueContext = new GlueContext(spark)
    // @params: [JOB_NAME]
    val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray)
    Job.init(args("JOB_NAME"), glueContext, args.asJava)
    
    val datasource0 = glueContext.getCatalogSource(database = "db", tableName = "table", redshiftTmpDir = "", transformationContext = "datasource0").getDynamicFrame()
    val appymapping1 = datasource0.appyMapping(mapping=........)

    val temp=appymapping1.toDF.filter(some filtering rules)
    print("start")
    if (temp.count() <= 0) {
    temp.write.format("csv").option("sep", ",").save("s3://directory/error.csv")
  }
    print("End")
     

【问题讨论】:

  • (temp.count() &lt;= 0) 你的 if 条件似乎是错误的,

标签: scala apache-spark-sql aws-glue aws-glue-data-catalog aws-glue-spark


【解决方案1】:

您正在使用 if 条件将 Dataframe 写入 S3(If 条件是检查数据帧是否有一行或多行),但您的 If 条件是反转的。仅当数据框具有 0(或更少)行时才适用。所以改变它。

Advance:Spark 始终将文件保存为“part-”名称。所以将 S3 路径更改为 s3://directory/。并添加 .mode("overwrite") .

所以你写的 df 查询应该是

temp.write.format("csv").option("sep", ",").mode("overwrite").save("s3://directory")

【讨论】:

    猜你喜欢
    • 2023-04-03
    • 1970-01-01
    • 1970-01-01
    • 2020-01-02
    • 2021-11-15
    • 2021-04-19
    • 2017-11-01
    • 2020-07-29
    • 2017-02-19
    相关资源
    最近更新 更多