【问题标题】:Does Java LongAdder's increment() & sum() prevent getting the same value twice?Java LongAdder 的 increment() 和 sum() 是否会阻止两次获得相同的值?
【发布时间】:2017-12-29 09:21:42
【问题描述】:

目前我在我的应用程序中使用AtomicLong 作为同步计数器,但我发现它具有高并发/争用,例如由于明显的原因(例如并发 CAS),使用 8 个线程,我的吞吐量比单线程低得多(低 75%)。

用例: 一个计数器变量

  • 由多个线程同时更新
  • 具有高写入争用,基本上线程中的每次使用都由写入和之后立即读取组成
  • 要求是每次从计数器读取(在写入后立即)都获得一个唯一的递增值。 并不要求每个检索到的计数器值都以与不同线程(写入器)递增值的顺序相同的顺序递增。

所以我尝试将AtomicLong 替换为LongAdder,实际上,从我的测量结果来看,我的 8 线程吞吐量要好得多 - (仅)比单线程低 20%(相比 75% )。

但是我不确定我是否正确理解LongAdder 的工作方式。 JavaDoc 说:

当多线程时,这个类通常比 AtomicLong 更可取 更新用于收集等目的的共同金额 统计,不适用于细粒度的同步控制。

对于sum()

返回当前总和。返回的值不是原子快照; 在没有并发更新的情况下调用返回一个准确的 结果,但是在求和时发生的并发更新 计算可能不会被合并。

什么是细粒度同步控制... 通过查看this so question 以及AtomicLongStriped64 的来源,我想我理解如果AtomicLong 上的更新由于另一个线程发出的CAS 指令而被阻止,则更新存储在线程本地并在以后积累以获得一些最终的一致性。所以没有进一步的同步,因为LongAdder中的incrementAndGet()不是原子的,而是两条指令,我担心以下是可能的:

private static final LongAdder counter = new LongAdder(); // == 0
// no further synchronisation happening in java code
Thread#1 :  counter.increment();
Thread#2 :  counter.increment();  // CAS T#1 still ongoing, storing +1 thread-locally
Thread#2 :  counter.sum();       // == 1
Thread#3 :  counter.increment();  // CAS T#1 still ongoing, storing +1 thread-locally
Thread#3 :  counter.sum();       // == 1
Thread#1 :  counter.sum();       // == 3 (after merging everything)

如果可能,AtomicLong 并不真正适合我的用例,这可能算作“细粒度同步控制”。

然后用我的 write/read^n 模式我可能不能做得比 AtomicLong 更好?

