【问题标题】:Flooding of message at side client from server channel and wrong message with CometD frame work来自服务器通道的客户端消息泛滥和 CometD 框架的错误消息
【发布时间】:2022-01-20 03:55:48
【问题描述】:

我正在开发一个客户端-服务器应用程序,我希望在客户端-服务器之间建立持久连接,因此我选择了 CometD 框架。 我成功创建了 CometD 应用程序。

客户 -

import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;

import org.cometd.bayeux.Channel;
import org.cometd.bayeux.Message;
import org.cometd.bayeux.client.ClientSessionChannel;
import org.cometd.client.BayeuxClient;
import org.cometd.client.transport.LongPollingTransport;
import org.eclipse.jetty.client.HttpClient;
import org.eclipse.jetty.util.ssl.SslContextFactory;

import com.synacor.idm.auth.LdapAuthenticator;
import com.synacor.idm.resources.LdapResource;

public class CometDClient {
    private volatile BayeuxClient client;
    private final AuthListner authListner = new AuthListner();
    private LdapResource ldapResource;
public static void main(String[] args) throws Exception {

    org.eclipse.jetty.util.log.Log.getProperties().setProperty("org.eclipse.jetty.LEVEL", "ERROR");
    org.eclipse.jetty.util.log.Log.getProperties().setProperty("org.eclipse.jetty.util.log.announce", "false");
    org.eclipse.jetty.util.log.Log.getRootLogger().setDebugEnabled(false);
    CometDClient client = new CometDClient();
client.run();
}

public void run()  {
    String url = "http://localhost:1010/cometd";
    HttpClient httpClient = new HttpClient();

    try {
        httpClient.start();
    } catch (Exception e) {
        // TODO Auto-generated catch block
        e.printStackTrace();
    }
    
    client = new BayeuxClient(url, new LongPollingTransport(null, httpClient));
    client.getChannel(Channel.META_HANDSHAKE).addListener(new InitializerListener());
    client.getChannel(Channel.META_CONNECT).addListener(new ConnectionListener());
    client.getChannel("/ldapAuth").addListener(new AuthListner());
    
    
    
    client.handshake();
    boolean success = client.waitFor(1000, BayeuxClient.State.CONNECTED);
    if (!success) {
        System.err.printf("Could not handshake with server at %s%n", url);
        return;
    }

}

private void initialize() {
    client.batch(() -> {

        
        ClientSessionChannel authChannel = client.getChannel("/ldapAuth");
        authChannel.subscribe(authListner);

    });
}

private class InitializerListener implements ClientSessionChannel.MessageListener {
    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
        if (message.isSuccessful()) {
            initialize();
        }
    }
}

private class ConnectionListener implements ClientSessionChannel.MessageListener {
    private boolean wasConnected;
    private boolean connected;

    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
        if (client.isDisconnected()) {
            connected = false;
            connectionClosed();
            return;
        }

        wasConnected = connected;
        connected = message.isSuccessful();
        if (!wasConnected && connected) {
            connectionEstablished();
        } else if (wasConnected && !connected) {
            connectionBroken();
        }
    }
}
private void connectionEstablished() {
    System.err.printf("system: Connection to Server Opened%n");
}

private void connectionClosed() {
    System.err.printf("system: Connection to Server Closed%n");
}

private void connectionBroken() {
    System.err.printf("system: Connection to Server Broken%n");
}


private class AuthListner implements ClientSessionChannel.MessageListener{

    @Override
    public void onMessage(ClientSessionChannel channel, Message message) {
        Object data2 = message.getData();
        System.err.println("Authentication String     " + data2 );
        if(data2 != null && data2.toString().indexOf("=")>0) {
        String[] split = data2.toString().split(",");
        String userString = split[0];
        String passString = split[1];
        String[] splitUser = userString.split("=");
        String[] splitPass = passString.split("=");
        LdapAuthenticator authenticator = new LdapAuthenticator(ldapResource);
        if(authenticator.authenticateToLdap(splitUser[1], splitPass[1])) {
//          client.getChannel("/ldapAuth").publish("200:success from client "+user);
//          channel.publish("200:Success "+user);
            Map<String, Object> data = new HashMap<>();
            // Fill in the structure, for example:
            data.put(splitUser[1], "Authenticated");
            channel.publish(data, publishReply -> {
                if (publishReply.isSuccessful()) {
                    System.out.print("message sent successfully on server");
                }
            });
        }
        }
        
    }
    
}

}

服务器 - 服务类

import java.util.List;
import java.util.concurrent.BlockingQueue;

import org.cometd.bayeux.MarkedReference;
import org.cometd.bayeux.Promise;
import org.cometd.bayeux.server.BayeuxServer;
import org.cometd.bayeux.server.ConfigurableServerChannel;
import org.cometd.bayeux.server.ServerChannel;
import org.cometd.bayeux.server.ServerMessage;
import org.cometd.bayeux.server.ServerSession;
import org.cometd.server.AbstractService;
import org.cometd.server.ServerMessageImpl;

import com.synacor.idm.resources.AuthenticationResource;
import com.synacor.idm.resources.AuthenticationResource.AuthC;


public class AuthenticationService extends AbstractService implements AuthenticationResource.Listener {

    String authParam;
    BayeuxServer bayeux;
    BlockingQueue<String> sharedResponseQueue;
    public AuthenticationService(BayeuxServer bayeux) {

        super(bayeux, "ldapagentauth");
        addService("/ldapAuth", "ldapAuthentication");  
        this.bayeux = bayeux;
    }
    public void ldapAuthentication(ServerSession session, ServerMessage message) {
        System.err.println("********* inside auth service ***********");
        Object data = message.getData();
        System.err.println("****** got data back from client " +data.toString());
        sharedResponseQueue.add(data.toString());
    }
    @Override
    public void onUpdates(List<AuthC> updates) {
        System.err.println("********* inside auth service listner ***********");

        MarkedReference<ServerChannel> createChannelIfAbsent = bayeux.createChannelIfAbsent("/ldapAuth", new ConfigurableServerChannel.Initializer() {
            public void configureChannel(ConfigurableServerChannel channel)
            {
                channel.setPersistent(true);
                channel.setLazy(true);
            }
        });
        ServerChannel reference = createChannelIfAbsent.getReference();
        for (AuthC authC : updates) {

            authParam = authC.getAuthStr();
            this.sharedResponseQueue= authC.getsharedResponseQueue();
            ServerChannel channel = bayeux.getChannel("/ldapAuth");
            ServerMessageImpl serverMessageImpl = new ServerMessageImpl();
            serverMessageImpl.setData(authParam);

            reference.setBroadcastToPublisher(false);
            reference.publish(getServerSession(), authParam, Promise.noop());
        }

    }


}

事件触发类-

public class AuthenticationResource implements Runnable{
      private final JerseyClientBuilder clientBuilder;
      private final BlockingQueue<String> sharedQueue; 
      private final BlockingQueue<String> sharedResponseQueue;
      private boolean isAuthCall = false; 
      private String userAuth;
        private final List<Listener> listeners = new CopyOnWriteArrayList<Listener>();
        Thread runner;

    public AuthenticationResource(JerseyClientBuilder clientBuilder,BlockingQueue<String> sharedQueue,BlockingQueue<String> sharedResponseQueue) {
        super();
        this.clientBuilder = clientBuilder;
        this.sharedQueue = sharedQueue;
        this.sharedResponseQueue= sharedResponseQueue;
          this.runner = new Thread(this);
            this.runner.start();
    }
  public List<Listener> getListeners()
  {
      return listeners;
  }
  

    @Override
    public void run() {
      List<AuthC> updates = new ArrayList<AuthC>();

//      boolean is =  true;
      while(true){
        if(sharedQueue.size()<=0) {
            continue;
        }
          try {
             userAuth  = sharedQueue.take();
             // Notify the listeners
             for (Listener listener : listeners)
               
             {
               updates.add(new AuthC(userAuth,sharedResponseQueue));
                 listener.onUpdates(updates);
             }
             updates.add(new AuthC(userAuth,sharedResponseQueue));
                  System.out.println("****** Auth consume ******** " +  userAuth);

             if(userAuth != null) {
               isAuthCall = true;
             }

          } catch (Exception err) {
             err.printStackTrace();
          break;
          }
//          if (sharedQueue.size()>0) {
//              is = false;
//          }
          
      } 

    }
    
  public static class AuthC
  {
      private final String authStr;
      private final BlockingQueue<String> sharedResponseQueue;

      public AuthC(String authStr,BlockingQueue<String> sharedResponseQueue)
      {
          this.authStr = authStr;
          this.sharedResponseQueue=sharedResponseQueue;

      }


      public String getAuthStr()
      {
          return authStr;
      }

      public BlockingQueue<String> getsharedResponseQueue()
      {
          return sharedResponseQueue;
      }

  }
    
  public interface Listener extends EventListener
  {
      void onUpdates(List<AuthC> updates);
  }

}

我已成功建立客户端和服务器之间的连接。 问题 -

1- 当我从服务器向客户端发送消息时,同一条消息被发送多次。我只期待一种请求-响应机制。 在我的情况下 - 服务器正在发送用户凭据,我期待结果,无论用户是否经过身份验证。

您可以在图像中看到它是如何在客户端使用相同的字符串泛滥的 -

2- 客户端和服务器之间的消息循环存在其他问题,我可以通过添加来解决,但仍有一些时间循环消息发生。

serverChannel.setBroadcastToPublisher(false);

3- 如果我更改服务器上的身份验证字符串,在客户端它似乎是旧的。 比如——

  • 来自服务器的 1 个请求 - 身份验证字符串 -> user=foo,pass=bar -> at 客户端 - user=foo,pass=bar
  • 来自服务器的 2 个请求 - 身份验证字符串 user=myuser,pass=mypass -> 在客户端 - user=foo,pass=bar

这是三个问题,请指导我并帮助我解决这个问题。

【问题讨论】:

    标签: java cometd


    【解决方案1】:

    CometD 在clientserver 上使用远程调用提供请求/响应式消息传递(您想在服务器上使用annotated services)。

    频道/ldapAuth 有2 个订阅者:远程客户端(使用authChannel.subscribe(...) 订阅)和服务器端AuthenticationService(使用addService("/ldapAuth", "ldapAuthentication") 订阅)。

    因此,每次您从AuthenticationService.onUpdates(...) 发布到该频道时,您都会发布到远程客户端,然后再返回到AuthenticationService,这就是为什么调用setBroadcastToPublisher(false) 会有所帮助。

    对于身份验证消息,最好坚持使用远程调用,因为它们具有自然的请求/响应语义,而不是广播语义。 请阅读how applications should interact with CometD

    关于其他循环,CometD 没有触发循环。 您的应用程序中有循环(在AuthenticationService.onUpdates(...) 中),并且您从可能多次具有相同信息的队列中获取(在AuthenticationResource.run() 中 - 顺便说一句,它是一个自旋循环,可能会将 CPU 内核旋转到 100 % 利用率——你应该解决这个问题)。

    您看到陈旧数据这一事实可能不是 CometD 的问题,因为 CometD 不会将消息存储在任何地方,因此它无法构成用户特定的数据。

    我建议您使用远程调用和带注释的服务来清理您的代码。 此外,从自旋循环中清理您自己的代码。

    如果您在上述建议后仍然存在问题,请更加仔细地查找应用程序错误,这不太可能是 CometD 问题。

    【讨论】:

    • @感谢您的建议,是的,我需要对代码进行一些重构。除此之外,我没有得到 AuthenticationService 订阅 /ldapAuth 频道的位置,请指出该代码。此外,无法在 CometD 文档中看到服务器端的远程调用实现。
    • 我用您询问的缺失信息更新了回复。
    • 感谢您的更新。我有一个不同的用例,这就是我不使用远程调用的原因。我有 CometD 客户端,它将驻留在 prim LDAP 机器上,它将连接到云上的 CometD 服务器。来自云端,这意味着来自 CometD 服务器,我将请求使用 prim ldap 实例向客户端验证用户身份,现在我需要通过同一通道将身份验证状态发送回服务器。(CometD 服务器-> req Auth 用户-> 客户端)...(客户端-> resp auth用户->服务器)。是否可以使用远程调用来实现,或者在 CometD 中有没有其他方法可以实现这一点
    • 从客户端发起远程调用。您可以使用服务通道和ServerSession.deliver(...) 来实现您的用例。
    猜你喜欢
    • 1970-01-01
    • 2017-06-18
    • 1970-01-01
    • 2013-03-28
    • 2015-02-25
    • 1970-01-01
    • 1970-01-01
    • 2012-01-21
    • 1970-01-01
    相关资源
    最近更新 更多