【问题标题】:How to continuously monitor a directory by using Spark Structured Streaming如何使用 Spark Structured Streaming 持续监控目录
【发布时间】:2018-10-28 19:14:33
【问题描述】:

我希望 spark 持续监视一个目录并在文件出现在该目录中后立即使用spark.readStream 读取 CSV 文件。

请不要包含 Spark Streaming 的解决方案。我正在寻找一种使用 Spark 结构化流式传输的方法。

【问题讨论】:

    标签: scala apache-spark spark-structured-streaming


    【解决方案1】:

    这是此用例的完整解决方案:

    如果您在独立模式下运行。您可以按如下方式增加驱动程序内存:

    bin/spark-shell --driver-memory 4G
    

    无需像 Stand Alone 模式那样设置执行器内存,执行器在驱动程序中运行。

    作为完成@T.Gaweda的解决方案,找到以下解决方案:

    val userSchema = new StructType().add("name", "string").add("age", "integer")
    val csvDF = spark
      .readStream
      .option("sep", ";")
      .schema(userSchema)      // Specify schema of the csv files
      .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
    
    csvDf.writeStream.format("console").option("truncate","false").start()
    

    现在 spark 将持续监控指定的目录,一旦您在目录中添加任何 csv 文件,您的 DataFrame 操作“csvDF”就会在该文件上执行。

    注意:如果你想让 spark 推断模式,你必须首先设置以下配置:

    spark.sqlContext.setConf("spark.sql.streaming.schemaInferenc‌​e","true")
    

    其中 spark 是您的 spark 会话。

    【讨论】:

    • 这可行,但具体到实现,spark 是如何找出新文件的?它是否递归地列出所有文件并与以前的记忆状态进行比较?如果是这样,那么处理庞大的文件层次结构将非常低效吗?
    • 在读取一些源代码后,spark 确实读取了整个文件夹(如果没有提供元数据),然后与之前的状态进行比较,参考源类FileStreamSource.scala
    【解决方案2】:

    正如官方documentation 所写,您应该使用“文件”源:

    文件源 - 将写入目录中的文件作为数据流读取。支持的文件格式为文本、csv、json、parquet。请参阅 DataStreamReader 接口的文档以获取最新列表以及每种文件格式支持的选项。请注意,文件必须以原子方式放置在给定目录中,在大多数文件系统中,这可以通过文件移动操作来实现。

    取自文档的代码示例:

    // Read all the csv files written atomically in a directory
    val userSchema = new StructType().add("name", "string").add("age", "integer")
    val csvDF = spark
      .readStream
      .option("sep", ";")
      .schema(userSchema)      // Specify schema of the csv files
      .csv("/path/to/directory")    // Equivalent to format("csv").load("/path/to/directory")
    

    如果不指定触发器,Spark会尽快读取新文件

    【讨论】:

    • 我有问题。这也适用于读取 avro 文件吗?这是否也支持谷歌云存储,即我想类似地处理我的 gcs 存储桶中出现的新文件?这种方法是否容错,即如果管道发生故障,如何恢复,如何知道处理了哪些文件以及哪些是新文件?
    • 在 text/json 的情况下,如果我的流媒体管道失败,新的流媒体管道如何知道从哪里开始消费文件?
    • 如何告诉spark是否正在写入文件并等待文件写入操作完成。
    猜你喜欢
    • 2021-04-13
    • 2020-03-19
    • 2011-11-25
    • 2019-01-07
    • 1970-01-01
    • 2023-03-31
    • 2020-09-12
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多