【问题讨论】:

    标签: java multithreading thread-safety increment


    【解决方案1】:

    两种实现之间存在细微差别。

    AtomicLong 包含一个数字,每个线程都将尝试更新该数字。因此,正如您已经发现的那样,一次只有一个线程可以更新此值。不过,优势在于,当调用 get 时,该值将始终是最新的,因为那时没有正在进行的 adds。

    另一方面,LongAdder 由多个值组成,每个值将由线程的子集更新。这会在更新值时减少争用,但是如果在 add 正在进行时完成,sum 可能会有不完整的值,类似于您描述的场景。

    LongAdder 推荐用于您将并行执行一堆adds 后跟sum 的情况。对于您的用例,我编写了以下内容,确认大约十分之一的总和被重复(这使得 LongAdder 无法用于您的用例)。

    public static void main (String[] args) throws Exception
    {
        LongAdder adder = new LongAdder();
        ExecutorService executor = Executors.newFixedThreadPool(10);
        Map<Long, Integer> count = new ConcurrentHashMap<>();
        for (int i = 0; i < 10; i++)
        {
            executor.execute(() -> {
                for (int j = 0; j < 1000000; j++)
                {
                    adder.add(1);
                    count.merge(adder.longValue(), 1, Integer::sum);
                }
            });
        }
        executor.shutdown();
        executor.awaitTermination(1, TimeUnit.HOURS);
        count.entrySet().stream().filter(e -> e.getValue() > 1).forEach(System.out::println);
    }
    

    【讨论】:

    • get 被调用时,当然可以有adds “进行中”。事实上,可以有任意数量的线程同时执行add 方法(它不像方法是同步的)。如果您使用AtomicLong 代替LongCounter 运行相同的测试,并在merge() 行上保留longValue() 的单独读取,您会得到same 重复项。问题不在于“并发更新”,因为对任一类的两个单独调用永远不会是原子的。 AtomicLong 的不同之处在于它提供了 LongCounter 不提供的“复合操作”。
    【解决方案2】:

    LongAdder 绝对不适合您的唯一整数生成用例,但您无需了解实现或深入了解 java 内存模型的复杂性即可确定这一点。只需查看 API:它没有复合“增量和获取”类型方法,可以让您以原子方式递增值并取回旧/新值。

    在添加值方面,它只提供void add(long x)void increment() 方法,但这些方法不返回值。你提到:

    LongAdder 中的 incrementAndGet 不是原子的

    ...但我在LongAdder 中根本看不到incrementAndGet。你在看哪里?

    你的想法:

    • 线程中的使用将包括写入和之后立即读取
    • 要求是每次读取 从柜台(写完后立即)得到一个独特的 增值。不需要每个检索到的计数器 值以与不同的顺序相同的顺序增加 线程(作者)增加值。

    即使AtomicLong也不起作用,除非“写后读”是指调用incrementAndGet 方法。我认为不言而喻,如果没有一些外部锁定,对 AtomicLongLongAdder(或任何其他对象)的两个单独调用永远不会是原子的。

    所以在我看来,Java 文档有点令人困惑。是的,您不应该使用sum() 进行同步控制,并且是的“可能不会合并在计算总和时发生的并发更新”;但是,AtomicLong 及其 get() 方法也是如此调用get() 时发生的增量可能会或可能不会反映在get() 返回的值中。

    AtomicLong 相比,LongAdder 的一些保证较弱。使用AtomicLong 获得的一个保证是,一系列操作通过一系列特定的值转换对象,并且无法保证线程将看到什么特定值,所有值都应该来自真正的转换集价值观。

    例如,考虑从一个值为 0 的 AtomicLong 开始,两个线程同时递增它,分别递增 1 和 3。最终值将始终为 4,并且只有两个可能的转换路径是可能的:0 -&gt; 1 -&gt; 40 -&gt; 3 -&gt; 4。对于给定的执行,只有一个可能发生,并且所有并发读取都将与该执行保持一致。也就是说,如果任何线程读取1,则没有线程可以读取3,反之亦然(当然,不能保证任何线程都会看到1或@ 987654344@,他们都可能看到04

    LongCounter 不提供该保证。由于写入进程未锁定,而读取进程以非原子方式将多个值相加,因此在同一执行过程中,一个线程可能会看到1,而另一个线程可能会看到3。当然,它仍然不会合成“假”值——例如,您永远不应该读取“2”。

    现在这是一个微妙的概念,Javadoc 不能很好地理解它。取而代之的是,他们使用了一个非常薄弱且不是特别正式的声明。最后,我认为您不能通过纯增量(而不是加法)观察上述行为,因为那时只有一条路径:0 -&gt; 1 -&gt; 2 -&gt; 3 等。所以对于增量,我认为 AtomicLong.get()LongCounter.sum() 非常几乎相同的保证。

    有用的东西

    好的,所以我会给你一些可能有用的东西。你仍然可以高效地实现你想要的东西,只要你对每个线程获取的计数器值和它们被读取的顺序之间的确切关系没有严格的要求。

    重新利用 LongAdder 理念

    您可以使LongAdder 想法适用于独特的计数器生成。 LongAdder 的基本思想是将计数器分散到 N 个不同的计数器中(它们位于不同的缓存行上)。任何给定的调用都会根据当前线程 ID2 更新这些计数器中的 一个,并且读取需要对来自 所有 个计数器的值求和。这意味着写入的争用较少,但代价是复杂性更高,读取的成本也很高。

    现在按设计写入的方式不允许您读取完整的LongAdder 值,但由于您只想要一个唯一值,您可以使用相同的代码,除了顶部或底部 N 位3 为每个计数器设置唯一。

    现在写入可以返回先前的值,例如getAndIncrement,它将是唯一的,因为固定位使其在该对象的所有计数器中保持唯一。

    线程局部计数器

    一种非常快速且简单的方法是使用每个线程的唯一值和线程本地计数器。初始化线程本地时,它会从共享计数器(每个线程仅一次)获取唯一 ID,然后将该 ID 与线程本地计数器组合 - 例如,ID 的底部 24 位和本地计数器的前 40 位1。这应该非常快,更重要的是基本上零争用。

    缺点是计数器的值在线程之间不会有任何特定的关系(尽管它们可能仍然在线程内严格增加)。例如,最近请求计数器值的线程可能会获得比长期存在的值小得多的值。你还没有描述你将如何使用这些,所以我不知道这是否是一个问题。

    此外,您没有一个地方可以读取分配的“总”计数器数量 - 您必须检查所有本地计数器才能做到这一点。如果您的应用程序需要,这是可行的(并且有一些与LongAdder.sum() 函数相同的注意事项)。

    如果您希望线程间的数字“通常随时间增加”,并且知道每个线程都合理频繁地请求计数器值,则另一种解决方案是使用单个全局计数器,该计数器请求本地“分配”数量的 ID,然后它将以线程本地方式分配各个 ID。例如,线程可能会请求 10 个 ID,因此三个线程将被分配范围 0-9、10-19 和 20-29 等。然后它们在该范围之外分配,直到用完并返回哪个点到全球柜台。这类似于内存分配器如何划分公共池的块,然后可以在线程本地分配。

    上面的示例将保持 ID 大致按时间递增的顺序,并且每个线程的 ID 也将严格递增。但是它没有提供任何严格的保证:分配范围为 0-9 的线程,在使用 0 后可以很好地休眠几个小时,然后在其他线程上的计数器更高时使用“1”。它将争用减少 10 倍。

    您可以使用多种其他方法,其中大多数方法是在减少争用与计数器分配的“准确性”与实时之间进行权衡。如果您可以访问硬件,您可能可以使用快速递增的时钟,例如循环计数器(例如,rdtscp)和核心 ID,以获得与实时密切相关的唯一值非常 (假设操作系统正在同步计数器)。


    1 应根据应用程序中预期的线程数和每个线程的增量仔细选择位域大小。通常,如果您不断创建新线程并且您的应用程序是长期存在的,您可能希望在线程 ID 的更多位方面犯错,因为您总是可以检测到本地计数器的回绕并获得一个新线程ID,因此分配给线程 ID 的位可以有效地与本地计数器共享(但不能反过来)。

    2 最佳选择是使用“CPU ID”,但这在 Java 中不能直接访问(即使在汇编级别也没有快速和可移植的方式来获取它,AFAIK) -所以线程ID被用作代理。

    3 其中 N 是lg2(number of counters)

    【讨论】:

    • 谢谢 - 这回答了我的问题。而且对于其他指针,不幸的是我不能使用它们,因为我已经在使用类似的技术来构造一个包含时间戳(类似于 UUID)的 64 位唯一 ID,但我已经限制为 16 位计数器部分,不想通过池或类似方法进一步限制数字空间。
    • 一个低争用技巧,您可以仅针对对应部分或整个 UUID 应用,即“生成并检查”。例如,假设您正在通过读取高分辨率时钟生成一个可能唯一的值,并且您想让它绝对唯一。您使用您读取的值来索引哈希或类似数组的结构,以查看该值是否已被使用(几乎从未使用过),如果未使用,则标记它已与 CAS 一起使用。这里非常常见的快速路径是结构上相对无争议的 CAS。如果使用该值,则只需尝试下一个值。 @masch82
    • 这是有效的,因为您可以将争用分散到任意多的缓存行中(您可能只需要说 CPU 数量的 2 倍即可消除大多数争用),并且会发生慢速路径只有在真正有争执的时候。当然,仍然有各种有趣的实现细节,例如如何垃圾收集散列/数组的旧部分等,但这正在进入单独的问题领域。
    • 最后,如果您关心的是完全使用 16 位空间,您仍然可以使用批处理技术,但要更改算法,使其在限制范围内使用所有空间,例如,在某个时间点16 位周期(例如,当您接近环绕时),从分配它们但未使用它们的线程中“窃取”任何未使用的值。您还可以使批量大小自适应:例如,首先您从全局计数器请求 1 个 ID,但如果您很快再请求 2 个,等等。所以只有活动线程才能获得大批量。结合窃取,您可以减少争用,但仍会使用整个 16 位空间。
    猜你喜欢
    • 2016-06-11
    • 2017-05-11
    • 2016-10-02
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2021-10-01
    • 1970-01-01
    相关资源
    最近更新 更多