【Question title】: Pass parameters to AWS Data Pipeline (built-in template) from a Lambda function
【Posted】: 2019-09-25 11:44:25
【Question】:

I want to create a data pipeline that is triggered by a Lambda function. The pipeline is "Load S3 data into RDS MySQL", built from the template that AWS provides.

From my Lambda function I can't work out how to define the parameters to send to my data pipeline. I want to send the following parameters from Lambda to the pipeline:

"myRDSInstanceId": "source-dev",
"myRDSUsername": "username",
"myRDSTableInsertSql": "INSERT INTO employee(id,name,salary) VALUES(?,?,?)",
"*myRDSPassword": "https://www.ec2instances.info/?filter=m3",
"myInputS3Loc": "s3://services/employee/",
"myRDSTableName": "employee"

How can I do this? Any help would be appreciated. The Python code for my Lambda and my pipeline definition are below.

import json
import boto3


def lambda_handler(event, context):
    client = boto3.client('datapipeline')
    print('Loading function here')

    # Activate the pipeline; only myRDSTableName is supplied here,
    # none of the other parameters listed above
    client.activate_pipeline(
        pipelineId='df-095524176JKK0DOHDDDC',
        parameterValues=[{'id': 'myRDSTableName', 'stringValue': 'employee'}],
    )

    return {
        'statusCode': 200,
        'body': json.dumps('Hello from Lambda!')
    }

Pipeline definition:

{
  "objects": [
    {
      "output": {
        "ref": "DestinationRDSTable"
      },
      "input": {
        "ref": "S3InputDataLocation"
      },
      "dependsOn": {
        "ref": "RdsMySqlTableCreateActivity"
      },
      "name": "DataLoadActivity",
      "id": "DataLoadActivity",
      "runsOn": {
        "ref": "Ec2Instance"
      },
      "type": "CopyActivity"
    },
    {
      "subnetId": "subnet-XXXXX",
      "instanceType": "m1.medium",
      "name": "Ec2Instance",
      "actionOnTaskFailure": "terminate",
      "securityGroups": "#{myEc2RdsSecurityGrps}",
      "id": "Ec2Instance",
      "type": "Ec2Resource",
      "terminateAfter": "1 Hours"
    },
    {
      "database": {
        "ref": "rds_mysql"
      },
      "name": "RdsMySqlTableCreateActivity",
      "runsOn": {
        "ref": "Ec2Instance"
      },
      "id": "RdsMySqlTableCreateActivity",
      "type": "SqlActivity",
      "script": "#{myRDSCreateTableSql}"
    },
    {
      "*password": "password",
      "name": "rds_mysql",
      "id": "rds_mysql",
      "type": "RdsDatabase",
      "rdsInstanceId": "#{myRDSInstanceId}",
      "username": "#{myRDSUsername}"
    },
    {
      "name": "DataFormat1",
      "columnSeparator": "|",
      "id": "DataFormat1",
      "type": "TSV",
      "recordSeparator": "\\n"
    },
    {
      "failureAndRerunMode": "CASCADE",
      "resourceRole": "DataPipelineDefaultResourceRole",
      "role": "DataPipelineDefaultRole",
      "pipelineLogUri": "s3://logs/",
      "scheduleType": "ONDEMAND",
      "name": "Default",
      "id": "Default"
    },
    {
      "database": {
        "ref": "rds_mysql"
      },
      "name": "DestinationRDSTable",
      "insertQuery": "#{myRDSTableInsertSql}",
      "id": "DestinationRDSTable",
      "type": "SqlDataNode",
      "table": "#{myRDSTableName}",
      "selectQuery": "select * from #{table}"
    },
    {
      "directoryPath": "#{myInputS3Loc}",
      "dataFormat": {
        "ref": "DataFormat1"
      },
      "name": "S3InputDataLocation",
      "id": "S3InputDataLocation",
      "type": "S3DataNode"
    }
  ],
  "parameters": [
    {
      "description": "RDS MySQL password",
      "id": "*myRDSPassword",
      "type": "String"
    },
    {
      "watermark": "security group name",
      "helpText": "The names of one or more EC2 security groups that have access to the RDS MySQL cluster.",
      "description": "RDS MySQL security group(s)",
      "isArray": "true",
      "optional": "true",
      "id": "myEc2RdsSecurityGrps",
      "type": "String"
    },
    {
      "description": "RDS MySQL username",
      "id": "myRDSUsername",
      "type": "String"
    },
    {
      "description": "Input S3 file path",
      "id": "myInputS3Loc",
      "type": "AWS::S3::ObjectKey"
    },
    {
      "helpText": "The SQL statement to insert data into the RDS MySQL table.",
      "watermark": "INSERT INTO #{table} (col1, col2, col3) VALUES(?, ?, ?) ;",
      "description": "Insert SQL query",
      "id": "myRDSTableInsertSql",
      "type": "String"
    },
    {
      "helpText": "The name of an existing table or a new table that will be created based on the create table SQL query parameter below.",
      "description": "RDS MySQL table name",
      "id": "myRDSTableName",
      "type": "String"
    },
    {
      "watermark": "DB Instance",
      "description": "RDS Instance ID",
      "id": "myRDSInstanceId",
      "type": "String"
    }
  ],
  "values": {
    "myRDSInstanceId": "source-dev",
    "myRDSUsername": "username",
    "myRDSTableInsertSql": "INSERT INTO employee(id,name,salary) VALUES(?,?,?)",
    "*myRDSPassword": "https://www.ec2instances.info/?filter=m3",
    "myInputS3Loc": "s3://services/employee/",
    "myRDSTableName": "employee"
  }
}

【Question discussion】:

    Tags: python-3.x amazon-web-services aws-lambda aws-data-pipeline


    【Answer 1】:

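    In your Lambda code you only supply a value for the parameter myRDSTableName. When the Lambda function runs it passes only that value, and the other parameters are left empty. You need to either pass values for every other parameter the pipeline needs from your Lambda function, or set default parameter values in the pipeline definition (in the parameter objects section).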
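To make this concrete, the sketch below compares the parameter objects declared in the definition's `parameters` section with what the Lambda passes in `parameterValues`, and lists every parameter that is not optional, has no `default`, and is not supplied. `find_missing_parameters` and `PARAMETER_OBJECTS` are hypothetical names, not part of boto3 or Data Pipeline. The check only looks at `optional`, `default`, and the activation values; it does not account for the `values` section of the definition.

```python
# Parameter objects from the question's pipeline definition, trimmed to the
# fields this check looks at (id, optional, default)
PARAMETER_OBJECTS = [
    {"id": "*myRDSPassword", "type": "String"},
    {"id": "myEc2RdsSecurityGrps", "type": "String", "isArray": "true", "optional": "true"},
    {"id": "myRDSUsername", "type": "String"},
    {"id": "myInputS3Loc", "type": "AWS::S3::ObjectKey"},
    {"id": "myRDSTableInsertSql", "type": "String"},
    {"id": "myRDSTableName", "type": "String"},
    {"id": "myRDSInstanceId", "type": "String"},
]


def find_missing_parameters(parameter_objects, parameter_values):
    """Return the ids of required parameters that get no value at activation.

    parameter_objects: the "parameters" list of a pipeline definition.
    parameter_values:  the list passed as activate_pipeline(parameterValues=...),
                       i.e. dicts of the form {"id": ..., "stringValue": ...}.
    A parameter counts as covered if it is optional, declares a "default",
    or appears in parameter_values.
    """
    supplied = {entry["id"] for entry in parameter_values}
    missing = []
    for param in parameter_objects:
        # Definition files store booleans as strings, e.g. "optional": "true"
        is_optional = str(param.get("optional", "false")).lower() == "true"
        if is_optional or "default" in param:
            continue
        if param["id"] not in supplied:
            missing.append(param["id"])
    return missing


if __name__ == "__main__":
    # The call in the question supplies only myRDSTableName
    question_values = [{"id": "myRDSTableName", "stringValue": "employee"}]
    print(find_missing_parameters(PARAMETER_OBJECTS, question_values))
```

Run against the question's call, this prints `['*myRDSPassword', 'myRDSUsername', 'myInputS3Loc', 'myRDSTableInsertSql', 'myRDSInstanceId']`: every required parameter except the table name.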


      【Answer 2】:

      The parameters for activate_pipeline are supplied as a list, so:

      client.activate_pipeline(
          pipelineId='df-095524176JKK0DOHDDDC',
          parameterValues=[
              {
                  'id':'myRDSTableName',
                  'stringValue':'employee'
              },
              {
                  'id':'blah',
                  'stringValue':'blah'
              },
              ...
          ]
      )
      

      Keep adding one entry per parameter value. See the boto3 documentation for more details.
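Rather than writing each dict by hand, the list can be built from a plain mapping. This is a sketch: `to_parameter_values`, `PIPELINE_VALUES`, and the `RDS_PASSWORD` environment variable are hypothetical names; only `boto3.client("datapipeline")` and `activate_pipeline(pipelineId=..., parameterValues=...)` come from the question. Expanding a list value into several entries with the same `id` reflects my understanding of how array parameters (such as `myEc2RdsSecurityGrps`) are passed, and is worth checking against the boto3 documentation.

```python
import os

# Values from the question (the password is handled separately below)
PIPELINE_VALUES = {
    "myRDSInstanceId": "source-dev",
    "myRDSUsername": "username",
    # Trailing comma removed from VALUES(?,?,?,); MySQL rejects it
    "myRDSTableInsertSql": "INSERT INTO employee(id,name,salary) VALUES(?,?,?)",
    "myInputS3Loc": "s3://services/employee/",
    "myRDSTableName": "employee",
}


def to_parameter_values(values):
    """Turn {"paramId": value} into activate_pipeline's parameterValues list.

    A list or tuple value becomes one entry per element, all with the same id
    (assumed to be how array parameters are supplied).
    """
    parameter_values = []
    for param_id, value in values.items():
        items = value if isinstance(value, (list, tuple)) else [value]
        for item in items:
            parameter_values.append({"id": param_id, "stringValue": str(item)})
    return parameter_values


def lambda_handler(event, context):
    # boto3 is imported here only so to_parameter_values() can be tested
    # without it; a module-level import works equally well in Lambda
    import boto3

    values = dict(PIPELINE_VALUES)
    # Hypothetical environment variable, to avoid hard-coding the password
    values["*myRDSPassword"] = os.environ["RDS_PASSWORD"]

    client = boto3.client("datapipeline")
    client.activate_pipeline(
        pipelineId="df-095524176JKK0DOHDDDC",
        parameterValues=to_parameter_values(values),
    )
    return {"statusCode": 200}
```

Keeping the password out of the source means setting `RDS_PASSWORD` in the function's configuration (or reading it from a secrets store) before the handler runs.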

