【问题标题】:Strategies for concurrent pipelines in JavaJava中并发管道的策略
【发布时间】:2011-01-05 08:21:15
【问题描述】:

考虑以下 shell 脚本:

gzip -dc in.gz | sed -e 's/@/_at_/g' | gzip -c > out.gz 

这具有三个并行工作的进程来解压缩流、修改它并重新压缩它。运行time 我可以看到我的用户时间大约是我实际时间的两倍,这表明程序有效地并行工作。

我试图通过将每个任务放在它自己的线程中来在 Java 中创建相同的程序。不幸的是,对于上述示例,多线程 Java 程序仅比single threaded 版本大约30% faster。我试过同时使用ExchangerConcurrentLinkedQueue。 ConcurrentLinkedQueue 链接队列会引起很多争用,尽管所有三个线程通常都保持忙碌状态。 Exchanger 的争用较低,但更复杂,并且似乎无法让最慢的工作人员 100% 地运行。

我试图在不查看字节码编织框架或基于 JNI 的 MPI 的情况下找出解决此问题的纯 Java 解决方案。

大多数并发研究和 API 都与 divide-and-conquer 算法相关,为每个节点提供正交且不依赖于先前计算的工作。并发的另一种方法是管道方法,其中每个工作人员执行一些工作并将数据传递给下一个工作人员。

我并不是想找到最有效的方式来 sed 一个 gzip 文件,而是我正在研究如何有效地分解管道中的任务,以便将运行时间减少到最慢的时间任务。

目前10m行文件的时序如下:

Testing via shell

real    0m31.848s
user    0m58.946s
sys     0m1.694s

Testing SerialTest

real    0m59.997s
user    0m59.263s
sys     0m1.121s

Testing ParallelExchangerTest

real    0m41.573s
user    1m3.436s
sys     0m1.830s

Testing ConcurrentQueueTest

real    0m44.626s
user    1m24.231s
sys     0m10.856s

我悬赏 10% 的 Java 改进,这是在具有 1000 万行测试数据的四核系统上实时测量的。当前资源可在Bitbucket 获得。

【问题讨论】:

  • 您在 Java 中所做的代码示例对于提供改进非常有用。很难看出你尝试了什么
  • 您是否在多 CPU 机器上进行测试?不确定 JVM 是否可以使用超过 1 个 CPU。
  • 我正在运行 Solaris 10 的四核 AMD 机器上进行测试。测试的源代码位于此处:bitbucket.org/brianegge/java-concurrent/src/tip
  • 在这项工作的某个时候你不会受到 IO 限制吗?这将限制您可以实现的并行度。
  • 好问题!花了将近半天的时间进行测试 :-) 一些争用似乎来自 gc(在 jdk linux 双核上),并且使用更大的读取为此做了一些事情,因为字符串的数量会变得更小。但一切都归结为 java.concurrent 小工具效率不高。我还用管道输入/输出流进行了测试,它们在这里完全没用,因为所有操作都是同步的,一个线程阻塞另一个线程。我想有可能重写那些。

标签: java performance multithreading concurrency


【解决方案1】:

首先,该过程只会与最慢的部分一样快。如果时间分解是:

  • gunzip:1 秒
  • sed:5 秒
  • gzip:1 秒

通过多线程,您将在 最多 5 秒而不是 7 秒内完成。

其次,不要使用您正在使用的队列,而是尝试复制您正在复制的功能并使用PipedInputStreamPipedOutputStream 将进程链接在一起。

编辑:有几种方法可以使用 Java 并发工具处理相关任务。把它分成线程。首先创建一个通用基类:

public interface Worker {
  public run(InputStream in, OutputStream out);
}

这个接口的作用是代表一些处理输入并生成输出的任意作业。将这些链接在一起,您就有了一个管道。你也可以抽象出样板。为此,我们需要一个类:

public class UnitOfWork implements Runnable {
  private final InputStream in;
  private final OutputStream out;
  private final Worker worker;

