【问题标题】:Merge two avro schemas programmatically以编程方式合并两个 avro 模式
【发布时间】:2016-04-10 11:56:58
【问题描述】:

我有两个相似的架构,其中只有一个嵌套字段发生变化(在架构 1 中称为 onefield,在架构 2 中称为 anotherfield)。

架构1

{
    "type": "record",
    "name": "event",
    "namespace": "foo",
    "fields": [
        {
            "name": "metadata",
            "type": {
                "type": "record",
                "name": "event",
                "namespace": "foo.metadata",
                "fields": [
                    {
                        "name": "onefield",
                        "type": [
                            "null",
                            "string"
                        ],
                        "default": null
                    }
                ]
            },
            "default": null
        }
    ]
}

schema2

{
    "type": "record",
    "name": "event",
    "namespace": "foo",
    "fields": [
        {
            "name": "metadata",
            "type": {
                "type": "record",
                "name": "event",
                "namespace": "foo.metadata",
                "fields": [
                    {
                        "name": "anotherfield",
                        "type": [
                            "null",
                            "string"
                        ],
                        "default": null
                    }
                ]
            },
            "default": null
        }
    ]
}

我能够使用 avro 1.8.0 以编程方式合并两个模式:

Schema s1 = new Schema.Parser().parse(schema1);
Schema s2 = new Schema.Parser().parse(schema2);
Schema[] schemas = {s1, s2};

Schema mergedSchema = null;
for (Schema schema: schemas) {
    mergedSchema = AvroStorageUtils.mergeSchema(mergedSchema, schema);
}

并使用它将输入 json 转换为 avro 或 json 表示:

JsonAvroConverter converter = new JsonAvroConverter();
try {
    byte[] example = new String("{}").getBytes("UTF-8");
    byte[] avro = converter.convertToAvro(example, mergedSchema);
    byte[] json = converter.convertToJson(avro, mergedSchema);
    System.out.println(new String(json));
} catch (AvroConversionException e) {
    e.printStackTrace();
}

该代码显示了预期的输出:{"metadata":{"onefield":null,"anotherfield":null}}。问题是我无法看到合并的架构。如果我做一个简单的System.out.println(mergedSchema) 我会得到以下异常:

Exception in thread "main" org.apache.avro.SchemaParseException: Can't redefine: merged schema (generated by AvroStorage).merged
    at org.apache.avro.Schema$Names.put(Schema.java:1127)
    at org.apache.avro.Schema$NamedSchema.writeNameRef(Schema.java:561)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:689)
    at org.apache.avro.Schema$RecordSchema.fieldsToJson(Schema.java:715)
    at org.apache.avro.Schema$RecordSchema.toJson(Schema.java:700)
    at org.apache.avro.Schema.toString(Schema.java:323)
    at org.apache.avro.Schema.toString(Schema.java:313)
    at java.lang.String.valueOf(String.java:2982)
    at java.lang.StringBuilder.append(StringBuilder.java:131)

我称之为 avro 不确定性原理 :)。看起来 avro 能够使用合并的模式,但是当它尝试将模式序列化为 JSON 时它失败了。合并适用于更简单的模式,所以对我来说这听起来像是 avro 1.8.0 中的一个错误。

您知道会发生什么或如何解决吗?欢迎任何解决方法(例如:替代 Schema 序列化程序)。

【问题讨论】:

标签: java avro


【解决方案1】:

我在 pig util 类中发现了同样的问题...实际上这里有 2 个错误

  • AVRO 允许使用无效架构通过 GenericDatumWriter 序列化数据
  • piggybank util 类正在生成无效架构,因为它对所有合并字段使用相同的名称/命名空间(例如保留原始名称)

这适用于更复杂的场景https://github.com/kite-sdk/kite/blob/master/kite-data/kite-data-core/src/main/java/org/kitesdk/data/spi/SchemaUtil.java#L511

    Schema mergedSchema = SchemaUtil.merge(s1, s2);

从您的示例中,我得到以下输出

{
  "type": "record",
  "name": "event",
  "namespace": "foo",
  "fields": [
    {
      "name": "metadata",
      "type": {
        "type": "record",
        "name": "event",
        "namespace": "foo.metadata",
        "fields": [
          {
            "name": "onefield",
            "type": [
              "null",
              "string"
            ],
            "default": null
          },
          {
            "name": "anotherfield",
            "type": [
              "null",
              "string"
            ],
            "default": null
          }
        ]
      },
      "default": null
    }
  ]
}

希望这对其他人有所帮助。

【讨论】:

  • 谢谢@lake。我无法尝试,但它看起来确实不错。
【解决方案2】:

avro 文件尚不支持合并架构工具。 但是,假设您在一个目录中有多个 avro 文件的 avro 文件,这些文件具有不同的模式,例如:/demo,因此您可以使用 spark 读取它。并提供一个主模式文件(即 .avsc 文件),因此 spark 将在内部读取文件中的所有记录,如果任何一个文件缺少列,则它将显示空值。

object AvroSchemaEvolution {
def main(args: Array[String]): Unit = {
val schema = new Schema.Parser().parse(new File("C:\\Users\\murtazaz\\Documents\\Avro_Schema_Evolution\\schema\\emp_inserted.avsc"))
val spark = SparkSession.builder().master("local").getOrCreate()
  val df = spark.read
.format("com.databricks.spark.avro").option("avroSchema", schema.toString)
.load("C:\\Users\\murtazaz\\Documents\\Avro_Schema_Evolution\\demo").show()
 }
}

【讨论】:

    猜你喜欢
    • 2010-11-14
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-02-05
    • 2011-01-17
    • 1970-01-01
    • 2018-04-17
    • 1970-01-01
    相关资源
    最近更新 更多