我建议您查看Akka。他们提供了一个更适合这个用例的 Actor 框架。由于没有定义您自己的 ExecutorService 接口实现,JDK 提供的默认实现只是无法对调度提供太多控制。
创建一个硬编码的 ExecutionServices 数组不会非常动态或健壮,尤其是因为每个 ExecutionService 会有一个线程池。可以将数组替换为哈希映射,然后将其放在 ExecutionService 的自定义实现后面,这样做的好处是可以向调用者隐藏这些细节,但不会解决拥有这么多线程池的线程浪费问题。
在 Akka 中,每个 Actor 都有自己的消息队列与之关联。每个 Actor 在其自己的线程中有效地运行,一次处理来自其队列的每条消息。 Akka 将管理跨多个 Actor 的线程共享。因此,如果您要为每种消息类型创建一个 Actor,然后将消息与这些 Actor 一起排队,那么您将获得让每种消息类型一次最多由一个线程处理同时仅由一个池支持的目标线程数。
技术演示:
Maven 对 Akka 的依赖。
<dependency>
<groupId>com.typesafe.akka</groupId>
<artifactId>akka-actor_2.11</artifactId>
<version>2.4.17</version>
</dependency>
Java 8 代码。复制并粘贴到 Java 文件中,然后在 IDE 中运行 main 方法。
package com.softwaremosaic.demos.akka;
import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;
import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;
public class ActorDemo {
public static void main( String[] args ) throws InterruptedException {
// The following partitioner will spread the requests over
// multiple actors, which I chose to demonstrate the technique.
// You will need to change it to one that better maps the the
// jobs to your use case. Remember that jobs that get mapped
// to the same key, will get executed in serial (probably
// but not necessarily) by the same thread.
ExecutorService exectorService = new ActorExecutionService( job -> job.hashCode()+"" );
for ( int i=0; i<100; i++ ) {
int id = i;
exectorService.submit( () -> System.out.println("JOB " + id) );
}
exectorService.shutdown();
exectorService.awaitTermination( 1, TimeUnit.MINUTES );
System.out.println( "DONE" );
}
}
class ActorExecutionService extends AbstractExecutorService {
private final ActorSystem actorSystem;
private final Function<Runnable, String> partitioner;
private final ConcurrentHashMap<String,ActorRef> actors = new ConcurrentHashMap<>();
public ActorExecutionService( Function<Runnable,String> partitioner ) {
this.actorSystem = ActorSystem.create("demo");
this.partitioner = partitioner;
}
public void execute( Runnable command ) {
String partitionKey = partitioner.apply( command );
ActorRef actorRef = actors.computeIfAbsent( partitionKey, this::createNewActor );
actorRef.tell( command, actorRef );
}
private ActorRef createNewActor( String partitionKey ) {
return actorSystem.actorOf( Props.create(ExecutionServiceActor.class), partitionKey );
}
public void shutdown() {
actorSystem.terminate();
}
public List<Runnable> shutdownNow() {
actorSystem.terminate();
try {
awaitTermination( 1, TimeUnit.MINUTES );
} catch ( InterruptedException e ) {
throw new RuntimeException( e );
}
return Collections.emptyList();
}
public boolean isShutdown() {
return actorSystem.isTerminated();
}
public boolean isTerminated() {
return actorSystem.isTerminated();
}
public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException {
actorSystem.awaitTermination();
return actorSystem.isTerminated();
}
}
class ExecutionServiceActor extends UntypedActor {
public void onReceive(Object message) throws Exception {
if (message instanceof Runnable) {
((Runnable) message).run();
} else {
unhandled(message);
}
}
}
NB 上面的代码将以未定义的顺序打印 1-100。由于批处理(Akka 这样做以获得额外的性能优势),订单看起来主要是串行的。但是,由于不同的线程散布在工作中,您会看到数字的一些随机性。每个作业运行的时间越长,分配给 Akka 线程池的线程越多,使用的分区键越多,底层 CPU 内核越多,序列可能变得越随机。