【问题标题】:Fast calculation of Pareto front in PythonPython中帕累托前沿的快速计算
【发布时间】:2015-09-25 23:12:35
【问题描述】:

我在 3D 空间中有一组点,我需要从中找到帕累托边界。执行速度在这里非常重要,随着我添加测试点,时间增加得非常快。

点集如下所示:

[[0.3296170319979843, 0.0, 0.44472108843537406], [0.3296170319979843,0.0, 0.44472108843537406], [0.32920760896951373, 0.0, 0.4440408163265306], [0.32920760896951373, 0.0, 0.4440408163265306], [0.33815192743764166, 0.0, 0.44356462585034007]]

现在,我正在使用这个算法:

def dominates(row, candidateRow):
    return sum([row[x] >= candidateRow[x] for x in range(len(row))]) == len(row) 

def simple_cull(inputPoints, dominates):
    paretoPoints = set()
    candidateRowNr = 0
    dominatedPoints = set()
    while True:
        candidateRow = inputPoints[candidateRowNr]
        inputPoints.remove(candidateRow)
        rowNr = 0
        nonDominated = True
        while len(inputPoints) != 0 and rowNr < len(inputPoints):
            row = inputPoints[rowNr]
            if dominates(candidateRow, row):
                # If it is worse on all features remove the row from the array
                inputPoints.remove(row)
                dominatedPoints.add(tuple(row))
            elif dominates(row, candidateRow):
                nonDominated = False
                dominatedPoints.add(tuple(candidateRow))
                rowNr += 1
            else:
                rowNr += 1

        if nonDominated:
            # add the non-dominated point to the Pareto frontier
            paretoPoints.add(tuple(candidateRow))

        if len(inputPoints) == 0:
            break
    return paretoPoints, dominatedPoints

在这里找到:http://code.activestate.com/recipes/578287-multidimensional-pareto-front/

在一组解中找到一组非支配解的最快方法是什么?或者,简而言之,Python 能比这个算法做得更好吗?

