【Question title】: How to fetch a Google Cloud transfer job's run history details in Python?
【Posted】: 2021-07-16 15:27:36
【Question】:

I have a few Google Cloud transfer jobs running in my GCP account that transfer data from Azure to a GCS bucket.

According to this document - https://cloud.google.com/storage-transfer/docs/reference/rest/v1/transferJobs/get?apix_params=%7B%22jobName%22%3A%22transferJobs%2F213858246512856794%22%2C%22projectId%22%3A%22merlincloud-gcp-preprod%22%7D the "get" method returns a job's details, such as name, description, bucketName, status, includePrefixes, storageAccount, and so on.

Here is sample output from the "get" method.

{
  "name": "transferJobs/<job_name>",
  "description": "<description given while creating job>",
  "projectId": "<project_id>",
  "transferSpec": {
    "gcsDataSink": {
      "bucketName": "<destination_bucket>"
    },
    "objectConditions": {
      "includePrefixes": [
        "<prefix given while creating job>"
      ],
      "lastModifiedSince": "2021-06-30T18:30:00Z"
    },
    "transferOptions": {
      
    },
    "azureBlobStorageDataSource": {
      "storageAccount": "<account_name>",
      "container": "<container_name>"
    }
  },
  "schedule": {
    "scheduleStartDate": {
      "year": 2021,
      "month": 7,
      "day": 1
    },
    "startTimeOfDay": {
      "hours": 13,
      "minutes": 45
    },
    "repeatInterval": "86400s"
  },
  "status": "ENABLED",
  "creationTime": "2021-07-01T06:08:19.392111916Z",
  "lastModificationTime": "2021-07-01T06:13:32.460934533Z",
  "latestOperationName": "transferOperations/transferJobs-<job_name>"
}
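
For reference, a minimal sketch of how the same "get" call might look from Python with google-api-python-client. It assumes `service` was built with `discovery.build('storagetransfer', 'v1')`; the helper names `fetch_job` and `summarize_job` are hypothetical and only illustrate which fields of the response above can be read directly.

```python
def fetch_job(service, job_name, project_id):
    """Call transferJobs.get for one job.

    `service` is assumed to come from
    googleapiclient.discovery.build('storagetransfer', 'v1').
    """
    return service.transferJobs().get(jobName=job_name, projectId=project_id).execute()


def summarize_job(job):
    """Pick a few fields out of a transferJobs.get response (hypothetical helper)."""
    spec = job.get("transferSpec", {})
    azure = spec.get("azureBlobStorageDataSource", {})
    return {
        "name": job.get("name", ""),
        "status": job.get("status", ""),
        "destination": spec.get("gcsDataSink", {}).get("bucketName", ""),
        "storage_account": azure.get("storageAccount", ""),
        "container": azure.get("container", ""),
        "prefixes": spec.get("objectConditions", {}).get("includePrefixes", []),
        "latest_operation": job.get("latestOperationName", ""),
    }
```

Note that the response describes the job's configuration and only points at the most recent run through `latestOperationName`; it carries no per-run counters.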

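Now, how can I get the run history details of a specific job in Python?

By "run history details" I mean the metrics (data transferred, number of files, status, size, duration) that the GTS console shows for each run.

【Comments】:

    Tags: google-cloud-platform google-api google-cloud-storage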


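    【Solution 1】:

    I'm not familiar with the transfer service, but I am very familiar with GCP.

    The only other resource the service provides is transferOperations.

    Does that provide the data you need?

    If not (!), it may be that Google has not exposed this functionality outside the Console. That happens occasionally, even though the intent is always (public) API first.

    One way to investigate is to open the "Network" tab of your browser's developer tools and see which REST API calls the Console makes to fulfill the request. Another is to run the equivalent gcloud command with --log-http appended and inspect the underlying REST API calls that way.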
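
A minimal sketch of querying transferOperations from Python, assuming `service` was built with `discovery.build('storagetransfer', 'v1')`. The function names are hypothetical. The `filter` argument is passed as a JSON-formatted string, which is how the REST reference describes it, and `list_next` is used to follow pagination.

```python
import json


def build_operations_filter(project_id, job_names):
    """Serialize the list filter to the JSON string the API expects."""
    return json.dumps({"projectId": project_id, "jobNames": list(job_names)})


def list_operations(service, project_id, job_names):
    """Collect every transfer operation for the given jobs, across all pages."""
    operations = []
    request = service.transferOperations().list(
        name="transferOperations",
        filter=build_operations_filter(project_id, job_names),
    )
    while request is not None:
        response = request.execute()
        # 'operations' may be missing when a page is empty
        operations.extend(response.get("operations", []))
        # list_next returns None once there are no more pages
        request = service.transferOperations().list_next(
            previous_request=request, previous_response=response
        )
    return operations
```

Each returned operation carries a `metadata` object with fields such as `status`, `startTime`, `endTime`, and `counters`, which is where the per-run figures would come from.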

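    【Comments】:

      【Solution 2】:

      As @DazWilkin mentioned, I was able to get each job's run history details using the transferOperations - list API.

      I wrote a Cloud Function that fetches the GTS metrics by calling the APIs. It first calls the transferJobs - list API to get the list of jobs and keeps only the details of the required jobs. It then calls the transferOperations API, passing the job name, to get the run history details.

      The code is as follows: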

      import json
      import logging
      from datetime import datetime

      from googleapiclient import discovery
      from oauth2client.client import GoogleCredentials

      """
      requirements.txt
      google-api-python-client==2.3.0
      oauth2client==4.1.3
      """


      class GTSMetrics:

          def __init__(self):
              self.project = "<your_gcp_project_name>"
              # Maps the transferSpec source key to a display name
              self.source_type_mapping = {"gcsDataSource": "Google Cloud Storage", "awsS3DataSource": "Amazon S3",
                                          "azureBlobStorageDataSource": "Azure Storage"}
              self.transfer_job_names = ["transferJobs/<your_job_name>"]
              self.credentials = GoogleCredentials.get_application_default()
              self.service = discovery.build('storagetransfer', 'v1', credentials=self.credentials)
              self.metric_values = {}

          def build_run_history_metrics(self, job=None):
              try:
                  if job:
                      # The API documents the filter as a JSON-formatted string
                      operation_filters = json.dumps({"projectId": self.project, "jobNames": [job['name']]})

                      request = self.service.transferOperations().list(name='transferOperations', filter=operation_filters)
      
                      while request is not None:
                          response = request.execute()
                          if 'operations' in response:
      
                              self.metric_values['total_runs'] = len(response['operations'])
      
                              metadata = response['operations'][0]['metadata']
      
                              status = metadata.get('status', "")
                              start_time = metadata.get('startTime', "")
                              end_time = metadata.get('endTime', "")

                              # Timestamps look like "2021-07-01T06:08:19.392111916Z"; [:-4] drops the last
                              # three fractional digits and the "Z" so that %f (microseconds) can parse them.
                              # A run that has not finished has no endTime, so parsing raises and is logged below.
                              start_time_object = datetime.strptime(start_time[:-4], "%Y-%m-%dT%H:%M:%S.%f")
                              end_time_object = datetime.strptime(end_time[:-4], "%Y-%m-%dT%H:%M:%S.%f")
                              gts_copy_duration = end_time_object - start_time_object
      
                              self.metric_values['latest_run_status'] = status
                              self.metric_values['latest_run_time'] = str(start_time_object)
                              self.metric_values['latest_run_errors'] = ""
                              self.metric_values['start_time'] = str(start_time_object)
                              self.metric_values['end_time'] = str(end_time_object)
                              self.metric_values['duration'] = gts_copy_duration.total_seconds()
      
                              if status == "FAILED":
                                  if 'errorBreakdowns' in metadata:
                                      errors = metadata['errorBreakdowns'][0]['errorCount']
                                      error_code = metadata['errorBreakdowns'][0]['errorCode']
                                      self.metric_values['latest_run_errors'] = f"{errors} - {error_code}"
                              elif status == "SUCCESS":
                                  counters = metadata['counters']
                                  data_bytes = counters['bytesCopiedToSink'] if 'bytesCopiedToSink' in counters else '0 B'
                                  obj_from_src = str(
                                      counters['objectsFoundFromSource']) if 'objectsFoundFromSource' in counters else 0
                                  obj_copied_sink = str(
                                      counters['objectsCopiedToSink']) if 'objectsCopiedToSink' in counters else 0
                                  data_skipped_bytes = counters[
                                      'bytesFromSourceSkippedBySync'] if 'bytesFromSourceSkippedBySync' in counters else '0 B'
                                  data_skipped_files = counters[
                                      'objectsFromSourceSkippedBySync'] if 'objectsFromSourceSkippedBySync' in counters else '0'
      
                                  self.metric_values['data_transferred'] = data_bytes
                                  self.metric_values['files_found_in_source'] = obj_from_src
                                  self.metric_values['files_copied_to_sink'] = obj_copied_sink
                                  self.metric_values['data_skipped_in_bytes'] = data_skipped_bytes
                                  self.metric_values['data_skipped_files'] = data_skipped_files
      
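                          # Only the first page is read, so total_runs counts the operations on that page only.
                          # To walk every page, replace the break with:
                          # request = self.service.transferOperations().list_next(previous_request=request,
                          #                                                       previous_response=response)
                          break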
      
              except Exception as e:
                  logging.error(f"Exception in build_run_history_metrics - {str(e)}")
      
          def build_job_metrics(self, job):
              try:
                  transfer_spec = list(job['transferSpec'].keys())
      
                  source = ""
                  source_type = ""
                  if "gcsDataSource" in transfer_spec:
                      source_type = self.source_type_mapping["gcsDataSource"]
                      source = job['transferSpec']["gcsDataSource"]["bucketName"]
                  elif "awsS3DataSource" in transfer_spec:
                      source_type = self.source_type_mapping["awsS3DataSource"]
                      source = job['transferSpec']["awsS3DataSource"]["bucketName"]
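                  elif "azureBlobStorageDataSource" in transfer_spec:
                      source_type = self.source_type_mapping["azureBlobStorageDataSource"]
                      # Azure sources name a container rather than a bucket
                      source = job['transferSpec']["azureBlobStorageDataSource"]["container"]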
      
                  frequency = "Once"
                  # A job may have no 'schedule' key; treat that as a one-off run
                  schedule = job.get('schedule', {})
                  if "repeatInterval" in schedule:
                      interval = schedule['repeatInterval']
      
                      if interval == "86400s":
                          frequency = "Every day"
                      elif interval == "604800s":
                          frequency = "Every week"
                      else:
                          frequency = "Custom"
      
                  prefix = ""
                  if 'objectConditions' in transfer_spec:
                      obj_con = job['transferSpec']['objectConditions']
                      if 'includePrefixes' in obj_con:
                          prefix = job['transferSpec']['objectConditions']['includePrefixes'][0]
      
                  self.metric_values['job_description'] = job['description']
                  self.metric_values['job_name'] = job['name']
                  self.metric_values['source_type'] = source_type
                  self.metric_values['source'] = source
                  self.metric_values['destination'] = job['transferSpec']['gcsDataSink']['bucketName']
                  self.metric_values['frequency'] = frequency
                  self.metric_values['prefix'] = prefix
              except Exception as e:
                  logging.error(f"Exception in build_job_metrics - {str(e)}")
      
          def build_metrics(self):
              try:
                  # The filter is a JSON-formatted string; projectId is required
                  request = self.service.transferJobs().list(
                      filter=f'{{"projectId": "{self.project}"}}')

                  while request is not None:
                      response = request.execute()

                      # 'transferJobs' may be absent when a page contains no jobs
                      for transfer_job in response.get('transferJobs', []):
                          if transfer_job['name'] in self.transfer_job_names:
                              
                              # fetch job details
                              self.build_job_metrics(job=transfer_job)
      
                              # fetch run history details for the job
                              self.build_run_history_metrics(job=transfer_job)
      
                      request = self.service.transferJobs().list_next(previous_request=request, previous_response=response)
      
                  logging.info(f"GTS Metrics - {str(self.metric_values)}")
              except Exception as e:
                  logging.error(f"Exception in build_metrics - {str(e)}")
      
      
      def build_gts_metrics(request):
          gts_metrics = GTSMetrics()
          gts_metrics.build_metrics()
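
The `start_time[:-4]` slicing above relies on the timestamp having exactly nine fractional digits. A slightly more tolerant sketch is shown here, using only the standard library. The helper names are hypothetical, and it assumes the API returns UTC timestamps ending in "Z", as in the sample output in the question.

```python
from datetime import datetime, timezone


def parse_rfc3339(timestamp):
    """Parse 'YYYY-MM-DDTHH:MM:SS[.fraction]Z' into an aware UTC datetime."""
    body = timestamp.rstrip("Z")
    if "." in body:
        seconds, fraction = body.split(".", 1)
        # datetime stores microseconds only, so pad or truncate to 6 digits
        fraction = (fraction + "000000")[:6]
    else:
        seconds, fraction = body, "000000"
    parsed = datetime.strptime(f"{seconds}.{fraction}", "%Y-%m-%dT%H:%M:%S.%f")
    return parsed.replace(tzinfo=timezone.utc)


def run_duration_seconds(metadata):
    """Return endTime - startTime in seconds, or None if either is missing."""
    start, end = metadata.get("startTime"), metadata.get("endTime")
    if not start or not end:
        return None
    return (parse_rfc3339(end) - parse_rfc3339(start)).total_seconds()
```

Returning `None` for a run without an `endTime` lets the caller report an in-progress run instead of hitting the exception handler.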
      
      

      【Comments】:
