WAMP-网络程序消息协议

wamp是一个开放式的标准的websocket子协议,在一个统一协议中提供两种应用程序的消息模式: 远程过程调用  +  发布&订阅

它在使用不同语言的开放的WebSocket协议中提供统一应用程序路由。使用WAMP,可以用松耦合实现实时通信的应用程序组件构建分布式系统。

因为应用程序通常对两种形式的通信都有自然的需要,并且不应该要求为这些形式使用不同的协议/手段, 所以这就是为什么WAMP提供这这两种通信模式。

WAMP的核心是为应用程序组件提供两种通信模式,以便彼此通信:

         发布&订阅  + 远程过程调用

一、统一应用路由

WAMP提供给了我们所谓的应用程序的统一应用路由:在一个协议中的应用组件之间路由两个事件(用于PubSub)和路由呼叫(用于RPC)。

WAMP-网络程序消息协议

统一路由可能最好通过与传统方法进行对比来解释。 在客户端 - 服务器模型中,远程过程调用直接从调用者到被调用者:

  调用者-->被调者

在客户端 - 服务器模型中,Caller需要知道Callee驻留在哪里以及如何到达它,这引入Caller和Callee之间的强耦合。 这是很麻烦的,因为应用程序可能很快变得复杂和无法维护。

所以WAMP如何解决这一点的呢?

WAMP-网络程序消息协议

在发布 - 订阅模型中,发布者向抽象的“主题”提交信息,并且订阅者仅通过它们对应“主题”的兴趣而间接地接收到信息。 两个人不知道彼此。 它们通过“主题”和通常称为代理的中间体解耦: 发布者-->代理商-->订阅者

代理商保留订阅:谁目前订阅了哪个主题。 当发布者向主题发布一些信息(“事件”)时,代理将查找当前对该主题订阅的用户:确定发布到主题上的订阅者的集合,然后将信息(“事件”)转发给所有这些订阅者。 而确定信息接收者(独立于提交的信息)和将信息转发到接收者的行为就被称为路由。

WAMP-网络程序消息协议

现在,WAMP将松耦合的好处转化为RPC。与客户端 - 服务器模型不同,WAMP还通过引入中介:经销商来解除Caller和Callees的联系: 调用者-->经销商-->被调者

类似于Broker在PubSub中的作用,经销商负责将来自Caller的呼叫路由到被叫方,并路由返回结果或者错误,反之亦然。 两者不知道彼此:对等体驻留在哪里以及如何到达它,这些内容被封装在经销商中。

使用WAMP,被叫方在经销商处以抽象名称注册过程:标识过程的URI。 当调用者想要调用远程过程时,它与经销商谈话,并且仅提供要调用的过程的URI加上任何调用参数。 经销商将在他的注册程序书中查找要援引的程序。 书中的信息包括执行程序的Callee所在的位置,以及如何到达它。

实际上,Caller和Callees是分离的,应用程序可以使用RPC并仍然受益于松耦合。

WAMP调用路由器:路由 = 代理商 + 经销商,路由器能够路由呼叫,因此可以支持使用RPC和PubSub的灵活的解耦架构。

这里是一个例子。 想象一下,你有一个像Arduino Yun这样的小型嵌入式设备,具有传感器(如温度传感器)和致动器(如灯或电机)连接。 您希望将设备集成到整个系统中,用户面向前端以控制执行器,并连续处理后端组件中的传感器值。

WAMP-网络程序消息协议

 使用WAMP,可以有一个基于浏览器的UI及嵌入式设备和后端实时交谈,从基于浏览器的UI打开设备上的灯自然通过在设备(1)上调用远程过程来完成。 并且由设备连续生成的传感器值通过发布和订阅(2)自然地传输到后端组件(并且可能是其他组件)。

二、多语言化

WAMP的设计有一流的支持,支持不同的语言。 WAMP中没有要求特定于单个编程语言。 只要编程语言具有WAMP实现,它就可以与使用WAMP支持的任何其他语言编写的应用程序组件实现透明通信。

三、什么是RPC?什么是发布-订阅?

远程过程调用(RPC)是一种涉及三个角色的同级的消息模式:

呼叫者、被叫者、经销商

调用者通过提供过程URI和调用的任何参数来向远程过程发出调用。 被调者将使用提供的调用参数执行该过程,并将调用结果返回给调用者。

调用者注册它们向经销商提供的程序,呼叫者首先向经销商发起程序呼叫,经销商将来自呼叫者的呼叫路由实现到被调者,并将呼叫结果从被调者路由返回到呼叫者。

Caller和Callee通常运行应用程序代码,而经销商作为一个通用路由器,用于远程过程调用解耦Caller和Callees。

发布和订阅(PubSub)是一种包含三个角色的同行的消息模式:

发布者、订阅者、经纪人

发布者通过提供主题URI和事件的任何有效内容将主题发布,主题的订阅者将事件内容接收。

