【问题标题】:Using Amazon SWF To communicate between servers使用 Amazon SWF 在服务器之间进行通信
【发布时间】:2013-01-27 13:59:34
【问题描述】:

使用 Amazon SWF 在服务器之间传递消息?

  1. 我想在服务器 A 上运行脚本 A
  2. 完成后,我想向服务器 B 发送消息以运行脚本 B
  3. 如果成功完成,我希望它从工作流队列中清除作业

我很难弄清楚如何结合使用 Boto 和 SWF 来执行此操作。我不追求一些完整的代码,但我追求的是是否有人可以解释更多关于所涉及的内容。

  • 我实际上如何告诉服务器 B 检查脚本是否完成 一种?
  • 如何确保服务器 A 不会收到脚本的完成 A 并尝试运行脚本 B(因为服务器 B 应该运行它)?
  • 如何实际通知 SWF 脚本 A 完成?你是一面旗帜,还是一面 消息,还是什么?

我对这一切感到很困惑。我应该使用什么设计?

【问题讨论】:

    标签: python linux amazon-web-services boto amazon-swf


    【解决方案1】:

    您可以使用 SNS, 当脚本 A 完成时,它应该触发 SNS,这将触发到服务器 B 的通知

    【讨论】:

    • 遗憾的是,SNS 没有我需要的功能
    【解决方案2】:

    我没有任何示例代码要分享,但您绝对可以使用 SWF 来协调跨两台服务器执行脚本。这样做的主要思想是创建与 SWF 对话的三段代码:

    • 一个组件,它知道首先执行哪个脚本,以及在第一个脚本执行完成后要做什么。这在 SWF 术语中称为“决定者”。
    • 两个组件各自了解如何执行您希望在每台计算机上运行的特定脚本。这些在 SWF 术语中称为“活动工作者”。

    第一个组件(决策程序)调用两个 SWF API:PollForDecisionTask 和 RespondDecisionTaskCompleted。轮询请求将为决策程序组件提供执行工作流的当前历史记录,基本上是脚本运行器的“我在哪里”状态信息。您编写代码来查看这些事件并确定应该执行哪个脚本。这些执行脚本的“命令”将以活动任务调度的形式出现,该活动任务作为调用 RespondDecisionTaskCompleted 的一部分返回。

    您编写的第二个组件,即活动工作者,每个都调用两个 SWF API:PollForActivityTask 和 RespondActivityTaskCompleted。轮询请求将给活动工作者一个指示,表明它应该执行它知道的脚本,SWF 称之为活动任务。从轮询请求返回到 SWF 的信息可以包括作为活动任务调度的一部分发送到 SWF 的单个执行特定数据。您的每台服务器都将独立轮询 SWF 以获取活动任务,以指示在该主机上执行本地脚本。 Worker 执行完脚本后,它会通过 RespondActivityTaskCompleted API 回调 SWF。

    从您的活动工作者到 SWF 的回调导致将新的历史记录分发给我已经提到的决策器组件。它将查看历史记录,看到第一个脚本已完成,并安排第二个脚本执行。一旦它看到第二个已经完成,它就可以使用另一种类型的决策“关闭”工作流。

    您可以通过调用 StartWorkflowExecution API 来启动在每个主机上执行脚本的整个过程。这会在 SWF 中创建整个进程的记录,并将第一个历史记录踢出到决策进程,以安排第一个脚本在第一台主机上的执行。

    希望这能提供更多关于如何使用 SWF 完成此类工作流的背景信息。如果您还没有,我会查看 SWF 页面上的开发指南以获取更多信息。

    【讨论】:

      【解决方案3】:

      我认为您提出了一些非常好的问题,这些问题突出了 SWF 作为一项服务的帮助。简而言之,您不会告诉您的服务器在它们之间协调工作。在 SWF 服务的帮助下,您的决策者会为您安排这一切。

      您的工作流程的实施如下:

      1. 向服务注册您的工作流及其活动(一次性)。
      2. 实施决策者和工作人员。
      3. 让您的工作人员和决策者运行。
      4. 开始新的工作流程。

      有多种方法可以将凭据输入 boto.swf 的代码。出于本练习的目的,我建议在运行以下代码之前将它们导出到环境中:

      export AWS_ACCESS_KEY_ID=<your access key>
      export AWS_SECRET_ACCESS_KEY=<your secret key>
      

      1) 要注册域、工作流和活动,请执行以下操作:

      # ab_setup.py
      import boto.swf.layer2 as swf
      
      DOMAIN = 'stackoverflow'
      ACTIVITY1 = 'ServerAActivity'
      ACTIVITY2 = 'ServerBActivity'
      VERSION = '1.0'
      
      swf.Domain(name=DOMAIN).register()
      swf.ActivityType(domain=DOMAIN, name=ACTIVITY1, version=VERSION, task_list='a_tasks').register()
      swf.ActivityType(domain=DOMAIN, name=ACTIVITY2, version=VERSION, task_list='b_tasks').register()
      swf.WorkflowType(domain=DOMAIN, name='MyWorkflow', version=VERSION, task_list='default_tasks').register()
      

      2) 实施和运行决策者和工作人员。

      # ab_decider.py
      import time
      import boto.swf.layer2 as swf
      
      DOMAIN = 'stackoverflow'
      ACTIVITY1 = 'ServerAActivity'
      ACTIVITY2 = 'ServerBActivity'
      VERSION = '1.0'
      
      class ABDecider(swf.Decider):
      
          domain = DOMAIN
          task_list = 'default_tasks'
          version = VERSION
      
          def run(self):
              history = self.poll()
              # Print history to familiarize yourself with its format.
              print history
              if 'events' in history:
                  # Get a list of non-decision events to see what event came in last.
                  workflow_events = [e for e in history['events']
                                     if not e['eventType'].startswith('Decision')]
                  decisions = swf.Layer1Decisions()
                  # Record latest non-decision event.
                  last_event = workflow_events[-1]
                  last_event_type = last_event['eventType']
                  if last_event_type == 'WorkflowExecutionStarted':
                      # At the start, get the worker to fetch the first assignment.
                      decisions.schedule_activity_task('%s-%i' % (ACTIVITY1, time.time()),
                         ACTIVITY1, VERSION, task_list='a_tasks')
                  elif last_event_type == 'ActivityTaskCompleted':
                      # Take decision based on the name of activity that has just completed.
                      # 1) Get activity's event id.
                      last_event_attrs = last_event['activityTaskCompletedEventAttributes']
                      completed_activity_id = last_event_attrs['scheduledEventId'] - 1
                      # 2) Extract its name.
                      activity_data = history['events'][completed_activity_id]
                      activity_attrs = activity_data['activityTaskScheduledEventAttributes']
                      activity_name = activity_attrs['activityType']['name']
                      # 3) Optionally, get the result from the activity.
                      result = last_event['activityTaskCompletedEventAttributes'].get('result')
      
                      # Take the decision.
                      if activity_name == ACTIVITY1:
                          # Completed ACTIVITY1 just came in. Kick off ACTIVITY2.
                          decisions.schedule_activity_task('%s-%i' % (ACTIVITY2, time.time()),
                              ACTIVITY2, VERSION, task_list='b_tasks', input=result)
                      elif activity_name == ACTIVITY2:
                          # Server B completed activity. We're done.
                          decisions.complete_workflow_execution()
      
                  self.complete(decisions=decisions)
                  return True
      

      worker 简单多了,不想用继承就不用了。

      # ab_worker.py
      import os
      import time
      import boto.swf.layer2 as swf
      
      DOMAIN = 'stackoverflow'
      ACTIVITY1 = 'ServerAActivity'
      ACTIVITY2 = 'ServerBActivity'
      VERSION = '1.0'
      
      class MyBaseWorker(swf.ActivityWorker):
      
          domain = DOMAIN
          version = VERSION
          task_list = None
      
          def run(self):
              activity_task = self.poll()
              print activity_task
              if 'activityId' in activity_task:
                  # Get input.
                  # Get the method for the requested activity.
                  try:
                      self.activity(activity_task.get('input'))
                  except Exception, error:
                      self.fail(reason=str(error))
                      raise error
      
                  return True
      
          def activity(self, activity_input):
              raise NotImplementedError
      
      class WorkerA(MyBaseWorker):
          task_list = 'a_tasks'
      
          def activity(self, activity_input):
              result = str(time.time())
              print 'worker a reporting time: %s' % result
              self.complete(result=result)
      
      class WorkerB(MyBaseWorker):
          task_list = 'b_tasks'
      
          def activity(self, activity_input):
              result = str(os.getpid())
              print 'worker b returning pid: %s' % result
              self.complete(result=result)
      

      3) 运行您的决策者和工作人员。 您的决策者和工作人员可能在不同的主机上运行,​​也可能在同一台机器上运行。打开四个终端并运行你的actors:

      首先是你的决策者

      $ python -i ab_decider.py 
      >>> while ABDecider().run(): pass
      ... 
      

      然后是工人 A,你可以从服务器 A 执行此操作:

      $ python -i ab_workers.py 
      >>> while WorkerA().run(): pass
      

      然后是工人 B,可能来自服务器 B,但如果你从笔记本电脑上运行它们,它也可以正常工作:

      $ python -i ab_workers.py 
      >>> while WorkerB().run(): pass
      ... 
      

      4) 最后,启动工作流程。

      $ python
      Python 2.6.5 (r265:79063, Apr 16 2010, 13:57:41) 
      [GCC 4.4.3] on linux2
      Type "help", "copyright", "credits" or "license" for more information.
      >>> import boto.swf.layer2 as swf
      >>> workflows = swf.Domain(name='stackoverflow').workflows()
      >>> workflows
      [<WorkflowType 'MyWorkflow-1.0' at 0xdeb1d0>]
      >>> execution = workflows[0].start(task_list='default_tasks')
      >>> 
      

      切换回来看看你的演员会发生什么。他们可能会在不活动一分钟后断开与服务的连接。如果发生这种情况,请按向上箭头+回车键重新进入轮询循环。

      您现在可以转到 AWS 管理控制台的 SWF 面板,查看执行情况并查看其历史记录。或者,您可以通过命令行查询它。

      >>> execution.history()
      [{'eventId': 1, 'eventType': 'WorkflowExecutionStarted', 
      'workflowExecutionStartedEventAttributes': {'taskList': {'name': 'default_tasks'}, 
      'parentInitiatedEventId': 0, 'taskStartToCloseTimeout': '300', 'childPolicy': 
      'TERMINATE', 'executionStartToCloseTimeout': '3600', 'workflowType': {'version': 
      '1.0', 'name': 'MyWorkflow'}}, 'eventTimestamp': 1361132267.5810001}, {'eventId': 2, 
      'eventType': 'DecisionTaskScheduled', 'decisionTaskScheduledEventAttributes': 
      {'startToCloseTimeout': '300', 'taskList': {'name': ...
      

      这只是一个连续执行活动的工作流示例,但决策者也可以发送至schedule and coordinate parallel execution of activities

      我希望这至少能让你开始。对于一个稍微复杂的串行工作流示例,我推荐looking at this

      【讨论】:

      • 非常感谢您,这是一个非常全面的回复。
      • @oozie- 很好的答案,很棒的课程。需要在 Boto 文档中-
      • 感谢您的精彩回答!很高兴看到它如此具体。不过我有一个问题:是什么迫使 workerA 只执行 Activity1 任务?是worker类中的task_list吗?
      • 是的。当工作人员轮询工作时,它会从特定任务列表中请求任务。
      • 截至昨晚,layer2 API 参考可在docs.pythonboto.org/en/latest/ref/…获得
      【解决方案4】:

      很好的例子,

      此外,如果您不想将凭据导出到环境,您可以在类中调用:

      swf.set_default_credentials(AWS_ACCESS_KEY_ID, AWS_SECRET_ACCESS_KEY) 
      

      【讨论】:

        猜你喜欢
        • 2013-01-08
        • 1970-01-01
        • 2013-08-31
        • 2018-08-29
        • 2011-01-10
        • 1970-01-01
        • 1970-01-01
        • 1970-01-01
        • 2015-01-30
        相关资源
        最近更新 更多