如今的互联网已经朝着物联网时代埋进了,而要在物联网中进行开发,MQTT协议是绕不开的一环。今天icode9小编将为大家带来基于JAVA的MQTT协议开发操作,包括在Java项目中使用MQTT实现客户端与broker之间的连接、订阅、取消订阅、发布、接收消息等功能。大家一起来看看吧
添加依赖
本文的开发环境为:
-
构建工具:Maven
-
集成开发环境:IntelliJ IDEA
-
Java:JDK 1.8.0
我们将使用Eclipse Paho Java Client作为客户端,它是 Java 语言中使用最广泛的 MQTT 客户端库。
将以下依赖项添加到pom.xml
文件中。
org.eclipse.pahoorg.eclipse.paho.client.mqttv31.2.5" data-lang="" style="box-sizing: border-box;">
<dependencies> <dependency> <groupId>org.eclipse.paho</groupId> <artifactId>org.eclipse.paho.client.mqttv3</artifactId> <version>1.2.5</version> </dependency> </dependencies>
创建 MQTT 连接
MQTT 代理
本文将使用基于EMQX Cloud创建的公共 MQTT 服务器。服务器访问信息如下:
-
经纪商:broker.emqx.io
-
TCP端口:1883
-
SSL/TLS 端口:8883
连接
设置MQTT的基本连接参数。用户名和密码是可选的。
String broker = "tcp://broker.emqx.io:1883"; // TLS/SSL // String broker = "ssl://broker.emqx.io:8883"; String username = "emqx"; String password = "public"; String clientid = "publish_client";
然后创建一个 MQTT 客户端并连接到代理。
MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); client.connect(options);
指示:
-
MqttClient:MqttClient 提供了一组方法,一旦 MQTT 操作完成,这些方法就会阻止并将控制权返回给应用程序。
-
MqttClientPersistence:表示一个持久性数据存储,用于存储传输中的出站和入站消息,从而实现指定 QoS 的交付。
-
MqttConnectOptions:包含控制客户端如何连接到服务器的选项集。下面是一些常用的方法:
-
setUserName:设置用于连接的用户名。
-
setPassword:设置用于连接的密码。
-
setCleanSession:设置客户端和服务器是否应该在重新启动和重新连接时记住状态。
-
setKeepAliveInterval:设置“保持活动”间隔。
-
setConnectionTimeout:设置连接超时值。
-
setAutomaticReconnect:设置如果连接丢失,客户端是否会自动尝试重新连接到服务器。
-
使用 TLS/SSL 连接
如果要为 TLS/SSL 连接使用自签名证书,请将bcpkix-jdk15on添加到pom.xml
文件中。
org.bouncycastlebcpkix-jdk15on1.70" data-lang="" style="box-sizing: border-box;">
<!-- https://mvnrepository.com/artifact/org.bouncycastle/bcpkix-jdk15on --> <dependency> <groupId>org.bouncycastle</groupId> <artifactId>bcpkix-jdk15on</artifactId> <version>1.70</version> </dependency>
然后SSLUtils.java
使用以下代码创建文件。
0) { caCert = (X509Certificate) cf.generateCertificate(bis); } // load client certificate bis = new BufferedInputStream(new FileInputStream(crtFile)); X509Certificate cert = null; while (bis.available() > 0) { cert = (X509Certificate) cf.generateCertificate(bis); } // load client private key PEMParser pemParser = new PEMParser(new FileReader(keyFile)); Object object = pemParser.readObject(); JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC"); KeyPair key = converter.getKeyPair((PEMKeyPair) object); pemParser.close(); // CA certificate is used to authenticate server KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType()); caKs.load(null, null); caKs.setCertificateEntry("ca-certificate", caCert); TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509"); tmf.init(caKs); // client key and certificates are sent to server so it can authenticate KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); ks.load(null, null); ks.setCertificateEntry("certificate", cert); ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), new java.security.cert.Certificate[]{cert}); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory .getDefaultAlgorithm()); kmf.init(ks, password.toCharArray()); // finally, create SSL socket factory SSLContext context = SSLContext.getInstance("TLSv1.2"); context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); return context.getSocketFactory(); } }" data-lang="" style="box-sizing: border-box;">
package io.emqx.mqtt; import org.bouncycastle.jce.provider.BouncyCastleProvider; import org.bouncycastle.openssl.PEMKeyPair; import org.bouncycastle.openssl.PEMParser; import org.bouncycastle.openssl.jcajce.JcaPEMKeyConverter; import javax.net.ssl.KeyManagerFactory; import javax.net.ssl.SSLContext; import javax.net.ssl.SSLSocketFactory; import javax.net.ssl.TrustManagerFactory; import java.io.BufferedInputStream; import java.io.FileInputStream; import java.io.FileReader; import java.security.KeyPair; import java.security.KeyStore; import java.security.Security; import java.security.cert.CertificateFactory; import java.security.cert.X509Certificate; public class SSLUtils { public static SSLSocketFactory getSocketFactory(final String caCrtFile, final String crtFile, final String keyFile, final String password) throws Exception { Security.addProvider(new BouncyCastleProvider()); // load CA certificate X509Certificate caCert = null; FileInputStream fis = new FileInputStream(caCrtFile); BufferedInputStream bis = new BufferedInputStream(fis); CertificateFactory cf = CertificateFactory.getInstance("X.509"); while (bis.available() > 0) { caCert = (X509Certificate) cf.generateCertificate(bis); } // load client certificate bis = new BufferedInputStream(new FileInputStream(crtFile)); X509Certificate cert = null; while (bis.available() > 0) { cert = (X509Certificate) cf.generateCertificate(bis); } // load client private key PEMParser pemParser = new PEMParser(new FileReader(keyFile)); Object object = pemParser.readObject(); JcaPEMKeyConverter converter = new JcaPEMKeyConverter().setProvider("BC"); KeyPair key = converter.getKeyPair((PEMKeyPair) object); pemParser.close(); // CA certificate is used to authenticate server KeyStore caKs = KeyStore.getInstance(KeyStore.getDefaultType()); caKs.load(null, null); caKs.setCertificateEntry("ca-certificate", caCert); TrustManagerFactory tmf = TrustManagerFactory.getInstance("X509"); tmf.init(caKs); // client key and certificates are sent to server so it can authenticate KeyStore ks = KeyStore.getInstance(KeyStore.getDefaultType()); ks.load(null, null); ks.setCertificateEntry("certificate", cert); ks.setKeyEntry("private-key", key.getPrivate(), password.toCharArray(), new java.security.cert.Certificate[]{cert}); KeyManagerFactory kmf = KeyManagerFactory.getInstance(KeyManagerFactory .getDefaultAlgorithm()); kmf.init(ks, password.toCharArray()); // finally, create SSL socket factory SSLContext context = SSLContext.getInstance("TLSv1.2"); context.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null); return context.getSocketFactory(); } }
设置options
如下。
String broker = "ssl://broker.emqx.io:8883"; // Set socket factory String caFilePath = "/cacert.pem"; String clientCrtFilePath = "/client.pem"; String clientKeyFilePath = "/client.key"; SSLSocketFactory socketFactory = getSocketFactory(caFilePath, clientCrtFilePath, clientKeyFilePath, ""); options.setSocketFactory(socketFactory);
发布 MQTT 消息
创建一个PublishSample
将向主题发布Hello MQTT
消息的类mqtt/test
。
package io.emqx.mqtt; import org.eclipse.paho.client.mqttv3.MqttClient; import org.eclipse.paho.client.mqttv3.MqttConnectOptions; import org.eclipse.paho.client.mqttv3.MqttException; import org.eclipse.paho.client.mqttv3.MqttMessage; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class PublishSample { public static void main(String[] args) { String broker = "tcp://broker.emqx.io:1883"; String topic = "mqtt/test"; String username = "emqx"; String password = "public"; String clientid = "publish_client"; String content = "Hello MQTT"; int qos = 0; try { MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(60); options.setKeepAliveInterval(60); // connect client.connect(options); // create message and setup QoS MqttMessage message = new MqttMessage(content.getBytes()); message.setQos(qos); // publish message client.publish(topic, message); System.out.println("Message published"); System.out.println("topic: " + topic); System.out.println("message content: " + content); // disconnect client.disconnect(); // close client client.close(); } catch (MqttException e) { throw new RuntimeException(e); } } }
订阅
创建一个SubscribeSample
将订阅主题的类mqtt/test
。
package io.emqx.mqtt; import org.eclipse.paho.client.mqttv3.*; import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence; public class SubscribeSample { public static void main(String[] args) { String broker = "tcp://broker.emqx.io:1883"; String topic = "mqtt/test"; String username = "emqx"; String password = "public"; String clientid = "subscribe_client"; int qos = 0; try { MqttClient client = new MqttClient(broker, clientid, new MemoryPersistence()); // connect options MqttConnectOptions options = new MqttConnectOptions(); options.setUserName(username); options.setPassword(password.toCharArray()); options.setConnectionTimeout(60); options.setKeepAliveInterval(60); // setup callback client.setCallback(new MqttCallback() { public void connectionLost(Throwable cause) { System.out.println("connectionLost: " + cause.getMessage()); } public void messageArrived(String topic, MqttMessage message) { System.out.println("topic: " + topic); System.out.println("Qos: " + message.getQos()); System.out.println("message content: " + new String(message.getPayload())); } public void deliveryComplete(IMqttDeliveryToken token) { System.out.println("deliveryComplete---------" + token.isComplete()); } }); client.connect(options); client.subscribe(topic, qos); } catch (Exception e) { e.printStackTrace(); } } }
Mqtt回调:
-
connectionLost(Throwable cause):当与服务器的连接丢失时调用此方法。
-
messageArrived(String topic, MqttMessage message):当消息从服务器到达时调用此方法。
-
deliveryComplete(IMqttDeliveryToken token):当消息的传递已完成并且已收到所有确认时调用。
测试
接下来,运行SubscribeSample
订阅mqtt/test
主题。然后运行PublishSample
发布关于mqtt/test
主题的消息。我们会看到发布者成功发布了消息,订阅者收到了消息。
至此我们就完成了使用Paho Java Client作为MQTT客户端连接公共MQTT服务器,实现消息的发布和订阅。更多的内容来自于icode9技术文章分享网站。
本站声明:
1. iCode9 技术分享网(下文简称本站)提供的所有内容,仅供技术学习、探讨和分享;
2. 关于本站的所有留言、评论、转载及引用,纯属内容发起人的个人观点,与本站观点和立场无关;
3. 关于本站的所有言论和文字,纯属内容发起人的个人观点,与本站观点和立场无关;
4. 本站文章均是网友提供,不完全保证技术分享内容的完整性、准确性、时效性、风险性和版权归属;如您发现该文章侵犯了您的权益,可联系我们第一时间进行删除;
5. 本站为非盈利性的个人网站,所有内容不会用来进行牟利,也不会利用任何形式的广告来间接获益,纯粹是为了广大技术爱好者提供技术内容和技术思想的分享性交流网站。
原创文章,作者:ItWorker,如若转载,请注明出处:https://blog.ytso.com/tech/pnotes/295093.html