注册

基于环信MQTT消息云,Java版MQTT客户端快速实现消息收发

本文介绍Java版MQTT 客户端,如何连接环信MQTT消息云快速实现消息的自收自发。

 

一、前提条件

1.部署Java开发环境

安装IDE。您可以使用IntelliJ IDEA或者Eclipse,本文以IntelliJ IDEA为例。
下载安装
JDK 

2.导入项目依赖

IntelliJ IDEA中创建工程,并确认pom.xml中包含以下依赖。


commons-codec
commons-codec
1.10


org.eclipse.paho
org.eclipse.paho.client.mqttv3
1.2.2


org.apache.httpcomponents
httpclient
4.5.2


com.alibaba
fastjson
1.2.76

二、实现流程

1、获取鉴权

     为保障客户安全性需求,环信MQTT消息云服务为客户提供【token+clientID】方式实现鉴权认证,其中AppID(clientID中的关键字段)及token标识获取流程如下:

【登录console】
     欢迎您登录环信云console控制台,在此控制台中,为您提供应用列表、解决方案、DEMO体验以及常见问题等功能。
     在应用列表中,若您未在APP中开通MQTT业务,可参见APP  MQTT开通流程
     若APP已开通MQTT业务,可在应用列表中选中Appname,点击【查看】操作,进入应用详情。

1786e8dc003a38164fddc2363cd9b7a2.jpg

【获取AppID及连接地址】 
      进入【查看】后,点击左侧菜单栏【MQTT】->【服务概览】,在下图红色方框内获取当前AppID及服务器连接地址。
dea235d9e3145f5da45f49cf786be234.jpg
【获取token】
     为实现对用户管控及接入安全性,环信云console提供用户认证功能,支持对用户账户的增、删、改、查以及为每个用户账户分配唯一token标识,获取token标识可选择以下两种形式。
  形式一:console控制台获取(管理员视角)
  * 点击左侧菜单栏【应用概览】->【用户认证】页面,点击【创建IM用户】按钮,增添新的账户信息(包  括用户名及密码)。
  * 创建成功后,在【用户ID】列表中选中账户,点击【查看token】按钮获取当前账户token信息。

90865c2e77224acc6fb3204195929b1d.jpg
  形式二:客户端代码获取(客户端视角)
  * 获取域名:点击左侧菜单栏【即时通讯】->【服务概览】页面,查看下图中token域名、org_name、app_name。

571de25f0e103c06faa2b2581a027280.jpg
  * 拼接URL:获取token URL格式为:http:/ /token域名/org_name/app_name/token。 
  * 用户名/密码:使用【用户ID】列表中已有账户的用户名及密码,例“用户名:test/密码:test123”。

客户端获取token代码示例如下:

