【问题标题】:Failing to decompress streaming data by using UDF on Azure Databricks - Python无法在 Azure Databricks 上使用 UDF 解压缩流数据 - Python
【发布时间】:2019-05-06 20:45:58
【问题描述】:

我正在尝试使用 Azure DataBricks 和 python (PySpark) 读取 Azure EventHub GZIP 压缩消息,但使用 UDF 无法处理 BinaryType 数据。

嗯,这里是我检查身体里有什么的部分

df = eventHubStream.withColumn("body", eventHubStream["body"]).select("body")
display(df, truncate=False)

这会显示压缩良好的数据,如下所示:H4sIAKeM0FwC/3VS22rbQBB9z1cIQ6ElWN37JW8baeMKZEmRNk4LhcXUppg2cYncy...

但是,当我尝试将数据发送到我的 UDF 时,它的行为不像预期的那样。该函数实际上什么都不做,但输出看起来已经被转换了:

import zlib
from pyspark.sql.types import StringType

def streamDecompress(val: BinaryType()):
  #return zlib.decompress(val)
  return val

func_udf = udf(lambda x: streamDecompress(x), StringType())

df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select("body")
display(df, truncate=False)

这是输出:

[B@49d3f786

所以,正如预期的那样,当我尝试使用 zlib 解压缩时它失败了。

有人知道我是怎么做到的吗?

【问题讨论】:

    标签: python pyspark binary azure-eventhub azure-databricks


    【解决方案1】:

    嗯,这比我想象的要简单得多。我基本上是在尝试显示类似字节的数据哈哈。

    下面的代码解决了这个问题:

    import zlib
    
    def streamDecompress(val):   
      return str(zlib.decompress(val, 15+32))
    
    func_udf = udf(lambda x: streamDecompress(x))
    
    df = eventHubStream.withColumn("body", func_udf(eventHubStream["body"])).select('body')
    
    display(df, truncate=False)
    

    【讨论】:

    • 您可以简化这一点(我认为),但有一个问题.. 15+32 有什么作用?!看起来很神奇,但我看到它在某种程度上与其他谷歌搜索的标题和校验和有关......你仍然很奇怪!
    • 正如这里的文档中所述:docs.python.org/3/library/zlib.html 在解压缩部分:“+40 到 +47 = 32 +(8 到 15):使用值的低 4 位作为窗口大小对数,并自动接受 zlib 或 gzip 格式。"
    • 这已经很简单了。我不确定我们是否真的可以简化它。但是您可以做的一件事是删除显式 UDF 声明并使其隐式:@udf def streamDecompress(value): return str(zlib.decompress(value, 15+32))
    【解决方案2】:

    非常感谢。由于上周我一直在努力使用 zStandard 做同样的事情,所以我想我会添加我的代码 sn-p 以防其他人正在寻找类似的解决方案(我在任何地方都找不到):

    import zstandard as zstd
    def streamDecompress(val):
        return str(zstd.ZstdDecompressor().decompress(val))
    
    my_udf=udf(lambda x: streamDecompress(x))
    decompressedStream= pyStreamIn.withColumn("body",my_udf(pyStreamIn["body"]))
    

    【讨论】:

      猜你喜欢
      • 2021-10-22
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多