【问题标题】:How to write a list to a file in Google Cloud Storage using Google Cloud Functions with Python如何使用带有 Python 的 Google Cloud Functions 将列表写入 Google Cloud Storage 中的文件
【发布时间】:2023-03-16 04:57:01
【问题描述】:

我正在尝试使用 Cloud Functions 将列表成员写入 Cloud Storage 存储桶中的文件。

我发现 this 页面显示如何将文件上传到我的存储桶,但我需要遍历列表中的成员并将它们写入 Cloud Storage 中的文件。

我需要能够使用从我的 Google Cloud SQL 数据库中读取的 Cloud Functions 来执行此操作。我希望能够将 PostreSQL 数据库中某些表中的数据作为文件存储在 Cloud Storage 中。

谢谢。

【问题讨论】:

  • 到目前为止你有什么尝试?

标签: python google-cloud-functions google-cloud-storage google-cloud-sql


【解决方案1】:
  • 如果您只需要在 Python 中循环您的列表并编写结果 到一个文件,你可以在线使用多个 Python 示例中的任何一个,或者 在 Stack Overflow 中,如this 一:

    with open('your_file.txt', 'w') as f:
        for item in my_list:
            f.write("%s\n" % item)
    

    当然,这取决于您的列表的外观、数据以及您需要写入 Cloud Storage 的文件类型;这些必须根据您的需要进行更换。

  • 从 Cloud Function 连接到 Cloud SQL for PostgreSQL 数据库你可以关注documentation。一个例子使用 SQLAlchemyUnix 套接字是:

       db = sqlalchemy.create_engine(
           # Equivalent URL:
           # postgres+pg8000://<db_user>:<db_pass>@/<db_name>?unix_sock=/cloudsql/<cloud_sql_instance_name>/.s.PGSQL.5432
           sqlalchemy.engine.url.URL(
               drivername='postgres+pg8000',
               username=db_user,
               password=db_pass,
               database=db_name,
               query={
                   'unix_sock': '/cloudsql/{}/.s.PGSQL.5432'.format(
                       cloud_sql_connection_name)
               }
           ),
       )
    

    其中db_userdb_passdb_name 必须替换为您的数据库的用户名、密码和数据库的 名字。

  • 您引用的link 提到了如何将 blob 上传到 Cloud 正如您可能知道的那样,使用 Python 进行存储,所以一旦数据 从数据库中提取并写入your_file.txt for 例如,您可以通过以下方式将其上传到 Cloud Storage:

    from google.cloud import storage
    
    
    def upload_blob(bucket_name, source_file_name, destination_blob_name):
        """Uploads a file to the bucket."""
        bucket_name = "your-bucket-name"
        source_file_name = "local/path/to/file/your_file.txt"
        destination_blob_name = "storage-object-name"
    
        storage_client = storage.Client()
        bucket = storage_client.bucket(bucket_name)
        blob = bucket.blob(destination_blob_name)
    
        blob.upload_from_filename(source_file_name)
    
        print(
            "File {} uploaded to {}.".format(
                source_file_name, destination_blob_name
            )
        )
    

    your-bucket-name 替换为您的 Cloud Storage 存储分区的名称,将 local/path/to/file/your_file.txt 替换为文件的本地路径,将 storage-object-name 替换为您希望文件上传到 Cloud Storage 存储分区后的名称和扩展名.

将所有这些放在一起,您就可以实现您的目标。

【讨论】:

    【解决方案2】:

    我设法用以下 python 代码完成了它:

    import datetime
    import logging
    import os
    import sqlalchemy
    from google.cloud import storage
    import pandas as pd
    
    # Remember - storing secrets in plaintext is potentially unsafe. Consider using
    # something like https://cloud.google.com/kms/ to help keep secrets secret.
    db_user = "<DB_USER>"#os.environ.get("DB_USER")
    db_pass = "<DB_PASS>"#os.environ.get("DB_PASS")
    db_name = "<DB_NAME>"#os.environ.get("DB_NAME")
    cloud_sql_connection_name = "<Cloud SQL Instance Connection Name>"#os.environ.get("CLOUD_SQL_CONNECTION_NAME")
    logger = logging.getLogger()
    
    # [START cloud_sql_postgres_sqlalchemy_create]
    db = sqlalchemy.create_engine(
        # Equivalent URL:
        # postgres+pg8000://<db_user>:<db_pass>@/<db_name>?unix_sock=/cloudsql/<cloud_sql_instance_name>/.s.PGSQL.5432
        sqlalchemy.engine.url.URL(
            drivername='postgres+pg8000',
            username=db_user,
            password=db_pass,
            database=db_name,
            query={
                'unix_sock': '/cloudsql/{}/.s.PGSQL.5432'.format(
                    cloud_sql_connection_name)
            }
        ),
        # ... Specify additional properties here.
        pool_size=5,
        max_overflow=2,
        pool_timeout=30,  # 30 seconds
        pool_recycle=1800,  # 30 minutes
    )
    
    def read_source_data(request):
        bucket_name = <YOUR_BUCKET_NAME>
        folder_name = "sample_files"
        file_name = "test.txt"
    
        with db.connect() as conn:
            sales_records = conn.execute(
                "SELECT * FROM sales;"
            ).fetchall()
    
        if len(sales_records) > 0:
            #for val in sales_records:
                #print(val)
            df = pd.DataFrame(sales_records)
            df.columns = sales_records[0].keys()
            create_file(bucket_name, "sample_files/test.txt", df)
            return "Done!"
        else:
            print("Nothing!")
            return "Nothing!"
    
    def create_file(bucketname, path, records_read):
      storage_client = storage.Client()
      bucket = storage_client.get_bucket(bucketname)
      blob = storage.Blob(
            name=path,
            bucket=bucket,
        )
    
      content = records_read.to_csv(index=False)#'\n'.join(map(str, records_read))
    
      blob.upload_from_string(
            data=content,
            content_type='text/plain',
            client=storage_client,
        )
    

    我从多个代码 sn-ps 将其拼接在一起,作为非 python 开发人员,我很确定有更好的方法来完成这项工作。 然后我使用

    部署了我的功能
    gcloud deployment-manager deployments  create
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2018-04-22
      • 2021-12-26
      • 2020-05-05
      • 2019-02-14
      • 1970-01-01
      • 2012-01-02
      • 2021-07-27
      • 2020-12-01
      相关资源
      最近更新 更多