【问题标题】:Loading data from Spark Structured Streaming into ArrayList将数据从 Spark 结构化流加载到 ArrayList
【发布时间】:2020-10-16 17:52:29
【问题描述】:

我需要将数据从 Kafka 发送到 Kinesis Firehose。我正在使用 Spark 结构化流处理 Kafka 数据。我不确定如何将流式查询的数据集处理为 ArrayList 变量 - 例如,recordList - 例如100 条记录(可以是任何其他值),然后调用 Firehose API 的 putRecordBatch(recordList) 将记录放入 Firehose。

【问题讨论】:

    标签: apache-spark apache-kafka


    【解决方案1】:

    我认为您想查看Foreach and ForeachBatch,具体取决于您的 Spark 版本。 ForeachBatch 出现在 V2.4.0 中,foreach 可用 ForeachWriter。 Databricks 有一些很好的examples 使用 foreach 创建自定义编写器。

    我从未使用过 Kinesis,但这里有一个示例,说明您的自定义接收器可能是什么样子。

    case class MyConfigInfo(info1: String, info2: String)
    
    class  KinesisSink(configInfo: MyConfigInfo) extends ForeachWriter[(String, String)] {
      val kinesisProducer = _
    
      def open(partitionId: Long,version: Long): Boolean = {
        kinesisProducer = //set up the kinesis producer using MyConfigInfo
          true
      }
    
      def process(value: (String, String)): Unit = {
        //ask kinesisProducer to send data
      }
    
      def close(errorOrNull: Throwable): Unit = {
        //close the kinesis producer
      }
    }
    

    如果您使用的是 AWS kinesisfirehose API,您可能会这样做

    case class MyConfigInfo(info1: String, info2: String)
    
    class  KinesisSink(configInfo: MyConfigInfo) extends ForeachWriter[(String, String)] {
      val firehoseClient = _
      val req = putRecordBatchRequest = new PutRecordBatchRequest()
      val records = 0
      val recordLimit = //maybe you need to set this? 
    
      def open(partitionId: Long,version: Long): Boolean = {
        firehoseClient = //set up the firehose client using MyConfigInfo
          true
      }
    
      def process(value: (String, String)): Unit = {
        //ask fireHose client to send data or batch the request
        val record: Record = //create Record out of value
        req.setRecords(record)
        records = records + 1
        if(records >= recordLimit) {
          firehoseClient.putRecordBatch(req)
          records = 0
        }
      }
    
      def close(errorOrNull: Throwable): Unit = {
        //close the firehose client
        //or instead you could put the batch request to the firehose client here but i'm not sure if that's good practice
      }
    }
    

    那么你就这样使用它

    val writer = new KinesisSink(configuration)
    val query =
      streamingSelectDF
        .writeStream
        .foreach(writer)
        .outputMode("update")
        .trigger(ProcessingTime("25 seconds"))
        .start()
    

    【讨论】:

    • 谢谢亚历克斯。我能够实现它。但我一次放一条记录,因为这是在线程级别。我使用 putRecord 而不是 putRecordBatch
    • 我用另一个示例编辑了代码,这可能会有所帮助。也许你可以按照我展示的方式使用批处理请求。
    • 再次感谢。我看到的唯一问题是处理超过记录限制的记录。例如,假设 recordLimit 为 50,而您有 55 条记录。您的代码的哪一部分将使用 firehoseClient.putRecordBatch(req) 将最后 5 条记录推送到 Firehose?有没有办法知道结局已经到了?
    • 您是否可以遍历每条记录并将其添加到一个批次中,直到 recordLimit,发送该批次,然后继续创建新批次,直到您浏览完所有记录?
    • Alex,如果 Kafka 是 SSL 加密的,需要进行哪些更改?谢谢
    猜你喜欢
    • 2018-10-06
    • 2019-08-24
    • 1970-01-01
    • 2017-05-04
    • 1970-01-01
    • 1970-01-01
    • 2022-01-01
    • 1970-01-01
    • 2018-03-14
    相关资源
    最近更新 更多