本文的实现是在 << 一、Mosquitto 介绍&安装>> << 二、 Mosquitto 的使用说明 >> 两篇文章搭建好 Mosquitto 服务基础上实现的。如果你还没有搭建 Mosquitto 服务 请参考我上述两篇文章进行 Mosquitto 服务的搭建。
Java 实现 Mosquitto 的客户端主要使用 Eclipse Paho Java Client 提供的 SDK 来实现的。有兴趣的可以直接去 Eclipse Paha 官网下载对应的sdk 和使用说明。
一、 准备工作
本本讲解项目是Maven项目、如果还有对 Maven 不了解或者不熟悉的同学可以网上去学习下、本文不在这讲解 Maven 的使用。
添加依赖
<dependency>
<groupId>org.eclipse.paho</groupId>
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.0.2</version>
</dependency>
二、本文实现 Mosquitto 消息发送主要分为三个类
1> ClientMQTT 客户端类
2> PushCallback 消息回调类
3> ServerMQTT 服务端类
四、 下面将直接上对应的 code
1> 客户端
1 import java.util.concurrent.ScheduledExecutorService; 2 3 import org.eclipse.paho.client.mqttv3.MqttClient; 4 import org.eclipse.paho.client.mqttv3.MqttConnectOptions; 5 import org.eclipse.paho.client.mqttv3.MqttException; 6 import org.eclipse.paho.client.mqttv3.MqttTopic; 7 import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; 8 9 public class ClientMQTT { 10 11 public static final String HOST = "tcp://172.16.192.102:1883"; 12 public static final String TOPIC = "root/topic/123"; 13 private static final String clientid = "client11"; 14 private MqttClient client; 15 private MqttConnectOptions options; 16 private String userName = "admin"; 17 private String passWord = "admin"; 18 19 private ScheduledExecutorService scheduler; 20 21 private void start() { 22 try { 23 // host为主机名,clientid即连接MQTT的客户端ID,一般以唯一标识符表示,MemoryPersistence设置clientid的保存形式,默认为以内存保存 24 client = new MqttClient(HOST, clientid, new MemoryPersistence()); 25 // MQTT的连接设置 26 options = new MqttConnectOptions(); 27 // 设置是否清空session,这里如果设置为false表示服务器会保留客户端的连接记录,这里设置为true表示每次连接到服务器都以新的身份连接 28 options.setCleanSession(true); 29 // 设置连接的用户名 30 options.setUserName(userName); 31 // 设置连接的密码 32 options.setPassword(passWord.toCharArray()); 33 // 设置超时时间 单位为秒 34 options.setConnectionTimeout(10); 35 // 设置会话心跳时间 单位为秒 服务器会每隔1.5*20秒的时间向客户端发送个消息判断客户端是否在线,但这个方法并没有重连的机制 36 options.setKeepAliveInterval(20); 37 // 设置回调 38 client.setCallback(new PushCallback()); 39 MqttTopic topic = client.getTopic(TOPIC); 40 // setWill方法,如果项目中需要知道客户端是否掉线可以调用该方法。设置最终端口的通知消息 41 options.setWill(topic, "close".getBytes(), 2, true); 42 43 client.connect(options); 44 // 订阅消息 45 int[] Qos = { 1 }; 46 String[] topic1 = { TOPIC }; 47 client.subscribe(topic1, Qos); 48 49 } catch (Exception e) { 50 e.printStackTrace(); 51 } 52 } 53 54 public static void main(String[] args) throws MqttException { 55 ClientMQTT client = new ClientMQTT(); 56 client.start(); 57 } 58 }