【问题标题】:Java: Executor Service with multiple queuesJava:具有多个队列的执行器服务
【发布时间】:2017-09-07 09:23:13
【问题描述】:

要求:

  1. 我将消息分组为不同类型,例如Type1, Type2 ... Type100
  2. 我想并行执行不同类型的消息。假设在 10 个线程中,但是所有相同类型的消息必须一个一个地执行。执行顺序无关紧要。
  3. 一旦线程完成TypeX 的所有消息。它应该开始处理另一个类型。

我经历了不同的答案: 他们中的大多数建议执行器服务来处理多线程。 假设我们创建了类似的执行器服务

ExecutorService executorService = Executors.newFixedThreadPool(10);

但是一旦我们使用executorService.submit(runnableMessage);提交消息

我们无法控制仅将特定类型的消息分配给特定线程。

解决方案:

创建一个单线程执行器数组

ExecutorService[] pools = new ExecutorService[10];

并最初传递Type1,Type2 ... Type10的消息 然后如果任何 executor 已完成执行,则将 Type11 分配给它并继续执行,直到所有 Types 都得到处理。

有没有更好的办法?

类似于具有多个队列的执行器服务,我可以将每种类型的消息推送到不同的队列?

【问题讨论】:

  • 嗯,ExecutorService 还提供了 ScheduledExecutorService,您可以在 scheduleAtFixedRate() 或 scheduleAtFixedDelay() 之间进行选择。我建议您将消息推送到 10 个不同的 ArrayDeques 中,每个用于特定的消息类型,您在队列模式 (FIFO) 中操作。每个 ScheduledExecutorService 以给定的间隔或延迟运行它自己的消息队列,直到您停止系统。我想这应该可以解决你的问题。

标签: java multithreading executorservice


【解决方案1】:

我建议您查看Akka。他们提供了一个更适合这个用例的 Actor 框架。由于没有定义您自己的 ExecutorService 接口实现,JDK 提供的默认实现只是无法对调度提供太多控制。

创建一个硬编码的 ExecutionServices 数组不会非常动态或健壮,尤其是因为每个 ExecutionService 会有一个线程池。可以将数组替换为哈希映射,然后将其放在 ExecutionService 的自定义实现后面,这样做的好处是可以向调用者隐藏这些细节,但不会解决拥有这么多线程池的线程浪费问题。

在 Akka 中,每个 Actor 都有自己的消息队列与之关联。每个 Actor 在其自己的线程中有效地运行,一次处理来自其队列的每条消息。 Akka 将管理跨多个 Actor 的线程共享。因此,如果您要为每种消息类型创建一个 Actor,然后将消息与这些 Actor 一起排队,那么您将获得让每种消息类型一次最多由一个线程处理同时仅由一个池支持的目标线程数。

技术演示:

Maven 对 Akka 的依赖。

    <dependency>
        <groupId>com.typesafe.akka</groupId>
        <artifactId>akka-actor_2.11</artifactId>
        <version>2.4.17</version>
    </dependency>

Java 8 代码。复制并粘贴到 Java 文件中,然后在 IDE 中运行 main 方法。

package com.softwaremosaic.demos.akka;

import akka.actor.ActorRef;
import akka.actor.ActorSystem;
import akka.actor.Props;
import akka.actor.UntypedActor;

import java.util.Collections;
import java.util.List;
import java.util.concurrent.AbstractExecutorService;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.function.Function;


public class ActorDemo {

    public static void main( String[] args ) throws InterruptedException {
        // The following partitioner will spread the requests over
        // multiple actors, which I chose to demonstrate the technique.
        // You will need to change it to one that better maps the the
        // jobs to your use case.   Remember that jobs that get mapped
        // to the same key, will get executed in serial (probably
        // but not necessarily) by the same thread.
        ExecutorService exectorService = new ActorExecutionService( job -> job.hashCode()+"" );

        for ( int i=0; i<100; i++ ) {
            int id = i;
            exectorService.submit( () -> System.out.println("JOB " + id) );
        }

        exectorService.shutdown();
        exectorService.awaitTermination( 1, TimeUnit.MINUTES );

        System.out.println( "DONE" );
    }

}


class ActorExecutionService extends AbstractExecutorService {

    private final ActorSystem                              actorSystem;
    private final Function<Runnable, String>               partitioner;
    private final ConcurrentHashMap<String,ActorRef>       actors = new ConcurrentHashMap<>();

    public ActorExecutionService( Function<Runnable,String> partitioner ) {
        this.actorSystem = ActorSystem.create("demo");
        this.partitioner = partitioner;
    }


    public void execute( Runnable command ) {
        String partitionKey = partitioner.apply( command );

        ActorRef actorRef = actors.computeIfAbsent( partitionKey, this::createNewActor );

        actorRef.tell( command, actorRef );
    }

    private ActorRef createNewActor( String partitionKey ) {
        return actorSystem.actorOf( Props.create(ExecutionServiceActor.class), partitionKey );
    }


    public void shutdown() {
        actorSystem.terminate();
    }

