【问题标题】:How to sign gcs blob from the dataflow worker如何从数据流工作人员签署 gcs blob
【发布时间】:2020-04-20 18:44:15
【问题描述】:

我的 beam 数据流作业在本地成功(使用 DirectRunner)并在云端失败(使用 DataflowRunner

这个代码sn-p本地化的问题:

class SomeDoFn(beam.DoFn):
  ...
  def process(self, gcs_blob_path):
    gcs_client = storage.Client()
    bucket = gcs_client.get_bucket(BUCKET_NAME)
    blob = Blob(gcs_blob_path, bucket)

    # NEXT LINE IS CAUSING ISSUES! (when run remotely)
    url = blob.generate_signed_url(datetime.timedelta(seconds=300), method='GET')

并且数据流指向错误:“AttributeError:您需要一个私钥来签署凭据。您当前使用的凭据仅包含一个令牌。”

我的数据流作业使用服务帐户(PipelineOptions 中提供了适当的service_account_email),但是我看不到如何将该服务帐户的 .json 凭据文件传递给数据流作业。我怀疑我的作业在本地运行成功,因为我设置了环境变量GOOGLE_APPLICATION_CREDENTIALS=<path to local file with service account credentials>,但是如何为远程数据流工作人员进行类似的设置呢?或者如果有人可以提供帮助,也许还有其他解决方案

【问题讨论】:

    标签: google-cloud-platform google-cloud-dataflow apache-beam gcloud


    【解决方案1】:

    您需要使用环境变量 GOOGLE_APPLICATION_CREDENTIALS 提供服务帐户 JSON 密钥,类似于您在本地执行的操作。

    为此,您可以遵循question 的答案中提到的一些方法。比如使用PipelineOptions传递它

    但是,请记住,最安全的方法是将 JSON 密钥存储在 GCP 存储桶中,然后从那里获取文件。

    简单但不安全的解决方法是获取密钥,打开它,然后在您的代码中创建一个基于它的 json 对象以便稍后传递。

    【讨论】:

    • 谢谢。我想我理解这两种方法 1)在 gcs 中保留 .json 和 2)在本地创建 .json 并将其传递给工作人员。但是,当您说“例如使用 PipelineOptions 传递它”时,我不确定我是否正确。你知道gcp支持的具体选项吗?正如我在问题中提到的,我使用 service_account_email 但没有看到任何其他相关选项。
    • 我的意思是使用“temp_location”和“staging_location”来指定将有json键的Bucket。
    【解决方案2】:

    您可以查看here 示例,了解如何将自定义选项添加到您的 Beam 管道。有了这个,我们可以创建一个--key_file 参数,该参数将指向存储在 GCS 中的凭据:

    parser.add_argument('--key_file',
                      dest='key_file',
                      required=True,
                      help='Path to service account credentials JSON.')
    

    这将允许您在运行作业时添加--key_file gs://PATH/TO/CREDENTIALS.json 标志。

    然后,您可以从作业中读取它并将其作为侧面输入传递给需要对 blob 签名的DoFn。从示例here 开始,我们创建一个credentials PCollection 来保存JSON 文件:

    credentials = (p 
      | 'Read Credentials from GCS' >> ReadFromText(known_args.key_file))
    

    我们将它广播给所有处理 SignFileFn 函数的工作人员:

    (p
      | 'Read File from GCS' >> beam.Create([known_args.input]) \
      | 'Sign File' >> beam.ParDo(SignFileFn(), pvalue.AsList(credentials)))
    

    ParDo 内部,我们构建JSON 对象来初始化客户端(使用方法here)并签署文件:

    class SignFileFn(beam.DoFn):
      """Signs GCS file with GCS-stored credentials"""
      def process(self, gcs_blob_path, creds):
        from google.cloud import storage
        from google.oauth2 import service_account
    
        credentials_json=json.loads('\n'.join(creds))
        credentials = service_account.Credentials.from_service_account_info(credentials_json)
    
        gcs_client = storage.Client(credentials=credentials)
    
        bucket = gcs_client.get_bucket(gcs_blob_path.split('/')[2])
        blob = bucket.blob('/'.join(gcs_blob_path.split('/')[3:]))
    
        url = blob.generate_signed_url(datetime.timedelta(seconds=300), method='GET')
        logging.info(url)
        yield url
    

    查看完整代码here

    【讨论】:

    • 这就是方法!感谢您的详细回答
    • 也许一种选择是在本地读取凭据并将其作为字符串传递给SignFileFn 构造函数。您知道一种方式或其他方式是否有任何好处吗?
    • 是的,我也考虑过这种可能性,我认为实现起来会更简单,但我认为这种可能性对于审计/控制访问会更好(因为它是使用控制器服务帐户而不是最终用户启动作业)。它还可以扩展,以便定期刷新侧面输入,以防您需要轮换凭证文件
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2018-11-05
    相关资源
    最近更新 更多