动态任务

参数:

dynamicTaskNameParam:来自任务输入的参数的名称,其值用于调度任务。 例如 如果参数的值为ABC,则调度的下一个任务类型为“ABC”。

Example

{
  "name": "user_task",
  "taskReferenceName": "t1",
  "inputParameters": {
    "files": "${workflow.input.files}",
    "taskToExecute": "${workflow.input.user_supplied_task}"
  },
  "type": "DYNAMIC",
  "dynamicTaskNameParam": "taskToExecute"
}

如果以user_supplied_task的值(假设值为user_task_2)来启动工作流,那么Conductor将在调度此动态任务时调度user_task_2。

决策任务:

决策任务与编程语言中的case ... switch语句相似。 该任务需要3个参数:
caseValueParam: 在决策案例中找不到匹配值时执行的任务列表(默认条件)

Example

{
  "name": "decide_task",
  "taskReferenceName": "decide1",
  "inputParameters": {
    "case_value_param": "${workflow.input.movieType}"
  },
  "type": "DECISION",
  "caseValueParam": "case_value_param",
  "decisionCases": {
    "Show": [
      {
        "name": "setup_episodes",
        "taskReferenceName": "se1",
        "inputParameters": {
          "movieId": "${workflow.input.movieId}"
        },
        "type": "SIMPLE"
      },
      {
        "name": "generate_episode_artwork",
        "taskReferenceName": "ga",
        "inputParameters": {
          "movieId": "${workflow.input.movieId}"
        },
        "type": "SIMPLE"
      }
    ],
    "Movie": [
      {
        "name": "setup_movie",
        "taskReferenceName": "sm",
        "inputParameters": {
          "movieId": "${workflow.input.movieId}"
        },
        "type": "SIMPLE"
      },
      {
        "name": "generate_movie_artwork",
        "taskReferenceName": "gma",
        "inputParameters": {
          "movieId": "${workflow.input.movieId}"
        },
        "type": "SIMPLE"
      }
    ]
  }
}

Fork任务:

fork用于调度并行的任务。
参数:
forkTasks:任务列表的列表。 每个子列表被安排并行执行。 然而,子列表中的任务是以串行方式安排的。

Example

{
  "forkTasks": [
    [
      {
        "name": "task11",
        "taskReferenceName": "t11"
      },
      {
        "name": "task12",
        "taskReferenceName": "t12"
      }
    ],
    [
      {
        "name": "task21",
        "taskReferenceName": "t21"
      },
      {
        "name": "task22",
        "taskReferenceName": "t22"
      }
    ]
  ]
}

参数的名字,此参数的值是一个映射,映射的key是fork任务的别名,value是fork任务的input

Example

{
  "dynamicTasks": "${taskA.output.dynamicTasksJSON}",
  "dynamicTasksInput": "${taskA.output.dynamicTasksInputJSON}",
  "type": "FORK_JOIN_DYNAMIC",
  "dynamicForkTasksParam": "dynamicTasks",
  "dynamicForkTasksInputParamName": "dynamicTasksInput"
}


假设taskA的输出为:
{
  "dynamicTasksInputJSON": {
    "forkedTask1": {
      "width": 100,
      "height": 100,
      "params": {
        "recipe": "jpg"
      }
    },
    "forkedTask2": {
      "width": 200,
      "height": 200,
      "params": {
        "recipe": "jpg"
      }
    }
  },
  "dynamicTasksJSON": [
    {
      "name": "encode_task",
      "taskReferenceName": "forkedTask1",
      "type": "SIMPLE"
    },
    {
      "name": "encode_task",
      "taskReferenceName": "forkedTask2",
      "type": "SIMPLE"
    }
  ]
}
执行时,动态fork任务将调度两个并行任务,类型为“encode_task”,引用名称为“forkedTask1”和“forkedTask2”,由_ dynamicTasksInputJSON_指定

任务参考名称列表,其中JOIN将等待完成。

Example

{
    "joinOn": ["taskRef1", "taskRef3"]
}

Join Task Output

fork任务的输出将是一个JSON对象,其中key是任务别名,值是fork任务的输出。

 

子工作流任务:

Sub Workflow任务允许在另一个工作流中嵌套工作流。

参数:

subWorkflowParam:任务参考名称列表,其中JOIN将等待完成。

Example

{
  "name": "sub_workflow_task",
  "taskReferenceName": "sub1",
  "inputParameters": {
    "requestId": "${workflow.input.requestId}",
    "file": "${encode.output.location}"
  },
  "type": "SUB_WORKFLOW",
  "subWorkflowParam": {
    "name": "deployment_workflow",
    "version": 1
  }
}

可以使用以下API检索服务器用于更新任务状态的SQS队列:
GET /queue

示例SQS有效负载:

{
  "some_key": "valuex",
  "externalId": "{\"taskRefName\":\"TASK_REFERENCE_NAME\",\"workflowId\":\"WORKFLOW_ID\"}"
}


HTTP任务:

response的JSON主体(如果存在)
headers: Response Headers
statusCode: Integer status code

Example

Task Input payload using vipAddress

{
  "http_request": {
    "vipAddress": "examplevip-prod",
    "uri": "/",
    "method": "GET",
    "accept": "text/plain"
  }
}

Task Input using an absolute URL

{
  "http_request": {
    "uri": "http://example.com/",
    "method": "GET",
    "accept": "text/plain"
  }
}

生产的事件的合格名称。 例如 conductor或sqs:sqs_queue_name

Example

{
    "sink": 'sqs:example_sqs_queue_name'
}
当使用conductor作为接收器生成事件时,事件名称遵循以下结构:conductor:<workflow_name>:<task_reference_name>
对于SQS,使用队列的名称,而不是URI。 conductor根据名称查找URI。

Supported Sinks

  • Conductor
  • SQS

事件任务输入:

给予事件任务的输入作为有效载荷提供给发布的消息。 例如 如果将消息放入SQS队列(sink is sqs)中,则消息有效内容将作为任务的输入。

时间任务输出:

event_produced生成的事件的名称。


 

 

 


相关文章:

  • 2021-09-18
  • 2021-12-29
  • 2021-04-26
  • 2021-10-15
猜你喜欢
  • 2021-12-24
  • 2021-08-17
  • 2021-06-16
  • 2022-12-23
  • 2021-04-30
  • 2021-07-19
相关资源
相似解决方案