【问题标题】:Handling schema changes in running Spark Streaming application在运行 Spark Streaming 应用程序时处理架构更改
【发布时间】:2017-05-02 19:16:44
【问题描述】:

我希望在 Spark 1.6 上使用 DataFrames API 构建一个 Spark Streaming 应用程序。在我深入兔子洞之前,我希望有人可以帮助我了解 DataFrames 如何处理具有不同架构的数据。

这个想法是消息将通过 Avro 模式流入 Kafka。我们应该能够以向后兼容的方式发展模式,而无需重新启动流式应用程序(应用程序逻辑仍然有效)。

使用模式注册表和嵌入在消息中的模式 ID 反序列化新版本的消息似乎很简单,使用 KafkaUtils 创建直接流和 AvroKafkaDecoder(来自 Confluent)。这让我有一个 DStream。

问题 #1: 在该 DStream 中将有具有不同版本模式的对象。因此,当我将每一个转换为 Row 对象时,我应该传入一个读取器模式,该模式是正确迁移数据的最新模式,并且我需要将最新模式传递给 sqlContext.createDataFrame(rowRdd, schema) 调用。 DStream 中的对象是 GenericData.Record 类型,据我所知,没有简单的方法可以判断哪个是最新版本。我看到了 2 种可能的解决方案,一种是调用模式注册表以在每个微批次上获取最新版本的模式。另一种是修改解码器以附加模式ID。然后我可以遍历 rdd 以找到最高 id 并从本地缓存中获取架构。

我希望有人已经以可重复使用的方式很好地解决了这个问题。

问题/问题 #2: Spark 将为每个分区从 Kafka 拉取不同的执行程序。当一个执行者收到与其他执行者不同的“最新”模式时,我的应用程序会发生什么。由一个执行程序创建的 DataFrame 在同一时间窗口中将具有与另一个执行程序不同的架构。我实际上不知道这是否是一个真正的问题。我无法可视化数据流,以及什么样的操作会出现问题。如果这是一个问题,则意味着需要在执行者之间共享一些数据,这听起来既复杂又低效。

我需要担心这个吗?如果可以,如何解决架构差异?

谢谢, --本

【问题讨论】:

    标签: apache-spark spark-streaming avro spark-avro


    【解决方案1】:

    我相信我已经解决了这个问题。我正在使用 Confluent 的模式注册表和 KafkaAvroDecoder。简化后的代码如下:

    // Get the latest schema here. This schema will be used inside the
    // closure below to ensure that all executors are using the same 
    // version for this time slice.
    val sr : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000)
    val m = sr.getLatestSchemaMetadata(subject)
    val schemaId = m.getId
    val schemaString = m.getSchema
    
    val outRdd = rdd.mapPartitions(partitions => {
      // Note: we cannot use the schema registry from above because this code
      // will execute on remote machines, requiring the schema registry to be
      // serialized. We could use a pool of these.
      val schemaRegistry : CachedSchemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 1000)
      val decoder: KafkaAvroDecoder = new KafkaAvroDecoder(schemaRegistry)
      val parser = new Schema.Parser()
      val avroSchema = parser.parse(schemaString)
      val avroRecordConverter = AvroSchemaConverter.createConverterToSQL(avroSchema)
    
      partitions.map(input => {
        // Decode the message using the latest version of the schema.
        // This will apply Avro's standard schema evolution rules 
        // (for compatible schemas) to migrate the message to the 
        // latest version of the schema.
        val record = decoder.fromBytes(messageBytes, avroSchema).asInstanceOf[GenericData.Record]
        // Convert record into a DataFrame with columns according to the schema
        avroRecordConverter(record).asInstanceOf[Row]
      })
    })
    
    // Get a Spark StructType representation of the schema to apply 
    // to the DataFrame.
    val sparkSchema = AvroSchemaConverter.toSqlType(
          new Schema.Parser().parse(schemaString)
        ).dataType.asInstanceOf[StructType]
    sqlContext.createDataFrame(outRdd, sparkSchema)
    

    【讨论】:

    • 这里需要模式字符串吗?如果您只使用fromBytes(messageBytes),它将通过其 ID 查找该记录的架构。 github.com/confluentinc/schema-registry/blob/master/… 或者总是使用最新的只是为了应用向后规则有意义?
    • 是的,架构字符串是必需的。问题的症结在于行可能有不同版本的模式。您需要始终使用最新版本的模式作为阅读器模式,以便应用 Avro 的模式演变规则。这样,所有行在反序列化时都将具有一致的架构。
    【解决方案2】:

    我仅使用结构化流式传输来完成此操作。

    case class DeserializedFromKafkaRecord(value: String)
     
    val brokers = "...:9092"
    val schemaRegistryURL = "...:8081"
    var topicRead = "mytopic"
     
     
    val kafkaParams = Map[String, String](
      "kafka.bootstrap.servers" -> brokers,
      "group.id" -> "structured-kafka",
      "failOnDataLoss"-> "false",
      "schema.registry.url" -> schemaRegistryURL
    )
        
    object topicDeserializerWrapper {
      val props = new Properties()
      props.put(AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG, schemaRegistryURL)
      props.put(KafkaAvroDeserializerConfig.SPECIFIC_AVRO_READER_CONFIG, "true")
      val vProps = new kafka.utils.VerifiableProperties(props)
      val deser = new KafkaAvroDecoder(vProps)
      val avro_schema = new RestService(schemaRegistryURL).getLatestVersion(topicRead + "-value")
      val messageSchema = new Schema.Parser().parse(avro_schema.getSchema)
    }
     
    val df = {spark
      .readStream
      .format("kafka")
      .option("subscribe", topicRead)
      .option("kafka.bootstrap.servers", brokers)
      .option("auto.offset.reset", "latest")
      .option("failOnDataLoss", false)
      .option("startingOffsets", "latest")
      .load()
      .map(x => {
         DeserializedFromKafkaRecord(DeserializerWrapper.deser.fromBytes(x.getAs[Array[Byte]]("value"), DeserializerWrapper.messageSchema).asInstanceOf[GenericData.Record].toString)
      })}
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2016-06-12
      • 2021-11-17
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2015-04-12
      • 2020-05-02
      • 1970-01-01
      相关资源
      最近更新 更多