【问题标题】:PySpark: Deserializing an Avro serialized message contained in an eventhub capture avro filePySpark:反序列化包含在 eventthub 捕获 avro 文件中的 Avro 序列化消息
【发布时间】:2019-04-11 09:01:55
【问题描述】:

初步情况

AVRO 序列化事件被发送到 Azure 事件中心。这些事件使用 azure 事件中心捕获功能永久存储。捕获的数据以及事件中心元数据以 Apache Avro 格式编写。捕获 avro 文件中包含的原始事件应使用 (py)Spark 进行分析。


问题

如何使用 (py)Spark 反序列化包含在 AVRO 文件的字段/列中的 AVRO 序列化事件? (注:事件的 avro 模式不被阅读器应用程序知道,但它作为 avro 标头包含在消息中)


背景

背景是物联网场景的分析平台。消息由运行在 kafka 上的 IoT 平台提供。为了更灵活地更改模式,战略决策是坚持使用 avro 格式。 要启用 Azure 流分析 (ASA),请为每条消息指定 avro 架构(否则 ASA 无法反序列化消息)。

捕获文件 avro 架构

事件中心捕获功能生成的 avro 文件的架构如下所列:

{
    "type":"record",
    "name":"EventData",
    "namespace":"Microsoft.ServiceBus.Messaging",
    "fields":[
                 {"name":"SequenceNumber","type":"long"},
                 {"name":"Offset","type":"string"},
                 {"name":"EnqueuedTimeUtc","type":"string"},
                 {"name":"SystemProperties","type":{"type":"map","values":["long","double","string","bytes"]}},
                 {"name":"Properties","type":{"type":"map","values":["long","double","string","bytes"]}},
                 {"name":"Body","type":["null","bytes"]}
             ]
}

(请注意,实际消息以字节形式存储在正文字段中)

示例事件 avro 架构

为了说明,我将具有以下 avro 架构的事件发送到事件中心:

{
    "type" : "record",
    "name" : "twitter_schema",
    "namespace" : "com.test.avro",
    "fields" : [ 
                {"name" : "username","type" : "string"}, 
                {"name" : "tweet","type" : "string"},
                {"name" : "timestamp","type" : "long"}
    ],
}

示例事件

{
    "username": "stackoverflow",
    "tweet": "please help deserialize me",
    "timestamp": 1366150681
}

示例 avro 消息负载

(编码为字符串/注意包含 avro 模式)

Objavro.schema�{"type":"record","name":"twitter_schema","namespace":"com.test.avro","fields":[{"name":"username","type":"string"},{"name":"tweet","type":"string"},{"name":"timestamp","type":"long"}]}

所以最后这个有效载荷将作为字节存储在捕获 avro 文件的“正文”字段中。

.
.


我目前的做法

为了便于使用、测试和调试,我目前使用 pyspark jupyter notebook。

Spark Session 的配置:

%%configure
{
    "conf": {
        "spark.jars.packages": "com.databricks:spark-avro_2.11:4.0.0"
    }
}

将 avro 文件读入数据帧并输出结果:

capture_df = spark.read.format("com.databricks.spark.avro").load("[pathToCaptureAvroFile]")
capture_df.show()

结果:

