【问题标题】:[Structured Streaming]: Structured Streaming into Redshift sink[Structured Streaming]:结构化流到 Redshift sink
【发布时间】:2018-01-19 04:37:33
【问题描述】:

是否可以将Kafka Streaming源支持的Dataframe写入AWS Redshift,我们过去使用spark-redshift写入Redshift,但我认为它不适用于DataFrame##writeStream。考虑到 Redshift 的工作方式,使用带有 ForeachWriter 的 JDBC 连接器编写也可能不是一个好主意。

我从Yelp blog 遇到的一种可能方法是将文件写入 S3,然后使用具有 S3 对象路径的清单文件调用Redshift COPY,在结构化流的情况下,如何控制文件我写到哪个 S3?并且在将 5 个文件写入 S3 后,还有一个单独的触发器来创建清单文件。

也赞赏任何其他可能的解决方案。提前致谢。

【问题讨论】:

    标签: apache-spark-sql amazon-redshift spark-structured-streaming


    【解决方案1】:

    有一种方法可以在结构化流中使用 spark-redshift,但您必须在自己的 fork 中实现一些额外的类。首先你需要一个 RedshiftSink 应该实现org.apache.spark.sql.execution.streaming.Sink 接口:

    private[redshift] class RedshiftSink(
        sqlContext: SQLContext,
        parameters: MergedParameters,
        redshiftWriter: RedshiftWriter) extends Sink {
    
      private val log = LoggerFactory.getLogger(getClass)
    
      @volatile private var latestBatchId = -1L
    
      override def toString(): String = "RedshiftSink"
    
      override def addBatch(batchId: Long, data: DataFrame): Unit = {
        if (batchId <= latestBatchId) {
          log.info(s"Skipping already committed batch $batchId")
        } else {
          val mode = if (parameters.overwrite) SaveMode.Overwrite else SaveMode.Append
          redshiftWriter.saveToRedshift(sqlContext, data, mode, parameters)
          latestBatchId = batchId
        }
      }
    }
    

    那么com.databricks.spark.redshift.DefaultSource应该通过org.apache.spark.sql.sources.StreamSinkProvider的实现来扩展:

      /**
       * Creates a Sink instance
       */
      override def createSink(
        sqlContext: SQLContext,
        parameters: Map[String, String],
        partitionColumns: Seq[String],
        outputMode: OutputMode): Sink = {
          new RedshiftSink(sqlContext, Parameters.mergeParameters(parameters), new RedshiftWriter(jdbcWrapper, s3ClientFactory))
      }
    

    现在您应该可以在结构化流中使用它了:

    dataset.writeStream()
            .trigger(Trigger.ProcessingTime(10, TimeUnit.SECONDS))
            .format("com.databricks.spark.redshift")
            .outputMode(OutputMode.Append())
            .queryName("redshift-stream")
            .start()
    

    更新

    要解决向 StreamExecution 报告指标的问题,必须将 RedshiftWriter.unloadData() 更改为使用 data.queryExecution.toRdd.mapPartitions 而不是 data.rdd.mapPartitions,因为 data.rdd 创建了一个对 StreamExecution 不可见的新计划(它使用现有计划来收集指标)。它还需要将转换函数更改为:

    val conversionFunctions: Array[(InternalRow, Int) => Any] = data.schema.fields.map { field =>
      field.dataType match {
        case DateType =>
          val dateFormat = Conversions.createRedshiftDateFormat()
          (row: InternalRow, ordinal: Int) => {
            if (row.isNullAt(ordinal)) null else dateFormat.format(
              DateTimeUtils.toJavaDate(row.getInt(ordinal)))
          }
        case TimestampType =>
          val timestampFormat = Conversions.createRedshiftTimestampFormat()
          (row: InternalRow, ordinal: Int) => {
            if (row.isNullAt(ordinal)) null else timestampFormat.format(
              DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)))
          }
        case StringType =>
          (row: InternalRow, ordinal: Int) => {
            if (row.isNullAt(ordinal)) null else row.getString(ordinal)
          }
        case dt: DataType =>
          (row: InternalRow, ordinal: Int) => {
            if (row.isNullAt(ordinal)) null else row.get(ordinal, dt)
          }
      }
    }
    

    【讨论】:

    • 成功了!!几乎!!他们在 RedshiftWriter 中使用 Dataset.rdd() 函数,结果是 org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start()
    • 是的,我忘了添加 writeStream.start()。查看更新的答案
    • 抱歉不清楚,在 RedshiftWriter 中完成的 Dataframe.rdd 检查 Dataframe 是否在 Spark SQL api 的 UnsupportedOperationChecker 类中流式传输并引发该异常。所以在这种情况下我们不能直接使用 RedshiftWriter,它有很多私有方法,所以也不能扩展它。所以可能不得不去并行实现它
    • RedshiftWriter 使用 data.rdd 创建一个新计划,以便 StreamExecution 不知道它(导致缺少指标、水印更新)。但是,无论何时启动流,它都不应该引发异常。您使用哪个火花版本?以及如何构建您的流?
    【解决方案2】:

    Spark 能够非常有效地将普通数据帧加载到 Redshift,但我还没有在 Spark 中使用过流。

    如果您可以连续将流输出写入标准 df,则可以按指定的时间间隔将该 df 加载到 Redshift 并将其清空。

    另一种选择是将流发送到 Kinesis 并使用 Kinesis Firehose 将其加载到 Redshift。不过,向堆栈中添加另一个流层似乎有些过分。

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2020-10-22
      • 2018-01-04
      • 2020-04-28
      • 1970-01-01
      • 2020-07-25
      • 2018-08-08
      • 2020-02-27
      相关资源
      最近更新 更多