【问题标题】:How can I run concurrent "tasks" in Python with SimPy, where each task is waiting for multiple resources?如何使用 SimPy 在 Python 中运行并发“任务”,其中每个任务都在等待多个资源?
【发布时间】:2020-03-02 00:46:45
【问题描述】:

我正在建模的系统具有需要以一系列任务形式进行维护的对象。目前在这个模型中,他们请求一个“工作地点”,然后一旦他们抓住了那个,他们就会请求他们完成第一个任务所需的必要的“工人”资源。任务是列表中的对象,是对其进行维护的对象的属性,并且一些任务网络允许并行完成多个任务。

就目前而言,我在 for 循环中迭代了任务列表,其中嵌套的 for 循环然后请求并抓住必要的“工人”,然后一旦所有“工人”在任务期间超时已被扣押。

        with self.location.request() as loc_req: # request work location
            yield loc_req # wait for work location

            for task in self.tasks[:]:
                t_duration = task.calc_duration(self)
                if t_duration == 0: # skip tasks where probability sets duration to 0
                    continue
                needs = task.resources[:]
                ## check if available workers are useful; if not, release them
                task_cur_resources = []
                for res,req in self.resources['available'].copy():
                    if res.worker_id in needs:
                        self.resources['busy'].add((res,req))
                        task_cur_resources.append((res,req))
                        needs.remove(res.worker_id)
                    else:
                        res.release(req)
                    self.resources['available'].remove((res,req))
                ## acquire all resources needed for task
                for need in needs[:]:
                    priority = len(needs) # prioritize tasks closer to meeting needs
                    res = self.location.workers[need] # set resource to worker of type need
                    req = res.request(priority) # save the request object
                    yield req # wait for resource
                    ## stash resource and request in order to release later
                    task_cur_resources.append((res, req))
                    self.resources['busy'].add((res,req))
                    needs.remove(res.worker_id)

                ## perform task with task duration timeout
                yield self.env.process(task.perform(self, t_duration))

                ## make resources available
                for worker in task_cur_resources: 
                    self.resources['busy'].remove(worker)
                    self.resources['available'].add(worker)

            for res,req in self.resources['available']:
                res.release(req)
            self.resources['available'] = set()

问题是这不允许并发任务完成。这些任务是按基于输入参数的正态分布持续时间顺序完成的。我如何更改此设置以允许在其前任完成且工作人员可用时完成任务?我尝试了一个 while 循环,该循环遍历任务列表和已完成其前任的计划任务,但由于我明显滥用 SimPy 和产量,我一直以无限循环结束。有什么想法吗?

【问题讨论】:

    标签: python concurrency simulation modeling simpy


    【解决方案1】:

    env.process() 创建一个事件

    诀窍是将这些事件收集到一个列表中,然后使用 env.all_of 来处理该事件列表。还有 env.any_of

    这是一个例子

    """
    Demonstrates how to grab some resources concurrently, and then do some tasks concurently
    
    programer Michael R. Gibbs
    """
    
    import simpy
    
    class Asset():
        """
        asset that needs some maintenace
        creating the asset starts the assets processes
    
        to keep thins simple, I modeled  just the maintenace task and skipped a prime task that would 
        get interuppted when it was time to do maintence
        """
        def __init__(self, env, id, maintTasks):
            """
            store attributes, kick opp mantence process
            """
            self.id = id
            self.env = env
            self.maintTasks = maintTasks
    
            self.env.process(self.do_maintence())
    
        def do_maintence(self):
            """
            waits till time to do maintence
            grabs a work location resource
            do each maintence task
            each maintaince task has a list of resouces that are grabbed at the same time
            and once they have all been seized
            then the maintence task's list of processes are all kicked off at the same time
            affter all the process are finish, then all the resources are released
            """
    
            yield self.env.timeout(3)
    
            print(f'{self.env.now} object {self.id} is starting maintense')
    
            # grab work location
            with workLocRes.request() as workReq:
                yield workReq
                print(f'{self.env.now} object {self.id} got work loc, starting tasks')
    
                # for each maintTask, get the list of resources, and tasks
                for res, tasks in self.maintTasks:
                    print(f'{self.env.now} -- object {self.id} start loop tasks')
    
                    # get the requests for the needed resources
                    print(f'{self.env.now} object {self.id} res 1: {res1.count}, res 2 {res2.count}')
                    resList = []
                    for r in res:
                        req = r.request()
                        req.r = r  # save the resource in the request
                        resList.append(req)
                    # one yield that waits for all the requests
                    yield self.env.all_of(resList)
                    print(f'{self.env.now} object {self.id} res 1: {res1.count}, res 2 {res2.count}')
    
                    # start all the tasks and save the events
                    taskList = []
                    for t in tasks:
                        taskList.append(self.env.process(t(self.env, self.id)))
                    #one yield that waits for all the processes to finish
                    yield self.env.all_of(taskList)
    
                    for r in resList:
                        r.r.release(r)
                    print(f'{self.env.now} object {self.id} res 1: {res1.count}, res 2 {res2.count}')
                    print(f'{self.env.now} -- object {self.id} finish loop tasks')
    
                print(f'{self.env.now} object {self.id} finish all tasks')
    
    
    # some processes for a maintence task
    def task1(env, obj_id):
        print(f'{env.now} starting task 1 for object {obj_id}')
        yield env.timeout(3)
        print(f'{env.now} finish task 1 for object {obj_id}')
    
    def task2(env, obj_id):
        print(f'{env.now} starting task 2 for object {obj_id}')
        yield env.timeout(3)
        print(f'{env.now} finish task 2 for object {obj_id}')
        
    def task3(env, obj_id):
        print(f'{env.now} starting task 3 for object {obj_id}')
        yield env.timeout(3)
        print(f'{env.now} finish task 3 for object {obj_id}')
        
    
    env = simpy.Environment()
    
    workLocRes = simpy.Resource(env, capacity=3)
    res1 = simpy.Resource(env, capacity=4)
    res2 = simpy.Resource(env, capacity=5)
    
    # build the maintence task with nested list of resources, and nested processes
    maintTask = []
    maintTask.append(([res1],[task1]))
    maintTask.append(([res1,res2],[task2,task3]))
    
    # creating asset also starts it
    a = Asset(env,1,maintTask)
    
    env.run(20)
    

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 1970-01-01
      • 2017-07-08
      • 1970-01-01
      • 2016-06-04
      • 1970-01-01
      • 1970-01-01
      相关资源
      最近更新 更多