【问题标题】:Change the filename of the spark streaming output更改火花流输出的文件名
【发布时间】:2017-09-25 17:39:55
【问题描述】:

下面的简单程序每 5 分钟从 kafka 流读取并写入 CSV 文件,以及它的火花流。它使用命名约定生成文件 part-00000-f90bbc78-b847-41d4-9938-bdae89adb8eb.csv ,有没有办法可以更改名称以包含“DATETIMESTAMP”+ GUID

请注意。谢谢。

我能够找到 DatastreamReader 的选项列表,但没有找到 DatastreamWriter 的选项列表

https://spark.apache.org/docs/latest/api/java/org/apache/spark/sql/streaming/DataStreamReader.html#csv-java.lang.String-

public static void main(String[] args) throws Exception {

    if (args.length == 0)
        throw new Exception("Usage program configFilename");
    String configFilename = args[0];

    addShutdownHook();

    ConfigLoader.loadConfig(configFilename);
    sparkSession = SparkSession
            .builder()
            .appName(TestKafka.class.getName())
            .master(ConfigLoader.getValue("master")).getOrCreate();
    SparkContext context = sparkSession.sparkContext();
    context.setLogLevel(ConfigLoader.getValue("logLevel"));

    SQLContext sqlCtx = sparkSession.sqlContext();
    System.out.println("Spark context established");

    DataStreamReader kafkaDataStreamReader = sparkSession.readStream()
            .format("kafka")
            .option("kafka.bootstrap.servers", ConfigLoader.getValue("brokers"))
            .option("group.id", ConfigLoader.getValue("groupId"))
            .option("subscribe", ConfigLoader.getValue("topics"))
            .option("failOnDataLoss", false);
    Dataset<Row> rawDataSet = kafkaDataStreamReader.load();
    rawDataSet.printSchema();
    rawDataSet.createOrReplaceTempView("rawEventView1");

    rawDataSet = rawDataSet.withColumn("rawEventValue", rawDataSet.col("value").cast("string"));
    rawDataSet.printSchema();
    rawDataSet.createOrReplaceTempView("eventView1");
    sqlCtx.sql("select * from eventView1")
            .writeStream()
            .format("csv")
            .option("header", "true")
            .option("delimiter", "~")
            .option("checkpointLocation", ConfigLoader.getValue("checkpointPath"))
            .option("path", ConfigLoader.getValue("recordsPath"))
            .outputMode(OutputMode.Append())
            .trigger(ProcessingTime.create(Integer.parseInt(ConfigLoader.getValue("kafkaProcessingTime"))
                    , TimeUnit.SECONDS))
            .start()
            .awaitTermination();
}

【问题讨论】:

    标签: apache-spark spark-streaming


    【解决方案1】:

    在结构化流中没有更改部分文件格式的规定,结构化流使用 ManifestFileCommitProtocol 跟踪作业写入的有效文件列表。目标部分文件的名称是 split、uuid 和 extension 的组合,为了避免冲突。

    来源:https://github.com/apache/spark/blob/20adf9aa1f42353432d356117e655e799ea1290b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/ManifestFileCommitProtocol.scala#L87

    【讨论】:

      【解决方案2】:

      1) saveAsTextFile 方法没有直接支持控制文件输出名称。您可以尝试使用 saveAsHadoopDataset 来控制输出文件的基本名称。

      例如:您可以获得 yourCustomName-00000,而不是 part-00000。

      请记住,您无法使用此方法控制后缀 00000。这是 spark 在写入时自动为每个分区分配的东西,以便每个分区写入一个唯一的文件。

      为了控制上面提到的 cmets,您必须编写自己的自定义 OutputFormat。

      SparkConf conf=new SparkConf();
      conf.setMaster("local").setAppName("yello");
      JavaSparkContext sc=new JavaSparkContext(conf);
      
      JobConf jobConf=new JobConf();
      jobConf.set("mapreduce.output.basename", "customName");
      jobConf.set("mapred.output.dir", "outputPath");
      
      JavaRDD<String> input = sc.textFile("inputDir");
      
      input.saveAsHadoopDataset(jobConf);
      

      2) 一种解决方法是将输出原样写入您的输出位置并使用 Hadoop FileUtil.copyMerge function 形成合并文件。

      【讨论】:

      • 我正在寻找结构化流媒体的解决方案......就像@VishAmdi 提到的那样......我们现在有一个单独的工作来处理这个......
      • 在结构化流中,如果您想更改 o/p 文件名,您可以定义自己的分区并将该分区添加为最终数据帧中的单独列,然后尝试写入您的存储(云或本地)。示例代码可以是:
      • dataFrame .select(functions.col("*"), yourUDF 用于添加分区列,例如基于时间戳).as(partitionIndexColName as "_part_index_or_whatevervr")) .repartition(functions.col(partitionIndexColName )) .writeStream.partitionBy(partitionIndexColName) .format("parquet") .option("checkpointLocation", "checkpointing path") .option("path","你的存储路径") .start()
      猜你喜欢
      • 1970-01-01
      • 2017-04-02
      • 2019-12-14
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2020-03-26
      • 2018-09-19
      • 2019-12-24
      相关资源
      最近更新 更多