    public List<Runnable> shutdownNow() {
        actorSystem.terminate();

        try {
            awaitTermination( 1, TimeUnit.MINUTES );
        } catch ( InterruptedException e ) {
            throw new RuntimeException( e );
        }

        return Collections.emptyList();
    }

    public boolean isShutdown() {
        return actorSystem.isTerminated();
    }

    public boolean isTerminated() {
        return actorSystem.isTerminated();
    }

    public boolean awaitTermination( long timeout, TimeUnit unit ) throws InterruptedException {
        actorSystem.awaitTermination();

        return actorSystem.isTerminated();
    }
}

 class ExecutionServiceActor extends UntypedActor {
    public void onReceive(Object message) throws Exception {
        if (message instanceof Runnable) {
            ((Runnable) message).run();
        } else {
            unhandled(message);
        }
    }
}

NB 上面的代码将以未定义的顺序打印 1-100。由于批处理(Akka 这样做以获得额外的性能优势),订单看起来主要是串行的。但是,由于不同的线程散布在工作中,您会看到数字的一些随机性。每个作业运行的时间越长,分配给 Akka 线程池的线程越多,使用的分区键越多,底层 CPU 内核越多,序列可能变得越随机。

【讨论】:

    【解决方案2】:

    这是我的非常基本的示例,说明它的外观。 您创建一个包含 10 个 ArrayDeques 的 Map,这些 ArrayDeques 由它们的“Typ”寻址。 您还启动了 10 个 ScheduledExecutors。 每个最初等待 5 秒,然后每 200 毫秒轮询一次其队列。 在这个当前示例中,输出将始终是“TypeX 的当前消息队列:null”,因为队列都是空的。

    但是您现在可以启动它并将您的消息传递到匹配的队列中。该服务将每 200 毫秒获取一次,并随心所欲地使用它。 当您使用队列时,消息的处理方式也会自动排序。

    import java.util.ArrayDeque;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.concurrent.Executors;
    import java.util.concurrent.ScheduledExecutorService;
    import java.util.concurrent.TimeUnit;
    
    public class Messages {
    
        public static void main(String[] args) {
    
            Map<String, ArrayDeque<String>> messages = new HashMap<String, ArrayDeque<String>>();
            ScheduledExecutorService service = Executors.newSingleThreadScheduledExecutor();
            long initialDelay = 5000;
            long period = 200;
    
            // create 10 Queues, indexed by the type
            // create 10 executor-services, focused on their message queue
            for(int i=1; i<11; i++) {
                String type = "Type" + i;
    
                Runnable task = () -> System.out.println(
                         "current message of " + type + ": " + messages.get(type).poll()
                );
    
                messages.put(type, new ArrayDeque<String>());
                service.scheduleAtFixedRate(task, initialDelay, period, TimeUnit.MILLISECONDS);
            }
    
        }
    }
    

    【讨论】:

    • 它是如何创建 10 个执行者的?以及当我想在整个生命周期内执行消息时,以固定速率调度消息有什么意义?
    • 创建发生在循环中。 “service.scheduleAtFixedRate(任务,initialDelay,周期,TimeUnit.MILLISECONDS);” schedule...- 方法被调用了 10 次。每次调用它都会创建一个新任务。这些任务会定期运行,直到被取消,并且每 200 毫秒检查一次队列的内容。如果存在新消息,它将被处理。 (您必须用您的特定消息处理代码替换可运行文件中的打印方法)。因此,他们确实会在整个生命周期内处理您的消息。每当您将消息放入队列时,只要应用程序运行,它们就会被处理。
    • 也许您打算以其他方式设计或运行您的应用程序。但是我的代码首先反映了我的理解。 -- 你会得到你按特定类型划分的消息。我的排队概念涵盖了这一点。您想转发这些消息,并且知道每条消息的类型。甚至这也包括使用队列和另外 10 个预定服务,每个服务只关心他的特定队列并运行直到被您取消。
    • 如有其他问题,请随时提问。 ?
    • scheduleAtFixedRate 并没有像你想象的那样做。您的解决方案不会并行运行任何东西。请参阅 javadoc 中的 newSingleThreadScheduledExecutor “任务保证按顺序执行,在任何给定时间不会有超过一个任务处于活动状态。”也就是说,只需将其更改为 newScheduledThreadPool(10)
    【解决方案3】:

    一个更简单的解决方案可能是:

    而不是让每条消息都可以运行。 我们可以根据类型创建群组消息:

    例如我们为 type1

    的所有消息创建 Group1
    class MessageGroup implements Runnable {
        String type;
        String List<Message> messageList;
    
        @Override
        public void run() {
          for(Message message : MessageList) {
             message.process();
          }
        }
    } 
    

    我们可以使用固定线程创建通常的执行器服务,例如

    ExecutorService executorService = Executors.newFixedThreadPool(10); 
    

    我们可以提交一组消息,而不是提交单个消息,例如

    executorService.submit(runnableGroup);
    

    并且每个组将在同一个线程中顺序执行相同类型的消息。

    【讨论】:

    • 这需要您在内存中预先为组构建整个消息列表。
    猜你喜欢
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 2013-07-29
    • 2019-03-27
    • 2020-01-31
    相关资源
    最近更新 更多