【问题标题】:Searching the list using multiprocessing使用多处理搜索列表
【发布时间】:2018-06-28 01:35:44
【问题描述】:

这是一个示例代码,用于检查列表中是否存在项目并将其写入另一个 a 列表:

list_1 = [(0,1),(3,2),(1,5),(0,3),(3,7)]

a = []
for i in list_1:
    if i[0]==3:
       a.append(i)
       break

输出:

[(3, 2)]

当找到第一个满足条件的元素时终止循环。

假设list_1的列表很大,在这种情况下如何应用多处理?

【问题讨论】:

  • 我们所说的规模有多大?多处理在这里可以提供帮助,但尚不清楚是否需要大量增加复杂性。
  • 其实我越想越意识到这是一个数据结构问题而不是多处理问题。首先不要使用列表。
  • 通常会搜索来自list(itertools.product (tuple (range (64)), repeat = 4)) 或更大的值。
  • 如果第一个匹配停止搜索,这必须是顺序的,在这种情况下,多处理不是答案。如果是 any 第一场比赛,那就另当别论了。无论如何,我倾向于同意@wim,这更像是一个数据结构问题而不是多处理。
  • 在我的真实案例中,这不是关于数据结构的问题,而是在我的算法中存在一个更广泛的if条件形式的瓶颈,它结合一个大的搜索列表给出的结果很差。这就是为什么我只给出了一个简化的问题,这样没有人会不必要地深入了解代码的整体意义。

标签: python python-3.x for-loop multiprocessing


【解决方案1】:
Brake your big list into smaller lists
>>> big= [(0,1),(3,2),(1,5),(0,3),(3,7)]
>>> a=big[0:2]
>>> b=big[2:]
define a function to look for your tuple and return it or return None
>>> def doit(L):
        for i in L:
            if i[0]==3:
                return i
        return None
figure out how to create a new process and call it with each of the smaller lists
>>> doit(a)
(3, 2)
>>> doit(b)
(3, 7)
look at the tuples returned that are not None in order and choose the first one

【讨论】:

  • 酷而清晰。你现在可以并行doit (a)doit (b) 吗?并且如果其中一个返回非空值,它将破坏计算。因为这样我就不会赢得时间。我将不胜感激。
【解决方案2】:

@Marichyasana 是一个非常好的解决方案。我在回答您的问题并测试了多个池大小时有点过火了。代码大约有 114 行,您可以以此为基础进行偏离。

我正在测试 100,000 个样本列表的多个 Pool() 大小。我不认为我的代码非常“Pythonic”,任何反馈都会很棒。我用过这个example

结果:

进程数:3;处理时间为:00:00:01
长度 结果列表 1000

进程数:10;处理时间为:00:00:02
长度 结果列表 1000

进程数:20;处理时间为:00:00:09
长度 结果列表 1000

进程数:33;处理时间为:00:00:04
长度 结果列表 1000

代码:

# from multiprocessing import Queue
from multiprocessing import Pool
from random import randint
from Timer import Timer # custom timer wraper I wrote

def create_list_indexes(m_int_list_len):
    """
    this method creates as many evenly spaces segments for the list of data

    m_int_list_len
    type: int
    desc: length of the list of data; number of samples of data

    returns
    type: list
    desc: list of lists; indexes for data list
    list_return[x][0] -> type: int; low index
    list_return[x][1] -> type: int; high index
    """
    # segment length    
    int_seg_len = 100
    list_return = list()

    # get number of segments fo the list
    if m_int_list_len % int_seg_len == 0:
        int_num_seg = int(m_int_list_len / int_seg_len)
        bool_zero_mod = True
    else:
        int_num_seg = int(m_int_list_len / int_seg_len) + 1
        bool_zero_mod = False

    # create indexes of list
    for int_i in range(0, int_num_seg):        
        # check for zero mod
        if ~bool_zero_mod and int_i == int_num_seg - 1:
            int_low = int_i * int_seg_len       
            int_high = m_int_list_len
        else:
            int_low = int_i * int_seg_len
            int_high = int_low + int_seg_len - 1

        list_return.append([int_low, int_high])

    return list_return

def test_pools(m_tuple_args):
    """
    this method tests the different number of pools on a large list of data

    m_list_tasks
    type: list
    desc: list of tuples
    m_tuple_args[0] -> type: int; target to search for
    m_tuple_args[1] -> type: list; list of indexes
        m_tuple_args[1][0] -> type: int; low index
        m_tuple_args[1][1] -> type: int; high index    
    m_tuple_args[2] -> type: list; the data

    returns
    type: list
    desc: list of lists; samples which are lists of length 2
    """

    # unpack tuple for simplicity
    int_target = m_tuple_args[0]
    int_low, int_high = m_tuple_args[1]
    list_data = m_tuple_args[2]

    list_results = list()
    for list_sample in list_data[int_low:int_high]:
        if list_sample == int_target:
            list_results.append(list_sample)

    # return results
    return list_results

if __name__ == '__main__':
    # data structures for example
    list_data = list()
    list_proc = [3, 10, 20, 33]
    int_num_data = 100000
    int_max_int = 100
    int_target = 42

    # create random data list
    for int_i in range(0, int_num_data):
        list_data.append([randint(0, int_max_int),
                          randint(0, int_max_int)])

    # pools of different sizes to compare
    list_pools = [Pool(processes = x) for x in list_proc]

    # create indexes for list
    list_indexes = create_list_indexes(int_num_data)    

    # compare pools
    int_counter = 0
    for pool in list_pools:
        # create task list
        list_tasks = list()
        for int_i in range(0, len(list_indexes)):
            list_tasks.append((int_target, list_indexes[int_i], list_data))

        # test pool
        timer_pool = Timer()
        list_pool = pool.map(test_pools, list_tasks)
        string_pool = 'number of processes: ' + str(list_proc[int_counter])
        timer_pool.stop_timer(string_pool)
        print('length of results list', len(list_pool))
        print()

        # increment counter
        int_counter += 1

【讨论】:

  • 是的,同事接下来给出了一个很酷很清晰的解决方案。但我也重视对问题的不同看法和人的努力:)
  • @TomaszPrzemski 在我的解决方案中,您可以使用list_final.extend(list_pool[0]) 的列表。我忘记将结果格式化为元组列表,可以轻松修改。结果,list_pool 是一个列表列表,因为 list_data 是如何根据段长度进行切片的。
猜你喜欢
  • 2017-06-08
  • 1970-01-01
  • 2012-12-30
  • 2020-11-17
  • 1970-01-01
  • 2016-06-14
  • 1970-01-01
  • 2013-09-10
  • 1970-01-01
相关资源
最近更新 更多