【问题讨论】:

    标签: python numpy


    【解决方案1】:

    如果您担心实际速度,您肯定想使用 numpy(因为巧妙的算法调整可能比使用数组操作获得的收益要小得多)。以下是三种解决方案,它们都计算相同的函数。 is_pareto_efficient_dumb 解决方案在大多数情况下速度较慢,但​​随着成本数量的增加而变得更快,is_pareto_efficient_simple 解决方案在许多方面比哑解决方案效率更高,最终的is_pareto_efficient 函数可读性较差但最快(所以所有都是帕累托有效的!)。

    import numpy as np
    
    
    # Very slow for many datapoints.  Fastest for many costs, most readable
    def is_pareto_efficient_dumb(costs):
        """
        Find the pareto-efficient points
        :param costs: An (n_points, n_costs) array
        :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
        """
        is_efficient = np.ones(costs.shape[0], dtype = bool)
        for i, c in enumerate(costs):
            is_efficient[i] = np.all(np.any(costs[:i]>c, axis=1)) and np.all(np.any(costs[i+1:]>c, axis=1))
        return is_efficient
    
    
    # Fairly fast for many datapoints, less fast for many costs, somewhat readable
    def is_pareto_efficient_simple(costs):
        """
        Find the pareto-efficient points
        :param costs: An (n_points, n_costs) array
        :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
        """
        is_efficient = np.ones(costs.shape[0], dtype = bool)
        for i, c in enumerate(costs):
            if is_efficient[i]:
                is_efficient[is_efficient] = np.any(costs[is_efficient]<c, axis=1)  # Keep any point with a lower cost
                is_efficient[i] = True  # And keep self
        return is_efficient
    
    
    # Faster than is_pareto_efficient_simple, but less readable.
    def is_pareto_efficient(costs, return_mask = True):
        """
        Find the pareto-efficient points
        :param costs: An (n_points, n_costs) array
        :param return_mask: True to return a mask
        :return: An array of indices of pareto-efficient points.
            If return_mask is True, this will be an (n_points, ) boolean array
            Otherwise it will be a (n_efficient_points, ) integer array of indices.
        """
        is_efficient = np.arange(costs.shape[0])
        n_points = costs.shape[0]
        next_point_index = 0  # Next index in the is_efficient array to search for
        while next_point_index<len(costs):
            nondominated_point_mask = np.any(costs<costs[next_point_index], axis=1)
            nondominated_point_mask[next_point_index] = True
            is_efficient = is_efficient[nondominated_point_mask]  # Remove dominated points
            costs = costs[nondominated_point_mask]
            next_point_index = np.sum(nondominated_point_mask[:next_point_index])+1
        if return_mask:
            is_efficient_mask = np.zeros(n_points, dtype = bool)
            is_efficient_mask[is_efficient] = True
            return is_efficient_mask
        else:
            return is_efficient
    

    分析测试(使用从正态分布中提取的点):

    10,000 个样本,2 个成本:

    is_pareto_efficient_dumb: Elapsed time is 1.586s
    is_pareto_efficient_simple: Elapsed time is 0.009653s
    is_pareto_efficient: Elapsed time is 0.005479s
    

    1,000,000 个样本,2 个成本:

    is_pareto_efficient_dumb: Really, really, slow
    is_pareto_efficient_simple: Elapsed time is 1.174s
    is_pareto_efficient: Elapsed time is 0.4033s
    

    10,000 个样本,15 个成本:

    is_pareto_efficient_dumb: Elapsed time is 4.019s
    is_pareto_efficient_simple: Elapsed time is 6.466s
    is_pareto_efficient: Elapsed time is 6.41s
    

    请注意,如果您担心效率问题,您可以通过预先重新排序数据来获得 2 倍左右的速度提升,请参阅 here

    【讨论】:

    • 哇,我错过了,谢谢彼得!我不太确定我得到了成本数组,你能提供一个简短的例子吗?再次感谢,这看起来很棒。
    • 成本数组只是一个二维数组,其中 cost[i, j] 是第 i 个数据点的第 j 个成本。我认为它与您的 inputPoints 数组相同。你可以看到tests here,它演示了它的使用。
    • 我用一个简单的例子对其进行了测试,前两个函数不返回帕累托前沿。示例:numpy.array([[1,2], [3,4], [2,1], [1,1]]) 它返回以下内容:[ True False True True] 但它应该按照帕累托前面的定义返回:[ False False False True]
    • 对 def is_pareto_efficient(costs) 的修复:看起来像这样(替换提到的行):is_efficient[is_efficient] = np.any(costs[is_efficient]&lt;c, axis=1) # Remove dominated pointsis_efficient[i] = True
    • 你能分享关于这些算法的科学论文的参考吗?
    【解决方案2】:

    2019 年 8 月更新

    这是另一个简单的实现,对于适度的尺寸来说非常快。假设输入点是唯一的。

    def keep_efficient(pts):
        'returns Pareto efficient row subset of pts'
        # sort points by decreasing sum of coordinates
        pts = pts[pts.sum(1).argsort()[::-1]]
        # initialize a boolean mask for undominated points
        # to avoid creating copies each iteration
        undominated = np.ones(pts.shape[0], dtype=bool)
        for i in range(pts.shape[0]):
            # process each point in turn
            n = pts.shape[0]
            if i >= n:
                break
            # find all points not dominated by i
            # since points are sorted by coordinate sum
            # i cannot dominate any points in 1,...,i-1
            undominated[i+1:n] = (pts[i+1:] >= pts[i]).any(1) 
            # keep points undominated so far
            pts = pts[undominated[:n]]
        return pts
    

    我们首先根据坐标总和对点进行排序。这很有用,因为

    • 对于许多数据分布,具有最大坐标和的点将支配大量点。
    • 如果点x的坐标和大于点y,则y不能支配x

    这里有一些与 Peter 的回答相关的基准,使用 np.random.randn

    N=10000 d=2
    
    keep_efficient
    1.31 ms ± 11.6 µs per loop (mean ± std. dev. of 7 runs, 1000 loops each)
    is_pareto_efficient
    6.51 ms ± 23.9 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
    
    
    N=10000 d=3
    
    keep_efficient
    2.3 ms ± 13.3 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
    is_pareto_efficient
    16.4 ms ± 156 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
    
    
    N=10000 d=4
    
    keep_efficient
    4.37 ms ± 38.4 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
    is_pareto_efficient
    21.1 ms ± 115 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
    
    
    N=10000 d=5
    
    keep_efficient
    15.1 ms ± 491 µs per loop (mean ± std. dev. of 7 runs, 100 loops each)
    is_pareto_efficient
    110 ms ± 1.01 ms per loop (mean ± std. dev. of 7 runs, 10 loops each)
    
    
    N=10000 d=6
    
    keep_efficient
    40.1 ms ± 211 µs per loop (mean ± std. dev. of 7 runs, 10 loops each)
    is_pareto_efficient
    279 ms ± 2.54 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    
    
    N=10000 d=15
    
    keep_efficient
    3.92 s ± 125 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    is_pareto_efficient
    5.88 s ± 74.3 ms per loop (mean ± std. dev. of 7 runs, 1 loop each)
    

    凸壳启发式

    我最近最终研究了这个问题,并发现了一个有用的启发式方法,如果有很多独立分布的点并且维度很少,它会很好地工作。

    这个想法是计算点的凸包。由于维度少且点独立分布,凸包的顶点数会很少。直观地说,我们可以预期凸包的一些顶点支配许多原始点。此外,如果凸包中的一个点不受凸包中任何其他点的支配,那么它也不受原始集合中的任何点支配。

    这给出了一个简单的迭代算法。我们反复

    1. 计算凸包。
    2. 从凸包中保存 Pareto 未支配点。
    3. 过滤点以去除那些由凸包元素支配的点。

    我为维度 3 添加了一些基准。似乎对于某些点分布,这种方法产生了更好的渐近性。

    import numpy as np
    from scipy import spatial
    from functools import reduce
    
    # test points
    pts = np.random.rand(10_000_000, 3)
    
    
    def filter_(pts, pt):
        """
        Get all points in pts that are not Pareto dominated by the point pt
        """
        weakly_worse   = (pts <= pt).all(axis=-1)
        strictly_worse = (pts < pt).any(axis=-1)
        return pts[~(weakly_worse & strictly_worse)]
    
    
    def get_pareto_undominated_by(pts1, pts2=None):
        """
        Return all points in pts1 that are not Pareto dominated
        by any points in pts2
        """
        if pts2 is None:
            pts2 = pts1
        return reduce(filter_, pts2, pts1)
    
    
    def get_pareto_frontier(pts):
        """
        Iteratively filter points based on the convex hull heuristic
        """
        pareto_groups = []
    
        # loop while there are points remaining
        while pts.shape[0]:
            # brute force if there are few points:
            if pts.shape[0] < 10:
                pareto_groups.append(get_pareto_undominated_by(pts))
                break
    
            # compute vertices of the convex hull
            hull_vertices = spatial.ConvexHull(pts).vertices
    
            # get corresponding points
            hull_pts = pts[hull_vertices]
    
            # get points in pts that are not convex hull vertices
            nonhull_mask = np.ones(pts.shape[0], dtype=bool)
            nonhull_mask[hull_vertices] = False
            pts = pts[nonhull_mask]
    
            # get points in the convex hull that are on the Pareto frontier
            pareto   = get_pareto_undominated_by(hull_pts)
            pareto_groups.append(pareto)
    
            # filter remaining points to keep those not dominated by
            # Pareto points of the convex hull
            pts = get_pareto_undominated_by(pts, pareto)
    
        return np.vstack(pareto_groups)
    
    # --------------------------------------------------------------------------------
    # previous solutions
    # --------------------------------------------------------------------------------
    
    def is_pareto_efficient_dumb(costs):
        """
        :param costs: An (n_points, n_costs) array
        :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
        """
        is_efficient = np.ones(costs.shape[0], dtype = bool)
        for i, c in enumerate(costs):
            is_efficient[i] = np.all(np.any(costs>=c, axis=1))
        return is_efficient
    
    
    def is_pareto_efficient(costs):
        """
        :param costs: An (n_points, n_costs) array
        :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
        """
        is_efficient = np.ones(costs.shape[0], dtype = bool)
        for i, c in enumerate(costs):
            if is_efficient[i]:
                is_efficient[is_efficient] = np.any(costs[is_efficient]<=c, axis=1)  # Remove dominated points
        return is_efficient
    
    
    def dominates(row, rowCandidate):
        return all(r >= rc for r, rc in zip(row, rowCandidate))
    
    
    def cull(pts, dominates):
        dominated = []
        cleared = []
        remaining = pts
        while remaining:
            candidate = remaining[0]
            new_remaining = []
            for other in remaining[1:]:
                [new_remaining, dominated][dominates(candidate, other)].append(other)
            if not any(dominates(other, candidate) for other in new_remaining):
                cleared.append(candidate)
            else:
                dominated.append(candidate)
            remaining = new_remaining
        return cleared, dominated
    
    # --------------------------------------------------------------------------------
    # benchmarking
    # --------------------------------------------------------------------------------
    
    # to accomodate the original non-numpy solution
    pts_list = [list(pt) for pt in pts]
    
    import timeit
    
    # print('Old non-numpy solution:s\t{}'.format(
        # timeit.timeit('cull(pts_list, dominates)', number=3, globals=globals())))
    
    print('Numpy solution:\t{}'.format(
        timeit.timeit('is_pareto_efficient(pts)', number=3, globals=globals())))
    
    print('Convex hull heuristic:\t{}'.format(
        timeit.timeit('get_pareto_frontier(pts)', number=3, globals=globals())))
    

    结果

    # >>= python temp.py # 1,000 points
    # Old non-numpy solution:      0.0316428339574486
    # Numpy solution:              0.005961259012110531
    # Convex hull heuristic:       0.012369581032544374
    # >>= python temp.py # 1,000,000 points
    # Old non-numpy solution:      70.67529802105855
    # Numpy solution:              5.398462114972062
    # Convex hull heuristic:       1.5286884519737214
    # >>= python temp.py # 10,000,000 points
    # Numpy solution:              98.03680767398328
    # Convex hull heuristic:       10.203076395904645
    

    原帖

    我尝试通过一些调整来重写相同的算法。我认为您的大部分问题都来自inputPoints.remove(row)。这需要搜索点列表;按索引删除会更有效。 我还修改了dominates 函数以避免一些冗余比较。这在更高维度上可能会很方便。

    def dominates(row, rowCandidate):
        return all(r >= rc for r, rc in zip(row, rowCandidate))
    
    def cull(pts, dominates):
        dominated = []
        cleared = []
        remaining = pts
        while remaining:
            candidate = remaining[0]
            new_remaining = []
            for other in remaining[1:]:
                [new_remaining, dominated][dominates(candidate, other)].append(other)
            if not any(dominates(other, candidate) for other in new_remaining):
                cleared.append(candidate)
            else:
                dominated.append(candidate)
            remaining = new_remaining
        return cleared, dominated
    

    【讨论】:

    • 谢谢,我正在尝试。知道它与这里的第一个答案相比如何:stackoverflow.com/questions/21294829/…
    • 感谢您的回答。知道如何在作为帕累托边界返回的点集中选择最佳点也很不错。此外,可能有一些目标需要最小化,一些需要最大化,还有一些需要保持不变。
    • @Nav 我不确定我是否完全遵循您的建议。对于帕累托前沿中的每个点,都可以写下一个目标函数 s.t.那一点是“最好的”。你在考虑多目标优化吗?这不是我对这个答案的想法。
    • 是的;多目标优化。我最终意识到我可以只使用你的函数,然后使用 K-Means 计算帕累托最优点的质心来找到近似值。 “最佳”点。另外,由于min(f(x)) = - max(-f(x)),我可以使用您的函数并简单地否定我想要最小化的数字列。
    • 2019 年 8 月更新中的代码似乎无法正常工作 - 返回的集合不是有效边界。例如,对于这组 (gist.github.com/vfilimonov/4e368a7a6235ae2aff8e9c47a569e974),帕累托边界的许多点都丢失了
    【解决方案3】:

    彼得,很好的回应。

    我只是想概括一下那些想要在最大化和默认最小化之间进行选择的人。这是一个微不足道的修复,但很高兴在这里记录:

    def is_pareto(costs, maximise=False):
        """
        :param costs: An (n_points, n_costs) array
        :maximise: boolean. True for maximising, False for minimising
        :return: A (n_points, ) boolean array, indicating whether each point is Pareto efficient
        """
        is_efficient = np.ones(costs.shape[0], dtype = bool)
        for i, c in enumerate(costs):
            if is_efficient[i]:
                if maximise:
                    is_efficient[is_efficient] = np.any(costs[is_efficient]>=c, axis=1)  # Remove dominated points
                else:
                    is_efficient[is_efficient] = np.any(costs[is_efficient]<=c, axis=1)  # Remove dominated points
        return is_efficient
    

    【讨论】:

      【解决方案4】:

      dominates 的定义不正确。 A 支配 B 当且仅当它在所有维度上都优于或等于 B,并且至少在一个维度上严格优于 B。

      【讨论】:

        【解决方案5】:

        我在这里可能有点晚了,但我尝试了建议的解决方案,似乎它们未能返回所有 Pareto 点。我已经做了一个递归实现(这明显更快)来找到帕累托前沿,你可以在https://github.com/Ragheb2464/preto-front找到它

        【讨论】:

        • 请提供反例
        • 不错!你的解决方案对我有用,其他所有的都没有(我不知道如何检索成本数组)
        【解决方案6】:

        更正上一篇文章中发现的一个错误,这里是新版本的keep_efficient函数。

        def keep_efficient(pts):
            'returns Pareto efficient row subset of pts'
            # sort points by decreasing sum of coordinates
            pts = pts[pts.sum(1).argsort()[::-1]]
            # initialize a boolean mask for undominated points
            # to avoid creating copies each iteration
            undominated = np.ones(pts.shape[0], dtype=bool)
            for i in range(pts.shape[0]):
                # process each point in turn
                n = pts.shape[0]
                if i >= n:
                    break
                # find all points not dominated by i
                # since points are sorted by coordinate sum
                # i cannot dominate any points in 1,...,i-1
                undominated[i+1:n] = (pts[i+1:] >= pts[i]).any(1) 
                # keep points undominated so far
                pts = pts[undominated[:n]]
                undominated = np.array([True]*len(pts))
        
        return pts
        

        (请注意,上一篇文章中的错误是函数 keep_efficient(pts) 返回了错误的帕累托前沿,输入为:pts = [[5,5],[4,3], [0,6]]。在编辑之前,结果是 [5,5] 而预期结果是 [[5 5], [0 6]]。修复是添加 for 循环的最后一行: undowned = np.array([True ]*len(pts)))

        【讨论】:

          【解决方案7】:

          为了清楚上面的例子,获取 Pareto-front 的函数与上面的代码略有不同,应该只包含一个

          def is_pareto(costs):
              is_efficient = np.ones(costs.shape[0], dtype=bool)
          
              for i, c in enumerate(is_efficient):
                  if is_efficient[i]:
                     is_efficient[is_efficient] = np.any(costs[is_efficient]<c, axis=1) 
          
              return is_efficient
          

          免责声明:这只是部分正确,因为统治本身被定义为

          【讨论】:

            猜你喜欢
            • 1970-01-01
            • 2014-08-24
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 1970-01-01
            • 2018-08-18
            相关资源
            最近更新 更多