【问题标题】:How can I rewrite this main thread - worker threads synchronization我怎样才能重写这个主线程 - 工作线程同步
【发布时间】:2018-04-11 00:12:45
【问题描述】:

我有一个类似这样的程序

public class Test implements Runnable
{
    public        int local_counter
    public static int global_counter
    // Barrier waits for as many threads as we launch + main thread
    public static CyclicBarrier thread_barrier = new CyclicBarrier (n_threads + 1);

    /* Constructors etc. */

    public void run()
    {
        for (int i=0; i<100; i++)
        {
            thread_barrier.await();
            local_counter = 0;
            for(int j=0 ; j = 20 ; j++)
                local_counter++;
            thread_barrier.await();
        }
    }

    public void main()
    {
        /* Create and launch some threads, stored on thread_array */
        for(int i=0 ; i<100 ; i++)
        {
            thread_barrier.await();
            thread_barrier.await();

            for (int t=1; t<thread_array.length; t++)
            {
                global_counter += thread_array[t].local_counter;
            }
        }
    }
}

基本上,我有几个线程有自己的本地计数器,我正在这样做(循环)

        |----|           |           |----|
        |main|           |           |pool|
        |----|           |           |----|
                         |

-------------------------------------------------------
barrier (get local counters before they're overwritten)
-------------------------------------------------------
                         |
                         |   1. reset local counter
                         |   2. do some computations
                         |      involving local counter
                         |
-------------------------------------------------------
             barrier (synchronize all threads)
-------------------------------------------------------
                         |
1. update global counter |
   using each thread's   |
   local counter         |

这一切都应该很好,但事实证明这并不能很好地扩展。在 16 个物理节点的集群上,6-8 个线程后的加速可以忽略不计,所以我必须摆脱其中一个等待。我尝试过使用 CyclicBarrier,它的扩展性非常好,Semaphores,它做的一样多,还有一个自定义库 (jbarrier),它可以很好地工作,直到线程数多于物理内核,此时它的性能比顺序版本差。但是如果不停止所有线程两次,我就是想不出一种方法。

编辑:虽然我感谢您对我的程序中任何其他可能的瓶颈的所有见解,但我正在寻找有关此特定问题的答案。如果需要,我可以提供更具体的示例

【问题讨论】:

  • 您要解决的问题有多复杂? CountDownLatch 之类的东西是否会有所帮助,因为它可能会降低解决方案的复杂性?
  • @HarisNadeem 问题是 CountDownLatch 被设计为使用一次,而我在一个循环中连续使用这个 Barrier - 我想我可以在每个循环中创建一个新的 CountDownLatch,我没有尝试过,但我没有认为它会是有效的
  • 是的,每次都创建一个新的效率不高。如果您不介意,我对问题的大小和硬件有一些疑问。您尝试解决的问题可能是内存密集型?如果是这样,增加线程只会增加内存负载,并且会减慢其他线程可用的内存。线程中是否可能涉及IO?如果是这样,这可能是一个瓶颈,增加线程超过某个点可能无法解决。
  • 这对我来说看起来像是一个标准的生产者-消费者问题,为什么线程不能独立计算它们的结果并用它们的 id 将它们放入队列中。随后主要可以消耗它们吗?我假设消费者比生产者快得多。
  • 你能分享一个更详细的例子吗? (最好能看到源代码或可运行的示例。)

标签: java concurrency parallel-processing synchronization java.util.concurrent


【解决方案1】:

一些修复:假设您的线程数组 [0] 应该参与全局计数器总和,您对线程的迭代应该是 for(int t=0;...)。我们可以猜测它是一个测试数组,而不是线程。 local_counter 应该是 volatile 的,否则你可能看不到测试线程和主线程的真实值。

好的,现在,你有一个适当的 2 阶段循环,afaict。任何其他东西,例如相位器或每个循环都有一个新的倒计时闩锁的 1 个循环障碍,都只是同一主题的变体:让多个线程同意让主线程恢复,并让主线程一次性恢复多个线程。

更精简的实现可能涉及可重入锁、到达测试线程的计数器、恢复所有测试线程测试的条件以及恢复主线程的条件。 --count==0 时到达的测试线程应该发出主恢复条件的信号。所有测试线程等待测试恢复条件。 main 应在测试恢复条件下将计数器重置为 N 和 signalAll,然后在 main 条件下等待。线程(测试和主线程)每个循环只等待一次。

最后,如果最终目标是由任何线程更新的总和,您应该查看 LongAdder(如果不是 AtomicLong)以同时执行长加法,而不必停止所有线程(它们相互竞争和加法,不涉及主要的)。

否则,您可以让线程将其材料传递到主线程读取的阻塞队列。这样做的味道太多了。我很难理解为什么要挂起所有线程来收集数据。就是这样。这个问题过于简单化了,我们没有足够的约束来证明你在做什么。

不用担心 CyclicBarrier,它是通过可重入锁、一个计数器和一个条件来实现的,以将 signalAll() 触发到所有等待的线程。这是严格编码的,afaic。如果您想要无锁版本,您将面临太多繁忙的自旋循环浪费 cpu 时间,尤其是当您担心线程多于内核时的扩展时。

同时,您是否有可能实际上拥有 8 个看起来像 16 个 cpu 的超线程内核?

清理后,您的代码如下所示:

package tests;

import java.util.concurrent.BrokenBarrierException;
import java.util.concurrent.CyclicBarrier;
import java.util.stream.Stream;

public class Test implements Runnable {
    static final int n_threads = 8;
    static final long LOOPS = 10000;
    public static int global_counter;
    public static CyclicBarrier thread_barrier = new CyclicBarrier(n_threads + 1);

    public volatile int local_counter;

    @Override
    public void run() {
        try {
            runImpl();
        } catch (InterruptedException | BrokenBarrierException e) {
            //
        }
    }

    void runImpl() throws InterruptedException, BrokenBarrierException {
        for (int i = 0; i < LOOPS; i++) {
            thread_barrier.await();
            local_counter = 0;
            for (int j=0; j<20; j++)
                local_counter++;
            thread_barrier.await();
        }
    }

    public static void main(String[] args) throws InterruptedException, BrokenBarrierException {
        Test[] ra = new Test[n_threads];
        Thread[] ta = new Thread[n_threads];
        for(int i=0; i<n_threads; i++)
            (ta[i] = new Thread(ra[i]=new Test()).start();

        long nanos = System.nanoTime();
        for (int i = 0; i < LOOPS; i++) {
            thread_barrier.await();
            thread_barrier.await();

            for (int t=0; t<ra.length; t++) {
                global_counter += ra[t].local_counter;
            }
        }

        System.out.println(global_counter+", "+1e-6*(System.nanoTime()-nanos)+" ms");

        Stream.of(ta).forEach(t -> t.interrupt());
    }
}

我的带有 1 个锁的版本如下所示:

package tests;

import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;

public class TwoPhaseCycle implements Runnable {
    static final boolean DEBUG = false;
    static final int N = 8;
    static final int LOOPS = 10000;

    static ReentrantLock lock = new ReentrantLock();
    static Condition testResume = lock.newCondition();
    static volatile long cycle = -1;
    static Condition mainResume = lock.newCondition();
    static volatile int testLeft = 0;

    static void p(Object msg) {
        System.out.println(Thread.currentThread().getName()+"] "+msg);
    }

    //-----
    volatile int local_counter;

    @Override
    public void run() {
        try {
            runImpl();
        } catch (InterruptedException e) {
            p("interrupted; ending.");
        }
    }

    public void runImpl() throws InterruptedException {
        lock.lock();
        try {
            if(DEBUG) p("waiting for 1st testResumed");
            while(cycle<0) {
                testResume.await();
            }
        } finally {
            lock.unlock();
        }

        long localCycle = 0;//for (int i = 0; i < LOOPS; i++) {
        while(true) {
            if(DEBUG) p("working");
            local_counter = 0;
            for (int j = 0; j<20; j++)
                local_counter++;
            localCycle++;

            lock.lock();
            try {
                if(DEBUG) p("done");
                if(--testLeft <=0)
                    mainResume.signalAll(); //could have been just .signal() since only main is waiting, but safety first.

                if(DEBUG) p("waiting for cycle "+localCycle+" testResumed");
                while(cycle < localCycle) {
                    testResume.await();
                }
            } finally {
                lock.unlock();
            }
        }
    }

    public static void main(String[] args) throws InterruptedException {
        TwoPhaseCycle[] ra = new TwoPhaseCycle[N];
        Thread[] ta = new Thread[N];
        for(int i=0; i<N; i++)
            (ta[i] = new Thread(ra[i]=new TwoPhaseCycle(), "\t\t\t\t\t\t\t\t".substring(0, i%8)+"\tT"+i)).start();

        long nanos = System.nanoTime();

        int global_counter = 0;
        for (int i=0; i<LOOPS; i++) {
            lock.lock();
            try {
                if(DEBUG) p("gathering");
                for (int t=0; t<ra.length; t++) {
                    global_counter += ra[t].local_counter;
                }
                testLeft = N;
                cycle = i;
                if(DEBUG) p("resuming cycle "+cycle+" tests");
                testResume.signalAll();

                if(DEBUG) p("waiting for main resume");
                while(testLeft>0) {
                    mainResume.await();
                }
            } finally {
                lock.unlock();
            }
        }

        System.out.println(global_counter+", "+1e-6*(System.nanoTime()-nanos)+" ms");

        p(global_counter);
        Stream.of(ta).forEach(t -> t.interrupt());
    }
}

当然,这绝不是一个稳定的微基准,但趋势表明它更快。希望你喜欢。 (我放弃了一些最喜欢的调试技巧,值得将调试变为现实......)

【讨论】:

  • 很抱歉忽略了这个问题,我在任何人回复并忘记它之前找到了一个合适的解决方案。 “如果最终目标是由任何线程更新的总和,您应该查看 LongAdder(如果不是 AtomicLong)以同时执行长加法,而不必停止所有线程(它们战斗和添加,不涉及主线程)。” -> 这正是我最终做的。为此,加上其他非常详尽的改进列表,我认为值得将其标记为最佳答案
【解决方案2】:

嗯。我不确定是否完全理解,但我认为您的主要问题是您尝试过多地重用一组预定义的线程。您应该让 Java 来处理这个问题(这就是执行器/分叉连接池的用途)。为了解决您的问题,拆分/处理/合并(或映射/减少)似乎适合我。从 java 8 开始,它是一种非常简单的实现方法(感谢流/fork-join pool/completable future APIs)。我在这里提出2个替代方案:

Java 8 流

对我来说,您的问题看起来可以恢复为 map/reduce 问题。如果您可以使用 Java 8 流,则可以将性能问题委托给它。我会做什么:
1. 创建一个包含处理输入的并行流(您甚至可以使用方法动态生成输入)。请注意,您可以实现自己的 Spliterator,以完全控制输入的浏览和拆分(网格上的单元格?)。
2. 使用地图处理输入。
3. 使用reduce方法合并所有之前计算的结果。

简单示例(基于您的示例):

// Create a pool with wanted number of threads
    final ForkJoinPool pool = new ForkJoinPool(4);
    // We give the entire procedure to the thread pool
    final int result = pool.submit(() -> {
        // Generate a hundred counters, initialized on 0 value
        return IntStream.generate(() -> 0)
                .limit(100)
                // Specify we want it processed in a parallel way
                .parallel()
                // The map will register processing method
                .map(in -> incrementMultipleTimes(in, 20))
                // We ask the merge of processing results
                .reduce((first, second) -> first + second)
                .orElseThrow(() -> new IllegalArgumentException("Empty dataset"));
    })
            // Wait for the overall result
            .get();

    System.out.println("RESULT: " + result);

    pool.shutdown();
    pool.awaitTermination(10, TimeUnit.SECONDS);

需要注意的一些事项:
1.默认情况下,并行流在JVM Common fork-join pool上执行任务,可以限制执行者的数量。但是有一些方法可以使用您自己的游泳池:see this answer.
2.如果配置好的话,我认为这是最好的方法,因为并行逻辑已经由JDK开发者自己处理了。

移相器

如果你不能使用 java8 功能(或者我误解了你的问题,或者你真的想自己处理底层管理),我能给你的最后一条线索是:Phaser object。 正如文档所述,它是循环屏障和倒计时锁存器的可重复使用组合。我已经多次使用它了。使用起来很复杂,但它也非常强大。它可以用作循环屏障,所以我认为它适合您的情况。

【讨论】:

    【解决方案3】:

    您真的可以考虑遵循其 (CyclicBarrier) documentation 中的“官方”示例:

     class Solver {
       final int N;
       final float[][] data;
       final CyclicBarrier barrier;
    
       class Worker implements Runnable {
         int myRow;
         Worker(int row) { myRow = row; }
         public void run() {
           while (!done()) {
             processRow(myRow);
    
             try {
               barrier.await();
             } catch (InterruptedException ex) {
               return;
             } catch (BrokenBarrierException ex) {
               return;
             }
           }
         }
       }
    
       public Solver(float[][] matrix) {
         data = matrix;
         N = matrix.length;
         barrier = new CyclicBarrier(N,
                                     new Runnable() {
                                       public void run() {
                                         mergeRows(...);
                                       }
                                     });
         for (int i = 0; i < N; ++i)
           new Thread(new Worker(i)).start();
    
         waitUntilDone();
       }
     }
    

    你的情况

    • processRow() 会生成部分生成(任务被分成 N 块,worker 可以在初始化时获取它们的编号,或者直接使用 barrier.await() 返回的编号(在这种情况下,worker 应该从 await 开始)
    • mergeRows(),在匿名的Runnable 中传递给构建时的障碍,是整整一代准备好的地方,你可以在屏幕上打印它或其他东西(也许交换一些'currentGen'和'nextGen'缓冲区) .当此方法返回时(或更准确地说是 run()),worker 中的 barrier.await() 调用也会返回并开始计算下一代(或不返回,请参阅下一个要点)
    • done() 决定线程何时应该退出(而不是产生新的一代)。它可以是“真正的”方法,但 static volatile boolean 变量也可以工作
    • waitUntilDone() 可能是所有线程的循环,join()-ing 它们。或者只是等待程序退出时可以触发的东西(从“mergeRows”)

    【讨论】:

      猜你喜欢
      • 1970-01-01
      • 2012-04-15
      • 2021-07-14
      • 2021-12-16
      • 2023-03-13
      • 2020-10-27
      • 1970-01-01
      • 2020-03-07
      • 1970-01-01
      相关资源
      最近更新 更多