订阅者订阅他们对Brokers感兴趣的主题,发布者首先在Brokers发布。 代理将发布者传入的事件路由到订阅相应主题的订阅者。

发布者和订阅者通常运行应用程序代码,而代理作为通用路由器用于将发布者与订阅者分离。

下面是一个简单的WAMP应用小程序,搭建一个CrossBar服务器,开启服务器,就可以运行wamp小程序了。

发布者(Publish):

package ws.wamp.jawampa;

import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;

import rx.Scheduler;
import rx.Subscription;
import rx.functions.Action0;
import rx.functions.Action1;
import rx.schedulers.Schedulers;
import ws.wamp.jawampa.ApplicationError;
import ws.wamp.jawampa.Request;
import ws.wamp.jawampa.WampError;
import ws.wamp.jawampa.WampClient;
import ws.wamp.jawampa.WampClientBuilder;

public class Publish {
    
    String url;
    String realm;
    int flag = 0;
    WampClient client;
    
    Subscription addProcSubscription;
    Subscription counterPublication;
    Subscription onHelloSubscription;
    
    // Scheduler for this example
    ExecutorService executor = Executors.newSingleThreadExecutor();
    Scheduler rxScheduler = Schedulers.from(executor);
    
    static final int TIMER_INTERVAL = 1000; // 1s
    int counter = 0;
    
    Publish(String url, String realm) throws Exception {
        this.url = url;
        this.realm = realm;
    }
    
    
    public static void main(String[] args) throws Exception {
      if (args.length != 2) {
          System.out.println("Need 2 commandline arguments: Router URL and ream name");
          return;
      }
      String args1 ="ws://172.16.100.55:8080/ws";
      String args2 = "realm1";
      new Publish(args1, args2).run();
  }
  
  void run() {
      
      WampClientBuilder builder = new WampClientBuilder();
      try {
          builder.witUri(url)
                 .withRealm(realm)
                 .withInfiniteReconnects()
                 .withCloseOnErrors(true)
                 .withReconnectInterval(5, TimeUnit.SECONDS);
          client = builder.build();
      } catch (WampError e) {
          e.printStackTrace();
          return;
      }

      // Subscribe on the clients status updates
      client.statusChanged()
            .observeOn(rxScheduler)
            .subscribe(new Action1<WampClient.Status>() {
          @Override
          public void call(WampClient.Status t1) {
              System.out.println("Session status changed to " + t1);

              if (t1 == WampClient.Status.Connected) {
                  
               
                  // PUBLISH and CALL every second .. forever
                  counter = 0;
                  final int published = 4;
                  onHelloSubscription= client.publish("glf", published)
                                .observeOn(rxScheduler)
                                .subscribe(new Action1<Long>() {
                                  @Override
                                  public void call(Long t1) {
                                      System.out.println("published to 'oncounter' with counter " + published);
                                  }
                              }, new Action1<Throwable>() {
                                  @Override
                                  public void call(Throwable e) {
                                      System.out.println("Error during publishing to 'oncounter': " + e);
                                  }
                              });
                          
                           CALL a remote procedure
                            
                              flag =1;
                               System.out.println(flag);
                          counter++;
                     
              }
              else if (t1 == WampClient.Status.Disconnected) {
                  closeSubscriptions();
              }
          }
      }, new Action1<Throwable>() {
          @Override
          public void call(Throwable t) {
              System.out.println("Session ended with error " + t);
          }
      }, new Action0() {
          @Override
          public void call() {
              System.out.println("Session ended normally");
          }
      });

      client.open();
      
      waitUntilKeypressed();
      while (flag == 0){
          try {
              Thread.sleep(1000);
          } catch (InterruptedException e) {
              
              e.printStackTrace();
          }
          
          System.out.println("waiting");
          }
      System.out.println("Shutting down");
      closeSubscriptions();
      client.close();
      try {
          client.getTerminationFuture().get();
      } catch (Exception e) {}
      
      executor.shutdown();
  }
  
  void closeSubscriptions() {
      if (onHelloSubscription != null)
          onHelloSubscription.unsubscribe();
      onHelloSubscription = null;
      if (counterPublication != null)
          counterPublication.unsubscribe();
      counterPublication = null;
      if (addProcSubscription != null)
          addProcSubscription.unsubscribe();
      addProcSubscription = null;
  }
  
  private void waitUntilKeypressed() {
      try {
          System.in.read();
          while (System.in.available() > 0) {
              System.in.read();
          }
      } catch (IOException e) {
          e.printStackTrace();
      }
  }

}
View Code

相关文章:

  • 2021-12-23
  • 2022-02-08
  • 2021-11-29
  • 2021-09-21
  • 2022-01-03
  • 2019-08-14
  • 2022-01-05
猜你喜欢
  • 2021-06-19
  • 2021-09-24
  • 2021-05-12
  • 2021-07-07
  • 2022-12-23
  • 2021-12-14
  • 2021-07-24
相关资源
相似解决方案