【问题标题】:numpy.load from io.BytesIO stream来自 io.BytesIO 流的 numpy.load
【发布时间】:2019-08-31 19:32:32
【问题描述】:

我在 Azure Blob 存储中保存了 numpy 数组,我正在将它们加载到这样的流中:

stream = io.BytesIO()
store.get_blob_to_stream(container, 'cat.npy', stream)

我从stream.getvalue() 知道该流包含用于重建数组的元数据。这是前 150 个字节:

b"\x93NUMPY\x01\x00v\x00{'descr': '|u1', 'fortran_order': False, 'shape': (720, 1280, 3), }                                                  \n\xc1\xb0\x94\xc2\xb1\x95\xc3\xb2\x96\xc4\xb3\x97\xc5\xb4\x98\xc6\xb5\x99\xc7\xb6\x9a\xc7"

是否可以使用numpy.load 或其他一些简单的方法加载字节流?

我可以将阵列保存到磁盘并从磁盘加载,但出于几个原因我想避免这种情况...

编辑:只是强调一下,输出需要是一个 numpy 数组,其形状和 dtype 在流的前 128 个字节中指定。

【问题讨论】:

  • 如果你把它保存到磁盘,你会如何加载它?
  • @hpaulj by numpy.load()
  • load 接受像对象这样的打开文件。
  • 好的,是的,无法让它工作......

标签: python numpy azure-storage


【解决方案1】:

我尝试了几种方法来实现您的需求。

这是我的示例代码。

from azure.storage.blob.baseblobservice import BaseBlobService
import numpy as np

account_name = '<your account name>'
account_key = '<your account key>'
container_name = '<your container name>'
blob_name = '<your blob name>'

blob_service = BaseBlobService(
    account_name=account_name,
    account_key=account_key
)

示例 1. 使用 sas 令牌生成 blob url 以通过 requests 获取内容

from azure.storage.blob import BlobPermissions
from datetime import datetime, timedelta
import requests

sas_token = blob_service.generate_blob_shared_access_signature(container_name, blob_name, permission=BlobPermissions.READ, expiry=datetime.utcnow() + timedelta(hours=1))
print(sas_token)
url_with_sas = blob_service.make_blob_url(container_name, blob_name, sas_token=sas_token)
print(url_with_sas)

r = requests.get(url_with_sas)
dat = np.frombuffer(r.content)
print('from requests', dat)

示例2.通过BytesIO将blob内容下载到内存中

import io
stream = io.BytesIO()
blob_service.get_blob_to_stream(container_name, blob_name, stream)
dat = np.frombuffer(stream.getbuffer())
print('from BytesIO', dat)

示例 3. 使用 numpy.fromfileDataSource 打开带有 sas 令牌的 blob url,它实际上会将 blob 文件下载到本地文件系统中。

ds = np.DataSource()
# ds = np.DataSource(None)  # use with temporary file
# ds = np.DataSource(path) # use with path like `data/`
f = ds.open(url_with_sas)
dat = np.fromfile(f)
print('from DataSource', dat)

我认为样本 1 和 2 更适合您。

【讨论】:

  • 嘿,非常感谢您的回复,但我想我忘了强调我特别需要一个形状和 dtype 的 numpy 数组,如 stream.getvalue() 前 128 个字节... @987654331 @ 实际上可以正常工作!
【解决方案2】:

当涉及到 np.savez 时,上述解决方案通常需要工作。

上传到存储:

import io    
import numpy as np    

stream = io.BytesIO()  
arr1 = np.random.rand(20,4)  
arr2 = np.random.rand(20,4)  
np.savez(stream, A=arr1, B=arr2)  
block_blob_service.create_blob_from_bytes(container, 
                                          "my/path.npz", 
                                          stream.getvalue())

从存储下载:

from numpy.lib.npyio import NpzFile 

stream = io.BytesIO()  
block_blob_service.get_blob_to_stream(container, "my/path.npz", stream)  

ret = NpzFile(stream, own_fid=True, allow_pickle=True)  
print(ret.files)  
""" ['A', 'B'] """  
print(ret['A'].shape)  
""" (20, 4) """  

【讨论】:

    【解决方案3】:

    这是我想出的一种 hacky 方式,它基本上只是从前 128 个字节中获取元数据:

    def load_npy_from_stream(stream_):
        """Experimental, may not work!
    
        :param stream_: io.BytesIO() object obtained by e.g. calling BlockBlobService().get_blob_to_stream() containing
            the binary stream of a standard format .npy file.
        :return: numpy.ndarray
        """
        stream_.seek(0)
        prefix_ = stream_.read(128)  # first 128 bytes seem to be the metadata
        dict_string = re.search('\{(.*?)\}', prefix_[1:].decode())[0]
        metadata_dict = eval(dict_string)
    
        array = np.frombuffer(stream_.read(), dtype=metadata_dict['descr']).reshape(metadata_dict['shape'])
    
        return array
    

    可能会以多种方式失败,但如果有人想试一试,我会在此处发布。我将对此进行测试,并在了解更多信息后返回。

    【讨论】:

      【解决方案4】:

      有点晚了,但如果有人想使用 numpy.load 执行此操作,代码如下(Azure SDK v12.8.1):

      from azure.storage.blob import BlobServiceClient
      import io
      import numpy as np
      
      # define your connection parameters
      connect_str = ''
      container_name = ''
      blob_name = ''
      
      blob_service_client = BlobServiceClient.from_connection_string(connect_str)
      
      blob_client = blob_service_client.get_blob_client(container=container_name,
                                                        blob=blob_name)
      
      # Get StorageStreamDownloader
      blob_stream = blob_client.download_blob()
      stream = io.BytesIO()
      blob_stream.download_to_stream(stream)
      stream.seek(0)
      
      # Load form io.BytesIO object
      data = np.load(stream, allow_pickle=False)
      
      print(data.shape)
      

      【讨论】:

      • 这非常有效。我猜是 seek(0) 使它起作用? allow_pickle=False 似乎是默认值。
      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 2017-08-05
      • 1970-01-01
      • 1970-01-01
      • 2018-07-13
      • 2020-07-21
      • 2020-09-06
      • 2019-09-17
      相关资源
      最近更新 更多