  public UnitOfWork(InputStream in, OutputStream out, Worker worker) {
    if (in == null) {
      throw new NullPointerException("in is null");
    }
    if (out == null) {
      throw new NullPointerException("out is null");
    }
    if (worker == null) {
      throw new NullPointerException("worker is null");
    }
    this.in = in;
    this.out = out;
    this.worker = worker;
  }

  public final void run() {
    worker.run(in, out);
  }
}

例如,Unzip PART:

public class Unzip implements Worker {
  protected void run(InputStream in, OutputStream out) {
    ...
  }
}

SedZip 以此类推。然后将它绑定在一起的是:

public static void pipe(InputStream in, OutputStream out, Worker... workers) {
  if (workers.length == 0) {
    throw new IllegalArgumentException("no workers");
  }
  OutputStream last = null;
  List<UnitOfWork> work = new ArrayList<UnitOfWork>(workers.length);
  PipedOutputStream last = null;
  for (int i=0; i<workers.length-2; i++) {
    PipedOutputStream out = new PipedOutputStream();
    work.add(new UnitOfWork(
      last == null ? in, new PipedInputStream(last), out, workers[i]);
    last = out;
  }
  work.add(new UnitOfWork(new PipedInputStream(last),
    out, workers[workers.length-1);
  ExecutorService exec = Executors.newFixedThreadPool(work.size());
  for (UnitOfWork w : work) {
    exec.submit(w);
  }
  exec.shutdown();
  try {
    exec.awaitTermination(Long.MAX_VALUE, TimeUnit.NANOSECONDS);
  } catch (InterruptedExxception e) {
    // do whatever
  }
}

我不确定你能做得比这更好,而且为每项工作编写的代码最少。那么你的代码就变成了:

public static processFile(String inputName, String outputName) {
  pipe(new FileInputStream(inputFile), new FileOutputStream(outputFile),
    new Zip(), new Sed(), new Unzip());
}

【讨论】:

  • 在我对多线程加密进行的一些测试中,我做了一些非常相似的事情,但是当我进行自己的缓冲实现时,性能真正得到了提升。管道流和缓冲流已经被缓冲,但是缓冲区大小会引入大量开销,除非缓冲区与变换算法对齐。因此,如果 zip 一次在 1k 字节上工作,则使用该维度的自定义缓冲区在压缩之前输入数据,如果 sed 使用一行完全一次获取 128 行,这种东西会大大提高速度,减少争用和开销(和复杂性,otoh...)。
【解决方案2】:

我单独验证了所花费的时间,看起来阅读花费的时间不到 10%,而阅读加处理花费的时间不到整个时间的 30%。 所以我采用了 ParallelExchangerTest (您的代码中表现最好的)并将其修改为 只有 2 个线程,第一个线程读取和替换,第二个线程写入。

这是要比较的数字(在我的机器上运行 ubuntu 和 1gb ram 的英特尔双核(不是 core2))

> 通过 shell 测试

真正的 0m41.601s

用户 0m58.604s

系统 0m1.032s

> 测试 ParallelExchangerTest

真正的 1m55.424s

用户 2m14.160s

系统 0m4.768s

> ParallelExchangerTestMod(2 线程)

真正的 1m35.524s

用户 1m55.319s

系统 0m3.580s

我知道字符串处理需要更长的时间,所以我替换了 line.repalce 使用 matcher.replaceAll,我得到了这个数字

> ParallelExchangerTestMod_Regex(2 线程)

真正的 1m12.781s

用户 1m33.382s

系统 0m2.916s

现在我向前迈了一步,而不是一次读一行,我读 各种大小的 char[] 缓冲区并对其进行计时,(使用正则表达式搜索/替换,) 我得到了这些数字

> 测试 ParallelExchangerTestMod_Regex_Buff(一次处理 100 字节)

真正的 1m13.804s

用户 1m32.494s

系统 0m2.676s

> 测试 ParallelExchangerTestMod_Regex_Buff(一次处理 500 字节)

真正的 1m6.286s

用户 1m29.334s

系统 0m2.324s

> 测试 ParallelExchangerTestMod_Regex_Buff(一次处理 800 字节)

真正的 1m12.309s

用户 1m33.910s

系统 0m2.476s

看起来 500 字节最适合数据大小。

我在此处分叉并保存了我的更改副本

https://bitbucket.org/chinmaya/java-concurrent_response/

【讨论】:

  • 我检查了您的更改并在 Solaris 机器上运行它们。结果与 Ubuntu 有很大不同。最快的一个比我的 ParallelExchangerTest 快 1.5 秒。 ParallelExchangerTestMod_Regex real 0m40.418s user 0m56.314s sys 0m1.374s 在 Ubuntu、Cygwin 和 OS X 上运行相同的测试显示结果因平台而异。
  • 当然,JVM 实现会因平台而异。您可能想在编译时 (javac -O) 和运行时 (java -X) 尝试优化。
【解决方案3】:

您也可以在 Java 中使用管道。它们以 Streams 的形式实现,有关详细信息,请参阅 PipedInputStreamPipedOutputStream

为防止堵塞,我建议放置合适的管道尺寸。

【讨论】:

  • 当您需要将输出流连接到输入流时,PipedOutputStream 和 PipedInputStream 很有用。串行测试中的“Sed”类有效地完成了上述两个类的工作。我不是试图实现管道,而是并发管道。管道就像汽车装配线,将工作从一个阶段传递到下一个阶段。在它自己的线程中运行每个阶段提供了在不能并行运行的任务中并发的可能性。
  • 此外;我刚刚测试了 PidedIOStream 的性能,它实际上只是一个信号量,一次只有一个线程可以在底层缓冲区上工作。我想有可能重写类以使用多个缓冲区,并增加吞吐量。
【解决方案4】:

鉴于你没有说你是如何测量经过的时间,我假设你正在使用类似的东西:

time java org.egge.concurrent.SerialTest < in.gz > out.gz
time java org.egge.concurrent.ConcurrentQueueTest < in.gz > out.gz

问题在于你在这里测量两件事:

  1. JVM 启动需要多长时间,以及
  2. 程序运行需要多长时间。

您只能通过更改代码来更改第二个。使用您提供的数据:

Testing SerialTest
real    0m6.736s
user    0m6.924s
sys     0m0.245s

Testing ParallelExchangerTest
real    0m4.967s
user    0m7.491s
sys     0m0.850s

如果我们假设 JVM 启动需要 3 秒,那么“程序运行时间”分别为 3.7 秒和 1.9 秒,这几乎是 100% 的加速。我强烈建议您使用更大的数据集进行测试,以便将 JVM 启动对计时结果的影响降至最低。

编辑:根据您对此问题的回答,您很可能正遭受锁争用的困扰。在 java 中解决这个问题的最佳方法可能是使用管道读取器和写入器,从管道读取,一次读取一个字节,并将输入流中的任何 '@' 字符替换为输出流中的 "_at_"。您可能会遇到这样一个事实,即每个字符串都被扫描了 3 次,并且任何替换都需要构建一个新对象,并且该字符串最终会再次被复制。希望这会有所帮助...

【讨论】:

  • 如果我用一条记录运行测试,我可以看到 0m0.292s 的实时时间。测试并不理想,因为最后阶段的强度是第一阶段的两倍多。
  • 我使用的测试工具在bitbucket.org/brianegge/java-concurrent/src/tip/bin/test。运行 10m 行,ParallelExchanger 显示的“用户”时间几乎与 shell 脚本相同,但实际时间要长 10 秒。如果能提高效率,说不定可以和shell脚本同时执行。
【解决方案5】:

减少读取和对象的数量让我的性能提高了 10% 以上。

但是java.util.concurrent的表现还是有点让人失望。

并发队列测试:

private static class Reader implements Runnable {

@Override
  public void run() {
   final char buf[] = new char[8192];
   try {

    int len;
    while ((len = reader.read(buf)) != -1) {
     pipe.put(new String(buf,0,len));
    }
    pipe.put(POISON);

   } catch (IOException e) {
    throw new RuntimeException(e);
   } catch (InterruptedException e) {
    throw new RuntimeException(e);
   }
  }

【讨论】:

    猜你喜欢
    • 2020-12-14
    • 1970-01-01
    • 1970-01-01
    • 2014-09-01
    • 2014-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多