+--------------+------+--------------------+----------------+----------+--------------------+
|SequenceNumber|Offset|     EnqueuedTimeUtc|SystemProperties|Properties|                Body|
+--------------+------+--------------------+----------------+----------+--------------------+
|            71|  9936|11/4/2018 4:59:54 PM|           Map()|     Map()|[4F 62 6A 01 02 1...|
|            72| 10448|11/4/2018 5:00:01 PM|           Map()|     Map()|[4F 62 6A 01 02 1...|

获取 Body 字段的内容并将其转换为字符串:

msgRdd = capture_df.select(capture_df.Body.cast("string")).rdd.map(lambda x: x[0])

这就是我让代码工作的程度。花了很多时间尝试反序列化实际消息,但没有成功。我将不胜感激!

一些附加信息: Spark 在 Microsoft Azure HDInsight 3.6 群集上运行。 Spark 版本是 2.2。 Python 版本为 2.7.12。

【问题讨论】:

    标签: apache-spark pyspark avro azure-eventhub-capture


    【解决方案1】:

    您要做的是将.decode('utf-8') 应用于正文列中的每个元素。您必须从解码中创建一个UDF,以便您可以应用它。 UDF 可以使用

    创建
    from pyspark.sql import functions as f
    
    decodeElements = f.udf(lambda a: a.decode('utf-8'))
    

    下面是解析the IoT Hub to a custom Blob Storage endpoint存储的avro文件的完整示例:

    storage_account_name = "<YOUR STORACE ACCOUNT NAME>"
    storage_account_access_key = "<YOUR STORAGE ACCOUNT KEY>"
    
    # Read all files from one day. All PartitionIds are included. 
    file_location = "wasbs://<CONTAINER>@"+storage_account_name+".blob.core.windows.net/<IoT Hub Name>/*/2018/11/30/*/*"
    file_type = "avro"
    
    # Read raw data
    spark.conf.set(
      "fs.azure.account.key."+storage_account_name+".blob.core.windows.net",
      storage_account_access_key)
    
    reader = spark.read.format(file_type).option("inferSchema", "true")
    raw = reader.load(file_location)
    
    # Decode Body into strings
    from pyspark.sql import functions as f
    
    decodeElements = f.udf(lambda a: a.decode('utf-8'))
    
    jsons = raw.select(
        raw['EnqueuedTimeUtc'],
        raw['SystemProperties.connectionDeviceId'].alias('DeviceId'), 
        decodeElements(raw['Body']).alias("Json")
    )
    
    # Parse Json data
    from pyspark.sql.functions import from_json
    
    json_schema = spark.read.json(jsons.rdd.map(lambda row: row.Json)).schema
    data = jsons.withColumn('Parsed', from_json('Json', json_schema)).drop('Json')
    

    Disclamer:我对 Python 和 Databricks 都是新手,我的解决方案可能并不完美。但是我花了一天多的时间来完成这项工作,我希望这对某人来说是一个很好的起点。

    【讨论】:

    • 这个答案解释了如何从 Azure blob 存储(或 hdfs)读取 Avro 数据。提出的问题是如何将字节为 avro 有效负载的 RDD[Byte[]] 转换为数据帧。例如,使用RDD[String],其中字符串为json,您可以在pyspark spark.read.json(rdd) 中执行此操作,但对于avro,spark.read.format('avro').load(rdd) 不起作用,因为read.load() 只需要read.json() 接受RDD[String] 的路径.如果我找到解决方案,我会发布它,但到目前为止我还没有......
    • 没有。问题是关于“使用 azure 事件中心捕获功能存储 [...]”的事件。但是,这可能不是您的具体情况。感谢您的反对。
    • 你说得对,我看错了......我很抱歉投反对票!我尝试将其删除,但除非您编辑答案,否则它不会让我这样做。但是,我仍然认为您的答案不适用于问题中描述的用例。我确认如果捕获的消息(Body 字段中的消息)是 json,它可以工作,但在这个问题中,捕获的消息似乎也是用 avro 编写的。所以我们在 Avro 捕获文件中得到了一个 Avro 主体。我仍然没有找到一个好的解决方案。看起来 Spark 2.4 引入了一个可以提供帮助的 from_avro UDF,但我没有对其进行测试。
    • 我知道这是一个 python 问题——但今天我需要在 scala 中解决它。一种方法是 from_json 使用 gist.github.com/geoHeil/b1be2ec09f9c5e9a3b887073fe8bf004 显然,spark.apache.org/docs/latest/… 会更好,但我现在手头没有 json 模式。对于.select(from_avro('value, jsonFormatSchema)
    【解决方案2】:

    我想你也可以这样做:

    jsonRdd = raw.select(raw.Body.cast("string"))
    

    【讨论】:

      【解决方案3】:

      我有同样的问题。

      Spark 2.4 版本为我解决了这个问题。

      您可以在此处找到文档:https://databricks.com/blog/2018/11/30/apache-avro-as-a-built-in-data-source-in-apache-spark-2-4.html

      备注:你需要知道你的 AVRO 文件是什么样子才能创建你的架构(他们只是在这里加载它)。

      缺点:它目前仅在 Scala 和 Java 中可用。据我所知,这在 Python 中是不可能的。

      【讨论】:

        猜你喜欢
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2020-11-22
        • 1970-01-01
        • 2020-03-04
        • 1970-01-01
        • 2019-07-12
        • 2020-08-29
        相关资源
        最近更新 更多