Akka演员承诺并发。 有什么更好的模拟方式,看看使用商品硬件和软件处理1000万条消息需要花费多少时间,而无需进行任何低级调整。我用Java编写了全部1000万条消息的处理过程,整个结果令我惊讶。

当我在具有Intel i5 – 4核,4 Gb RAM计算机和JVM堆的iMac计算机上以1024Mb运行该程序时,该程序在23秒内处理了1000万台计算机。 我多次运行该程序,平均时间为25秒。 因此,我收到的吞吐量几乎在每秒40万条消息的范围内,这是惊人的。

下图说明了用于模拟负载生成方案的流程。

使用Akka处理1000万条消息

警告:每条消息在1秒钟后发送响应,这对于实际情况而言并非正确的模拟。 在这种情况下,消息处理将消耗堆和gc活动上的一些资源,这些资源未考虑在内。

该程序使用了Akka发布者的总体指导:在75秒内处理了1000万条消息(每条消息1秒)! 尽管没有任何限制。

该程序的代码库位于以下位置– https://github.com/write2munish/Akka-Essentials

ApplicationManagerSystem创建actor,并在到WorkerActor的流量中进行泵送

  private ActorSystem system;
  private final ActorRef router;
  private final static int no_of_msgs = 10 * 1000000;
 
  public ApplicationManagerSystem() {
 
   final int no_of_workers = 10;
 
   system = ActorSystem.create('LoadGeneratorApp');
 
   final ActorRef appManager = system.actorOf(
     new Props(new UntypedActorFactory() {
      public UntypedActor create() {
       return new JobControllerActor(no_of_msgs);
      }
     }), 'jobController');
 
   router = system.actorOf(new Props(new UntypedActorFactory() {
    public UntypedActor create() {
     return new WorkerActor(appManager);
    }
   }).withRouter(new RoundRobinRouter(no_of_workers)));
  }
 
  private void generateLoad() {
   for (int i = no_of_msgs; i >= 0; i--) {
    router.tell('Job Id ' + i + '# send');
   }
   System.out.println('All jobs sent successfully');
  }
 

一旦WorkerActor收到了消息,则计划将响应在1000毫秒后发送

  

 
 
 public class WorkerActor extends UntypedActor {
 
  private ActorRef jobController;
 
  @Override
  public void onReceive(Object message) throws Exception {
    using scheduler to send the reply after 1000 milliseconds
   getContext()
     .system()
     .scheduler()
     .scheduleOnce(Duration.create(1000, TimeUnit.MILLISECONDS),
       jobController, 'Done');
  }
 
  public WorkerActor(ActorRef inJobController) {
   jobController = inJobController;
  }
 }

来自WorkerActor的响应消息被发送到JobControllerActor,后者收集所有响应。

 public class JobControllerActor extends UntypedActor {
 
  int count = 0;
  long startedTime = System.currentTimeMillis();
  int no_of_msgs = 0;
 
  @Override
  public void onReceive(Object message) throws Exception {
 
   if (message instanceof String) {
    if (((String) message).compareTo('Done') == 0) {
     count++;
     if (count == no_of_msgs) {
      long now = System.currentTimeMillis();
      System.out.println('All messages processed in '
        + (now - startedTime)  1000 + ' seconds');
 
      System.out.println('Total Number of messages processed '
        + count);
      getContext().system().shutdown();
     }
    }
   }
 
  }
 }

参考: 教程:Hibernate,JPA和Spring MVC –来自Akka Essentials博客的JCG合作伙伴 Munish K Gupta的第2部分

翻译自: https://www.javacodegeeks.com/2012/05/processing-10-million-messages-with.html

相关文章: