【问题标题】:Java mqtt concurrent connections is taking timeJava mqtt 并发连接需要时间
【发布时间】:2019-04-16 18:41:33
【问题描述】:

我正在尝试与 MQTT 服务器建立并发会话,一旦建立所有连接,所有客户端将断开连接。

在下面的发布者代码中,我试图让每个并发会话发送 50 条消息。像这样将创建 500 个线程,每个线程将发送 50 条消息。但是创建 100 个连接需要 10 分钟。编码是否有任何错误,是否可以在下面的代码中降低连接速度的变化率,因为我在 Golang 中写过同样的事情,那里的连接率很高。

以下是发布者代码:

import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
import org.eclipse.paho.client.mqttv3.MqttSecurityException;
import org.eclipse.paho.client.mqttv3.MqttTopic;

public class Publisher extends Thread{

    public static final String test_topic = "test";
    private MqttClient client;
    public static final String BROKER_URL = "tcp://192.168.1.129:1883";
    CountDownLatch latch;

    public Publisher(CountDownLatch latch) {
      super();
      this.latch = latch;
     }

    public void Publisher() {
        String clientid=Thread.currentThread().getName();
        System.out.println("=========== "+clientid);
        MqttConnectOptions options = null;
        try {

             client = new MqttClient(BROKER_URL, clientid);
             options = new MqttConnectOptions();
            options.setCleanSession(true);
            options.setMaxInflight(50);
           client.connect(options);
        } catch (MqttException e) {
            try {
                client.connect(options);
            } catch (MqttSecurityException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            } catch (MqttException e1) {
                // TODO Auto-generated catch block
                e1.printStackTrace();
            }
            e.printStackTrace();
            //System.exit(1);
        }
    }

    @Override
    public void run() {
        // TODO Auto-generated method stub
        Publisher();
        System.out.println(Thread.currentThread().getName());
        try {
            for(int i=0;i<50;i++)
            {
            //Thread.sleep(20);
            publishTemperature();
            }


        } catch (MqttPersistenceException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } catch (MqttException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        } /*catch (InterruptedException e) {
            // TODO Auto-generated catch block
            e.printStackTrace();
        }*/
    }
    public void publishTemperature() throws MqttPersistenceException, MqttException {
        final MqttTopic test = client.getTopic(test_topic);
         final String temperature=""{\"test\":\"test\"}"";
         test.publish(new MqttMessage(temperature.getBytes()));

         //System.out.println("Published data. Topic: " + "test" + "  Message: " + temperature);

    }

    public MqttClient getClient() {
        return client;
    }

    public void setClient(MqttClient client) {
        this.client = client;
    }
}

下面是主要方法:

 import org.eclipse.paho.client.mqttv3.MqttException;
    import org.eclipse.paho.client.mqttv3.MqttPersistenceException;

    public class test {
        static Publisher[] Publisher=null;

        public static void main(String[] args) throws MqttPersistenceException, MqttException, InterruptedException {
            final CountDownLatch latch = new CountDownLatch(50);
            Publisher = new Publisher[500];
            for(int i=0;i<500;i++)
            {
                Thread.sleep(10);
                Publisher[i]=new Publisher(latch);
                Publisher[i].start();
            }
            latch.await();
            for(int i=0;i<500;i++)
            {
                //Thread.sleep(10);
                Publisher[i].getClient().disconnectForcibly(25);


            }

        }

    }

在这里,所有连接都将连接并建立持久连接,最多可达 500 个连接。之后,所有连接都会断开一次。

【问题讨论】:

    标签: java mqtt paho


    【解决方案1】:

    删除以下行:

    Publisher[i].join();
    

    Thread.join() 的文档如下:

    public final void join() 抛出 InterruptedException

    等待此线程终止。

    调用此方法的行为方式与 调用

    join(0)
    

    这意味着每次循环它都会停止并等待该线程完成它的任务,然后再创建新的。

    如果您删除该调用,它将允许所有线程并行运行。

    【讨论】:

    • 但在所有客户端连接并完成发送数据后,主线程应该继续。如果我删除 join 它甚至在子线程开始发布数据之前直接进入disconnet循环。
    • 然后在i == 499 时加入它,所以它只等待最后一个完成。或者只是让线程在完成后自行关闭。
    • 如果我给出 499 那么哪个线程最后完成将由 JVM 决定。并且main方法只有在所有发布完成后才能进入断开循环,我不能选择自己关闭线程。
    • 然后使用我提到的另一种方法。或者第三种选择是让每个线程在完成后增加一个计数器,并且仅在计数器达到 500 时才继续关闭连接
    • 我已经更新了代码请检查,我已经使用了 CountDownLatch 并且主线程正在等待所有线程完成但程序没有退出。
    猜你喜欢
    • 2017-04-01
    • 2012-03-17
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    • 1970-01-01
    相关资源
    最近更新 更多