public static void main() 
{
// 获取token的URL
http://{token域名}/{org_name}/{app_name}/token
// 获取token
String token = "";
// 取token
try (final CloseableHttpClient httpClient = HttpClients.createDefault())
{
final HttpPost httpPost = new HttpPost("http://{token域名}/{org_name}/{app_name}/token");
Map params = new HashMap<>();
params.put("grant_type", "password");
params.put("username", "test");
params.put("password", "test123");
//设置请求体参数
StringEntity entity = new StringEntity(JSONObject.toJSONString(params), Charset.forName("utf-8"));
entity.setContentEncoding("utf-8");
httpPost.setEntity(entity);
//设置请求头部
httpPost.setHeader("Content-Type", "application/json");
//执行请求,返回请求响应
try (final CloseableHttpResponse response = httpClient.execute(httpPost)
{
//请求返回状态码
int statusCode = response.getStatusLine().getStatusCode();
//请求成功
if (statusCode == HttpStatus.SC_OK && statusCode <= HttpStatus.SC_TEMPORARY_REDIRECT)
{
//取出响应体
final HttpEntity entity2 = response.getEntity();
//从响应体中解析出token
String responseBody = EntityUtils.toString(entity2, "utf-8");
JSONObject jsonObject = JSONObject.parseObject(responseBody);
token = jsonObject.getString("access_token");
}
else
{
//请求失败
throw new ClientProtocolException("请求失败,响应码为:" + statusCode);
}
}
}
catch (IOException e)
{
e.printStackTrace();
}
},>

返回结果

 {
"access_token": "YWMtN8a0oqV3EeuF0AmiqRgEh-grzF8zZk2Wp8GS3pF-orDW_F-gj3kR6os3h_oz3ROQAwMAAAF5BxhGlwBPGgAvTR8vDrdVsDPNZMQj0fFjv7EaohgZhzMHM9ncVLE30g",
"expires_in": 5184000,
"user":
{
"uuid": "d6fc5fa0-8f79-11ea-8b37-87fa33dd1390",
"type": "user",
"created": 1588756404898,
"modified": 1588756404898,
"username": "test",
"activated": true
}
}
access_token即为要获取的token

2、初始化

      在IntelliJ IDEA工程中创建MQTT客户端,客户端初始配置包括创建clientID,topic名称,QoS质量,连接地址等信息。

import org.eclipse.paho.client.mqttv3.*;
import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import static org.eclipse.paho.client.mqttv3.MqttConnectOptions.MQTT_VERSION_3_1_1;
public class MqttDemoStarter 
{
public static void main(String[] args) throws MqttException, InterruptedException {
/**
* 用户指定
* /
String deviceId = "xxxxx-xxxx-xxxxx-xxxxx-xxxxx";
/**
* 从console控制台获取
* /
String appId = "1NQ1E9";
/**
* 设置接入点,进入console控制台获取
*/
String endpoint = "1NQ1E9.sandbox.mqtt.chat";
/**
* MQTT客户端ID,由业务系统分配,需要保证每个TCP连接都不一样,保证全局唯一,如果不同的客户端对象(TCP连接)使用了相同的clientId会导致之前的连接断开。
* clientId由两部分组成,格式为DeviceID@appId,其中DeviceID由业务方自己设置,appId在console控制台创建,clientId总长度不得超过64个字符。
*/
String clientId = deviceId + "@" + appId;

/**
* 需要订阅或发送消息的topic名称
* 如果使用了没有创建或者没有被授权的Topic会导致鉴权失败,服务端会断开客户端连接。
*/
final String myTopic = "myTopic";

/**
* QoS参数代表传输质量,可选0,1,2。详细信息,请参见名词解释。
*/
final int qosLevel = 0;
final MemoryPersistence memoryPersistence = new MemoryPersistence();

/**
* 客户端协议和端口。客户端使用的协议和端口必须匹配,如果是ws或者wss,使用http://;如果是mqtt或者mqtts,使用tcp://
*/
final MqttClient mqttClient = new MqttClient("tcp://" + endpoint + ":1883", clientId, memoryPersistence);
/**
* 设置客户端发送超时时间,防止无限阻塞。
*/
mqttClient.setTimeToWait(5000);

final ExecutorService executorService = new ThreadPoolExecutor(1, 1, 0, TimeUnit.MILLISECONDS,
new LinkedBlockingQueue<>());
}

3、连接服务器

    配置连接密码、cleansession标志、心跳间隔、超时时间等信息,调用connect()函数连接至环信MQTT消息云。

MqttConnectOptions mqttConnectOptions = new MqttConnectOptions();
/**
* 用户名,在console中注册
*/
mqttConnectOptions.setUserName("test");
/**
* 用户密码为第一步中申请的token
*/
mqttConnectOptions.setPassword(token.toCharArray());
mqttConnectOptions.setCleanSession(true);
mqttConnectOptions.setKeepAliveInterval(90);
mqttConnectOptions.setAutomaticReconnect(true);
mqttConnectOptions.setMqttVersion(MQTT_VERSION_3_1_1);
mqttConnectOptions.setConnectionTimeout(5000);
mqttClient.connect(mqttConnectOptions);
//暂停1秒钟,等待连接订阅完成
Thread.sleep(1000);

4、订阅【subscribe】

【订阅主题】

    当客户端成功连接环信MQTT消息云后,需尽快向服务器发送订阅主题消息。

mqttClient.setCallback(new MqttCallbackExtended() {
/**
* 连接完成回调方法
* @param b
* @param s
*/
@Override
public void connectComplete(boolean b, String s) {
/**
* 客户端连接成功后就需要尽快订阅需要的Topic。
*/
System.out.println("connect success");
executorService.submit(() -> {
try {
final String[] topicFilter = {myTopic};
final int[] qos = {qosLevel};
mqttClient.subscribe(topicFilter, qos);
} catch (Exception e) {
e.printStackTrace();
}
});
}
});

【取消订阅】

mqttClient.unsubscribe(new String[]{myTopic});

【接收消息】

    配置接收消息回调方法,从环信MQTT消息云接收订阅消息。

mqttClient.setCallback(new MqttCallbackExtended() {
/**
* 接收消息回调方法
* @param s
* @param mqttMessage
*/
@Override
public void messageArrived(String s, MqttMessage mqttMessage) {
System.out.println("receive msg from topic " + s + " , body is " + new String(mqttMessage.getPayload()));
}
});

5、发布【publish】

    配置发送消息回调方法,向环信MQTT消息云中指定topic发送消息。

for (int i = 0; i < 10; i++) {
/**
* 构建一个Mqtt消息
*/
MqttMessage message = new MqttMessage("hello world pub sub msg".getBytes());
//设置传输质量
message.setQos(qosLevel);
/**
* 发送普通消息时,Topic必须和接收方订阅的Topic一致,或者符合通配符匹配规则。
*/
mqttClient.publish(myTopic, message);
}

6、结果验证

connect success
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
send msg succeed topic is : myTopic
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg
receive msg from topic myTopic , body is hello world pub sub msg

三、更多信息

  * 完整demo示例,请参见demo下载

或直接下载:MQTTChatDemo- Java.zip

  * 目前MQTT客户端支持多种语言,请参见 SDK下载
  * 如果您在使用环信MQTT消息云服务中,有任何疑问和建议,欢迎您联系我们

 

0 个评论

要回复文章请先登录注册