通信协议:MQTT
2026-04-04 13:00:10一、MQTT通信
1. 工业互联数据交换协议
- HTTP:短连接 长连接
- CoAP (Constrained Application Protocol),受限应用协议,应用于无线传感网中协议。
- MQTT (Message Queuing Telemetry Transport ),消息队列遥测传输,由 IBM 开发的即时通讯协议。
- DDS(Data Distribution Service for Real-Time Systems),面向实时系统的数据分布服务。
- AMQP(Advanced Message Queuing Protocol),先进消息队列协议 RabbitMQ
- XMPP(Extensible Messaging and Presence Protocol)可扩展通讯和表示协议 XML
- JMS (Java Message Service),即消息服务,这是JAVA平台中著名的消息队列协议
2. MQTT
- 一种基于发布/订阅模式的“轻量级”通讯协议,该协议构建于TCP/IP协议上,由IBM在1999年发布。
- 极少的数据完成远程通信
核心内容:
- MQTT定义:轻量级发布/订阅消息协议,适用于低带宽、高延迟网络。
- 核心特点:低开销、异步通信、支持QoS(0/1/2)。
- 适用场景:物联网设备通信、工业设备监控(如MES中的设备状态上报)。
通信过程
- 服务端/Broker
- 客户端(App/Device)
通信概念:登录注册(Client ID)、订阅、主题、负载、发布、消息、服务质量
通信细节:
- 基于TCP协议的应用层协议
- 固定报头
- 可变报头
- 载荷内容
3. 固定报头

如:0010 0100
- 消息类型(第一个字节的高4位)

- 标志位(第一个字节的低4位)

- 消息长度:长度扩展,最大长度256M数据

4. 可变报头

- 连接标志

- 报文标识符(2字节) ,发布、订阅、取消订阅
5. 通信载荷

6. MQTT主题过滤
主题层级分隔符:“/”,用于将结构化引入主题名 A套/客厅/电视、A套/客厅/空调
通配符: “#”、“+”、“$”,作用:订阅的主题过滤器可以包含特殊的通配符,允许你一次订阅多个主题。
种类:
# 当前节点下的所有节点,包括父级
例:A/# 包括A、A/B、A/B/C 异常:A/#/C无效
+ 当前节点下的所有节点,单层匹配 (强制一级)
例:A/+ 包括A/B、A/C、A/D 不匹配A、A/B/C、A/B/D/E
有效:+、+/A/#、A/+/B、
无效:A+
$ $SYS/ 一般应用层面不会使得,服务特定信息或控制接口主题前缀
约定:由服务端使用
- 主题规则
- 所有的主题名和主题过滤器必须至少包含一个字符 。
- 主题名和主题过滤器是区分大小写的。 A a 两个不同的主题
- 主题名和主题过滤器可以包含空格。 Hello World
- 主题名或主题过滤器以前置或后置斜杠 “/” 区分。 /A A/
- 只包含斜杠 “/” 的主题名或主题过滤器是合法的。
- 主题名和主题过滤器不能包含空字符 (Unicode U+0000)
- 主题名和主题过滤器是UTF-8编码字符串,它们不能超过65535字节
7. MQTT通信问题整理
- 如果收到非法的标志,接收者关闭网络连接。
- SUBSCRIBE,UNSUBSCRIBE和PUBLISH(QoS大于0)控制报文必须包含一个非零的16位报文标识符(Packet Identifier)。QoS设置为0的PUBLISH报文不能包含报文标识符。
- 客户端到服务端的网络连接建立后,客户端发送给服务端的第一个报文必须是CONNECT报文。在一个网络连接上,客户端只能发送一次CONNECT报文。服务端必须将客户端发送的第二个CONNECT报文当作协议违规处理并断开客户端的连接。
- 服务端必须验证CONNECT控制报文的保留标志位(第0位)是否为0,如果不为0必须断开客户端连接。
- 服务端必须允许1到23个字节长的UTF-8编码的客户端标识符,客户端标识符只能包含大写字母,小写字母和数字。
二、报文通信实例
1. 连接服务器

//发送连接请求
private static void Connect(Socket socket)
{
// Connect握手-》发送ClientID 用户名 密码
List<byte> bytes = new List<byte>();
// 固定报头
{
byte byte1 = 1 << 4;// 直接写8? 发送 0011 1100
bytes.Add(byte1);
//
// byte byte2 = 0;// 暂时无法确定 一个字节最多表示255个长度
// 特殊 超出255字节
// 1000 0000 1000 0000 1000 0000 127
// 最高位 标记位 如果是1 表示后面一个字节也是长度值
// 如果是123个字节 7B 200 300
// 1Byte 0 -127 0x00-0x7F
// 2Byte 128-16383 0x80 0x01 - 0xFF 0x7F
}
List<byte> byte_list = new List<byte>();
// 可变报头
{
// 协议信息
string p_str = "MQTT";
byte[] p_bytes = Encoding.ASCII.GetBytes(p_str);
byte_list.Add((byte)(p_bytes.Length / 256));
byte_list.Add((byte)(p_bytes.Length % 256));
byte_list.AddRange(p_bytes);
// 版本号 一个字节
byte_list.Add(0x04);// 3.1.1版本协议,如果是5.0版本协议:0x05
// 连接标志:一个字节
{
byte flag = 0;
flag |= 128;// 将用户名标记置1
flag |= 64;// 将密码标记置1
// 遗嘱:
// (场景:客户端A连接时,建立遗嘱;异常断线,遗嘱有效
// 包含:主题名/数据 "offline"/"123")
// 客户端B 订阅 offline
// 客户端断线
// 客户端B接收到A的遗嘱
// 客户端C上线,并订阅了offline
flag |= 4;// 将使用遗嘱标记置1
// 清理会话
flag |= 2; // 将Clean Session标记置1
byte_list.Add(flag);
}
// Keep Alive 秒:
// 服务端如果要判断标准:
// 1、指客户端两次通信间隔时间
// 2、100 * 1.5 时间范围
ushort second = 100;
byte_list.Add((byte)(second / 256));
byte_list.Add((byte)(second % 256));
}
// 负载信息
{
// Client ID
string client_id = "liuyz";
byte[] ci_bytes = Encoding.ASCII.GetBytes(client_id);
byte_list.Add((byte)(ci_bytes.Length / 256));
byte_list.Add((byte)(ci_bytes.Length % 256));
byte_list.AddRange(ci_bytes);
// 根据可变报头中的标记位,需要:
// 添加遗嘱主题信息
string will_topic_str = "offline";
byte[] wt_bytes = Encoding.ASCII.GetBytes(will_topic_str);
byte_list.Add((byte)(wt_bytes.Length / 256));
byte_list.Add((byte)(wt_bytes.Length % 256));
byte_list.AddRange(wt_bytes);
// 添加遗嘱消息信息
string will_msg_str = "AAA123";
byte[] wm_bytes = Encoding.ASCII.GetBytes(will_msg_str);
byte_list.Add((byte)(wm_bytes.Length / 256));
byte_list.Add((byte)(wm_bytes.Length % 256));
byte_list.AddRange(wm_bytes);
// 添加用户名信息
string user_str = "admin";
byte[] u_bytes = Encoding.ASCII.GetBytes(user_str);
byte_list.Add((byte)(u_bytes.Length / 256));
byte_list.Add((byte)(u_bytes.Length % 256));
byte_list.AddRange(u_bytes);
// 添加密码信息
string pwd_str = "123456";
byte[] p_bytes = Encoding.ASCII.GetBytes(pwd_str);
byte_list.Add((byte)(p_bytes.Length / 256));
byte_list.Add((byte)(p_bytes.Length % 256));
byte_list.AddRange(p_bytes);
}
// 拼接长度数据
int len = byte_list.Count;
byte[] len_bytes = new byte[] { (byte)byte_list.Count };
bytes.AddRange(len_bytes);// 长度信息
bytes.AddRange(byte_list);// 可变报头+载荷信息
//socket.Send(bytes.ToArray());// 发送MQTT的请求连接报文
SendAndReceive(bytes.ToArray(), socket);
}
//辅助方法
static Random rnd = new Random();
Socket socket = new Socket(AddressFamily.InterNetwork, SocketType.Stream, ProtocolType.Tcp);
socket.Connect("127.0.0.1", 1883);// 连接到服务器
static byte[] SendAndReceive(byte[] bytes, Socket socket)
{
socket.Send(bytes);
List<byte> byteList = new List<byte>();
byte[] respBytes = new byte[2];// 固定报头
socket.Receive(respBytes, 0, 2, SocketFlags.None);
//byteList.AddRange(respBytes);
//for (int i = 0; i < 3; i++)
//{
// if ((byteList[byteList.Count - 1] & 0x80) != 0)
// {
// respBytes = new byte[1];// 固定报头
// socket.Receive(respBytes, 0, 1, SocketFlags.None);
// byteList.Add(respBytes[0]);
// }
// else
// break;
//}
//byte[] lenBytes = byteList.GetRange(1, byteList.Count - 1).ToArray();
long len = respBytes[1];// LengthDecode(lenBytes);// 当前报文剩余字节的数量
byte[] dataBytes = new byte[len];// 可变报头+有效载荷的字节
socket.Receive(dataBytes, 0, dataBytes.Length, SocketFlags.None);
byteList.AddRange(dataBytes);
return byteList.ToArray();// 将完整响应报文返回
}
// 从一个十进制数转换成字节
static byte[] LengthEncode(int len)
{
List<byte> bytes = new List<byte>();
ulong rc = 0;
byte d;
do
{
d = (byte)(len % 128);
len /= 128;
if (len > 0)
d |= 128;
bytes.Add(d);
} while (len > 0);
return bytes.ToArray();
}
// 根据字节转换成数字
static long LengthDecode(byte[] bytes)
{
byte encodeByte;
uint multiplier = 1;
long rc = 0;
int i = 0;
do
{
encodeByte = bytes[i++];
rc += (encodeByte & 127) * multiplier;
if (multiplier > 128 * 128 * 128)
break;
else
multiplier *= 128;
} while ((encodeByte & 128) != 0);
return rc;
}
//心跳
private static void Ping(Socket socket)
{
Task.Run(async () =>
{
while (true)
{
await Task.Delay(1000);
// 心跳请求,只有固定报头
List<byte> ping_bytes = new List<byte>();
byte byte1 = 12 << 4;// 消息类型 12
ping_bytes.Add(byte1);
ping_bytes.Add(0x00);
socket.Send(ping_bytes.ToArray());
}
});
}
//断开连接
private static void Disconnect(Socket socket)
{
List<byte> reqBytes = new List<byte>();
byte byte1 = 14 << 4;
reqBytes.Add(byte1);
// 剩余字节长度
reqBytes.Add(0);
socket.Send(reqBytes.ToArray()); ;
}
2. 订阅

private static void Subscribte(Socket socket)
{
List<byte> reqBytes = new List<byte>();
// 固定报头
byte byte1 = 8 << 4;
// 位运算
// 将8这个值左移4位?
// 0000 1000 -> 1000 0000
// 13 -> 1101 0000
byte1 |= 2;// 固定写法,字节的bit1置1
// | 或运算 & 与运算
// 1000 0000
// 0000 0010
// 1000 0010 -> 或运算结果
reqBytes.Add(byte1);
List<byte> bytes = new List<byte>();
// 可变报头(报文标识符 2byte)
ushort pid = (ushort)rnd.Next(100, 1000);
bytes.Add((byte)(pid / 256));
bytes.Add((byte)(pid % 256));
// 载荷信息
// 订阅主题名称
string topic = "a/b"; // "a/#"
byte[] t_bytes = Encoding.ASCII.GetBytes(topic);
bytes.Add((byte)(t_bytes.Length / 256));
bytes.Add((byte)(t_bytes.Length % 256));
bytes.AddRange(t_bytes);
// QoS 服务质量(0,1,2) 不能超过2
// 0:最多发送成功一次
// 1:最少发送成功一次
// 2:确保发送成功一次
bytes.Add(1);
topic = "a/d";
t_bytes = Encoding.ASCII.GetBytes(topic);
bytes.Add((byte)(t_bytes.Length / 256));
bytes.Add((byte)(t_bytes.Length % 256));
bytes.AddRange(t_bytes);
// QoS 2
bytes.Add(2);
topic = "a/f";
t_bytes = Encoding.ASCII.GetBytes(topic);
bytes.Add((byte)(t_bytes.Length / 256));
bytes.Add((byte)(t_bytes.Length % 256));
bytes.AddRange(t_bytes);
// QoS 0
bytes.Add(0);
// 添加报文长度 最大256M
reqBytes.AddRange(LengthEncode(bytes.Count));
reqBytes.AddRange(bytes);
//socket.Send(reqBytes.ToArray());
SendAndReceive(reqBytes.ToArray(), socket);
}
3. 取消订阅

private static void Unsubscribte(Socket socket)
{
List<byte> reqBytes = new List<byte>();
byte byte1 = 10 << 4;
byte1 |= 2;// 固定写法,字节的bit1置1
reqBytes.Add(byte1);
// 可变报头+有效载荷
List<byte> bytes = new List<byte>();
{
// 可变报头(报文标识符 2byte)
ushort pid = (ushort)rnd.Next(100, 1000);
bytes.Add((byte)(pid / 256));
bytes.Add((byte)(pid % 256));
string topic = "a/b";
byte[] t_bytes = Encoding.ASCII.GetBytes(topic);
bytes.Add((byte)(t_bytes.Length / 256));
bytes.Add((byte)(t_bytes.Length % 256));
bytes.AddRange(t_bytes);
topic = "z/x";
t_bytes = Encoding.ASCII.GetBytes(topic);
bytes.Add((byte)(t_bytes.Length / 256));
bytes.Add((byte)(t_bytes.Length % 256));
bytes.AddRange(t_bytes);
}
reqBytes.AddRange(LengthEncode(bytes.Count));
reqBytes.AddRange(bytes);
SendAndReceive(reqBytes.ToArray(), socket);
}
4. 发布
- QoS-0(可能对方收不到,最多只收一次)

private static void Publish_0(Socket socket)
{
List<byte> reqBytes = new List<byte>();
byte byte1 = 3 << 4;// 功能码
// 如里需要服务端进行消息的保存,将bit0置1 RETAIN标记
//byte1 |= 1;
// 如果需要设备发存消息的QoS等级,设置bit1 - 2两个位,最大为2
int qos = 0;
byte1 |= (byte)(qos << 1);
// 如果消息是重发的,设备bit3为1
reqBytes.Add(byte1);
// 可变报头+有效载荷
List<byte> bytes = new List<byte>();
{
// 添加主题
string topic = "t001";
byte[] t_bytes = Encoding.ASCII.GetBytes(topic);
bytes.Add((byte)(t_bytes.Length / 256));
bytes.Add((byte)(t_bytes.Length % 256));
bytes.AddRange(t_bytes);
// 添加消息内容
string msg = "Hello MQTT - 0";
byte[] m_bytes = Encoding.UTF8.GetBytes(msg);
bytes.AddRange(m_bytes);
}
reqBytes.AddRange(LengthEncode(bytes.Count));
reqBytes.AddRange(bytes);
socket.Send(reqBytes.ToArray());
}
- QoS-1(最少收一次)

private static void Publish_1(Socket socket)
{
List<byte> reqBytes = new List<byte>();
byte byte1 = 3 << 4;// 功能码
// 如里需要服务端进行消息的保存,将bit0置1
//byte1 |= 1;
// 如果需要设备发存消息的QoS等级,设置bit1 - 2两个位,最大为2
int qos = 1;
byte1 |= (byte)(qos << 1);
// 如果消息是重发的,设备bit3为1
reqBytes.Add(byte1);
// 可变报头+有效载荷
List<byte> bytes = new List<byte>();
{
// 添加主题
string topic = "t001";
byte[] t_bytes = Encoding.ASCII.GetBytes(topic);
bytes.Add((byte)(t_bytes.Length / 256));
bytes.Add((byte)(t_bytes.Length % 256));
bytes.AddRange(t_bytes);
// 添加PID
ushort pid = (ushort)rnd.Next(100, 1000);
bytes.Add((byte)(pid / 256));
bytes.Add((byte)(pid % 256));
// 添加消息内容
string msg = "Hello MQTT - 1";
byte[] m_bytes = Encoding.UTF8.GetBytes(msg);
bytes.AddRange(m_bytes);
}
reqBytes.AddRange(LengthEncode(bytes.Count));
reqBytes.AddRange(bytes);
SendAndReceive(reqBytes.ToArray(), socket); // 接收PubAck的响应 功能码:4
}
- QoS-2(保证消息发送并只接收一次)

private static void Publish_2(Socket socket)
{
List<byte> reqBytes = new List<byte>();
byte byte1 = 3 << 4;// 功能码
// 如里需要服务端进行消息的保存,将bit0置1
//byte1 |= 1;
// 如果需要设备发存消息的QoS等级,设置bit1 - 2两个位,最大为2
int qos = 2;
byte1 |= (byte)(qos << 1);
// 如果消息是重发的,设备bit3为1
reqBytes.Add(byte1);
ushort pid = (ushort)rnd.Next(100, 1000);
// 可变报头+有效载荷
List<byte> bytes = new List<byte>();
{
// 添加主题
string topic = "t001";
byte[] t_bytes = Encoding.ASCII.GetBytes(topic);
bytes.Add((byte)(t_bytes.Length / 256));
bytes.Add((byte)(t_bytes.Length % 256));
bytes.AddRange(t_bytes);
// 添加PID
bytes.Add((byte)(pid / 256));
bytes.Add((byte)(pid % 256));
// 添加消息内容
string msg = "Hello MQTT - 2";
byte[] m_bytes = Encoding.UTF8.GetBytes(msg);
bytes.AddRange(m_bytes);
}
reqBytes.AddRange(LengthEncode(bytes.Count));
reqBytes.AddRange(bytes);
SendAndReceive(reqBytes.ToArray(), socket);// 接收PubRec响应报文 功能码:5
// 发送PubRel报文 6
{
reqBytes = new List<byte>();
byte1 = 6 << 4;// 功能码
byte1 |= 2;
reqBytes.Add(byte1);
// 剩余字节长度 2
reqBytes.Add(2);
// 添加pid
reqBytes.Add((byte)(pid / 256));
reqBytes.Add((byte)(pid % 256));
SendAndReceive(reqBytes.ToArray(), socket);// 接收PubComp响应报文 功能:7
}
}
QoS及信息交互过程
- QoS 0:最多一次的传输:定时监听点位(5秒获取)

- QoS 1:至少一次的传输、至多无限次:停车场入场

- QoS 2:有且仅有一次的传输:停车场出场 订单支付

三、第三方库(MQTTnet)的使用示例
1. 客户端
internal class Program
{
static IMqttClient client = null;
static void Main(string[] args)
{
Console.WriteLine("Hello, World!");
ClientTest();
Console.ReadLine();
}
static async void ClientTest()
{
// MQTT客户端
client = new MqttClientFactory().CreateMqttClient();
MqttClientOptions options = new MqttClientOptionsBuilder()
.WithProtocolVersion(MQTTnet.Formatter.MqttProtocolVersion.V311)
.WithTcpServer("127.0.0.1", 1883)
.WithClientId("mqttnet_id")
.WithCredentials("admin", "123456")
.WithWillTopic("offline")
.WithWillPayload("掉线喽")
.WithWillQualityOfServiceLevel(MQTTnet.Protocol.MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
client.ConnectedAsync += Client_ConnectedAsync;
client.ApplicationMessageReceivedAsync += Client_ApplicationMessageReceivedAsync;
client.DisconnectedAsync += Client_DisconnectedAsync;
MqttClientConnectResult result = await client.ConnectAsync(options);
if (result.ResultCode == MqttClientConnectResultCode.Success)
{
Task.Run(async() =>
{
while (client.IsConnected)
{
await Task.Delay(5000);
await client.TryPingAsync();
}
});
// 订阅
// QoS - 2
// 第一种方式
// topic01/# : topic01/a topic01/a/b/c/d/e/f
await client.SubscribeAsync("topic01", MqttQualityOfServiceLevel.ExactlyOnce);
// 第二种方式
//MqttTopicFilter filter = new MqttTopicFilterBuilder()
// .WithTopic("topic11")
// .WithAtLeastOnceQoS()
// .Build();
//await client.SubscribeAsync(filter);
// 第三种方式
//MqttClientSubscribeOptions options1 = new MqttClientSubscribeOptionsBuilder()
// .WithTopicFilter(filter)
// .Build();
//await client.SubscribeAsync(options1);
// 取消订阅
//await client.UnsubscribeAsync("topic11");
// 发布
// QoS - 1
MqttApplicationMessage message = new MqttApplicationMessageBuilder()
.WithTopic("topic02")
.WithPayload("Hello MQTTnet")
.WithQualityOfServiceLevel(MqttQualityOfServiceLevel.AtLeastOnce)
.Build();
await client.PublishAsync(message);
// 断开连接
//await client.DisconnectAsync();
}
}
private static Task Client_DisconnectedAsync(MqttClientDisconnectedEventArgs arg)
{
return Task.CompletedTask;
}
private static Task Client_ApplicationMessageReceivedAsync(MqttApplicationMessageReceivedEventArgs arg)
{
// 获取相关的主题 、负载信息
var id = arg.ClientId;
var topic = arg.ApplicationMessage.Topic;
var msg = Encoding.UTF8.GetString(arg.ApplicationMessage.Payload.ToArray());
return Task.CompletedTask;
}
private static Task Client_ConnectedAsync(MqttClientConnectedEventArgs arg)
{
if (client == null) return Task.CompletedTask;
// QoS - 2
//MqttClientSubscribeResult result = client.SubscribeAsync("topic02", MqttQualityOfServiceLevel.ExactlyOnce).GetAwaiter().GetResult();
return Task.CompletedTask;
}
}
2. 服务器端
internal class Program
{
static void Main(string[] args)
{
Console.WriteLine("Hello, World!");
Server();
Console.ReadLine();
}
static async void Server()
{
MqttServerOptions options = new MqttServerOptionsBuilder()
.WithDefaultEndpoint()// 127.0.0.1:1883
.Build();
MqttServer server = new MqttServerFactory().CreateMqttServer(options);
// 服务器启动状态
server.StartedAsync += Server_StartedAsync;
// 登录验证
server.ValidatingConnectionAsync += Server_ValidatingConnectionAsync;
// 检查哪个客户端接入
server.ClientConnectedAsync += Server_ClientConnectedAsync;
server.ClientDisconnectedAsync += Server_ClientDisconnectedAsync;
// 检查订阅信息
server.ClientSubscribedTopicAsync += Server_ClientSubscribedTopicAsync;
server.ClientUnsubscribedTopicAsync += Server_ClientUnsubscribedTopicAsync;
// 检查消息
server.ApplicationMessageEnqueuedOrDroppedAsync += Server_ApplicationMessageEnqueuedOrDroppedAsync;
// 拦截
server.InterceptingPublishAsync += Server_InterceptingPublishAsync;
await server.StartAsync();
Console.ReadLine();
// 基于服务对象发布消息
//server.InjectApplicationMessage("", "");
MqttApplicationMessage msg = new MqttApplicationMessageBuilder()
.WithTopic("topic02")
.WithPayload("Hello server!")
.Build();
await server.InjectApplicationMessage(new InjectedMqttApplicationMessage(msg)
{
SenderClientId = "Server"
});
Console.ReadLine();
// 强制客户端退出
var client_list = (await server.GetClientsAsync());
MqttClientStatus client = client_list.FirstOrDefault(c => c.Id == "aaaa");
if (client != null)
await client.DisconnectAsync();
Console.ReadLine();
// 停止服务
await server.StopAsync();
}
private static Task Server_InterceptingPublishAsync(InterceptingPublishEventArgs arg)
{
//arg.ApplicationMessage.Topic += "/sub002";
return Task.CompletedTask;
}
private static Task Server_ApplicationMessageEnqueuedOrDroppedAsync(ApplicationMessageEnqueuedEventArgs arg)
{
Console.WriteLine($"转发消息:{arg.SenderClientId} - {arg.ReceiverClientId}");
Console.WriteLine($"\t:{arg.ApplicationMessage.Topic} - {Encoding.UTF8.GetString(arg.ApplicationMessage.Payload)} - {arg.ApplicationMessage.QualityOfServiceLevel}");
//arg.SenderClientId
//arg.ReceiverClientId
//arg.ApplicationMessage.Topic
//arg.ApplicationMessage.Payload -- byte[] -> string
return Task.CompletedTask;
}
private static Task Server_ClientUnsubscribedTopicAsync(ClientUnsubscribedTopicEventArgs arg)
{
Console.WriteLine($"客户端取消订阅:{arg.ClientId} - {arg.TopicFilter}");
//arg.TopicFilter
return Task.CompletedTask;
}
private static Task Server_ClientSubscribedTopicAsync(ClientSubscribedTopicEventArgs arg)
{
Console.WriteLine($"客户端订阅主题:{arg.ClientId} - {arg.TopicFilter.Topic}");
//arg.TopicFilter.Topic
return Task.CompletedTask;
}
private static Task Server_ClientDisconnectedAsync(ClientDisconnectedEventArgs arg)
{
Console.WriteLine("客户端断开连接:" + arg.ClientId);
//arg.ClientId
return Task.CompletedTask;
}
private static Task Server_ClientConnectedAsync(ClientConnectedEventArgs arg)
{
Console.WriteLine("客户端连接成功:" + arg.ClientId);
//arg.ClientId
//arg.UserName
//arg.Password
return Task.CompletedTask;
}
private static Task Server_ValidatingConnectionAsync(ValidatingConnectionEventArgs arg)
{
Console.WriteLine("正在验证客户端登录:" + arg.ClientId);
//arg.ClientId
//arg.ClientCertificate
//arg.UserName
//arg.Password
// 检查用户名、密码的正确性,ClientId
if (arg.UserName != "admin" || arg.Password != "123456")
{
arg.ReasonCode = MQTTnet.Protocol.MqttConnectReasonCode.BadUserNameOrPassword;
Console.WriteLine("登录失败!");
}
return Task.CompletedTask;
}
private static Task Server_StartedAsync(EventArgs arg)
{
Console.WriteLine("服务启动成功!正在监听....");
return Task.CompletedTask;
}
}
3. WebSocket
internal class Program
{
static void Main(string[] args)
{
Console.WriteLine("Hello, World!");
WebServer();
Console.ReadLine();
}
static void WebServer()
{
var server = new SuperWebSocket.WebSocketServer();
server.Setup(8989);
server.Start();
Console.WriteLine("WebSocket服务已启动!");
server.NewMessageReceived += Server_NewMessageReceived;
server.NewDataReceived += Server_NewDataReceived;
//server.NewSessionConnected += Server_NewSessionConnected;
}
private static void Server_NewDataReceived(SuperWebSocket.WebSocketSession session, byte[] value)
{
}
private static void Server_NewMessageReceived(SuperWebSocket.WebSocketSession session, string value)
{
session.Send("消息回复");
}
private static void Server_NewSessionConnected(SuperWebSocket.WebSocketSession session)
{
Console.WriteLine("有客户端接入:" + session.SessionID);
}
}
- SuperWebSocketNETServer 0.8.0
- System.Configuration.ConfigurationManager 9.0.2
4. html
<div class="wrap" style="padding-top: 20px">
<div>
<div class="contact">
<div class="contact-form">
<div class="content">
<h2>MQTT功能测试</h2>
</div>
<div>
<span><label>服务器地址</label></span>
<span><input name="url" type="text" class="textbox" id="url"></span>
</div>
<div>
<span><label>客户端ID</label></span>
<span><input name="clientId" type="text" class="textbox" id="clientId"></span>
</div>
<div>
<span><label>用户名</label></span>
<span><input name="userName" type="text" class="textbox" id="userName"></span>
</div>
<div>
<span><label>密码</label></span>
<span><input name="password" type="text" class="textbox" id="password"> </input></span>
</div>
<div>
<span><input type="submit" class="" value="登录到服务器" id="btn_connnect"></span>
</div>
</div>
<div class="contact-form">
<div class="content">
<h2>订阅</h2>
</div>
<div>
<span><label>主题</label></span>
<span><input name="topic" type="text" class="textbox" id="topic"></span>
</div>
<div>
<span><label>QoS</label></span>
<span>
<select name="qos" class="select" id="qos">
<option value="0">0</option>
<option value="1">1</option>
<option value="2">2</option>
</select>
</span>
</div>
<div>
<span><input type="submit" class="" value="订阅" id="btn_sub"></span>
</div>
</div>
<div class="contact-form">
<div class="content">
<h2>消息发布</h2>
</div>
<div>
<span><label>主题</label></span>
<span><input name="pub_topic" type="text" class="textbox" id="pub_topic"></span>
</div>
<div>
<span><label>QoS</label></span>
<span>
<select name="qos" class="select" id="pub_qos">
<option value="0">0</option>
<option value="1">1</option>
<option value="2">2</option>
</select>
</span>
</div>
<div>
<span><label>Pyload</label></span>
<span><input name="pyload" type="text" class="textbox" id="pyload"></span>
</div>
<div>
<span><input type="submit" class="" value="发布" id="btn_pub"></span>
</div>
</div>
<div class="clear"> </div>
</div>
</div>
</div>
<script src="https://unpkg.com/mqtt/dist/mqtt.min.js"></script>
<script type="text/javascript">
console.log(mqtt);
let client = null;
const btnConnect = document.getElementById("btn_connnect");
btnConnect.addEventListener("click", connect);
function connect() {
const ele_url = document.getElementById("url");
const ele_client_id = document.getElementById("clientId");
const ele_username = document.getElementById("userName");
const ele_password = document.getElementById("password");
client = mqtt.connect(`ws://${ele_url.value}/`, {
clientId: ele_client_id.value,
username: ele_username.value,
password: ele_password.value,
clean: true
});
client.on("connect", () => {
console.log("连接成功");
});
client.on("message", (topic, msg, packet) => {
console.log("获取到的数据:", msg)
console.log("数据对应订阅主题:", topic)
console.log("获取到的数据包:", packet)
});
}
const btnSubscript = document.getElementById("btn_sub");
btnSubscript.addEventListener("click", subscribe);
function subscribe() {
var ele_topic = document.getElementById("topic");
var ele_qos = document.getElementById("qos");
// 单订阅
client.subscribe(ele_topic.value, { qos: parseInt(ele_qos.value) }, (error, granted) => {
if (error)
console.log('订阅失败!', error);
else
console.log('订阅成功!', granted[0].topic);
});
// 多订阅
//client.subscribe([{ topic: "aaa", qos=1 }, { topic: "bbb", qos: 0 }], (err, gra) => {
//});
}
const btnPublish = document.getElementById("btn_pub");
btnPublish.addEventListener("click", publish);
function publish() {
const ele_topic = document.getElementById("pub_topic");
const ele_qos = document.getElementById("pub_qos");
const ele_pyload = document.getElementById("pyload");
client.publish(ele_topic.value, ele_pyload.value, { qos: parseInt(ele_qos.value) });
}
</script>
四. MQTT5.0
1. 从3.1.1到5.0
- 报文格式:属性字段
- 通信模式:订阅/发布,请求/响应
- 服务重定向
- 认证:首先客户端在连接的时候带认证信息 添加认证过程 (0x0F) CONNACK
- 用户属性:自定义数据格式 “prop:value”
- 过期:Session过期(4字节 秒)、消息过期 Clean Session =1
- 流量控制:接收最大值、最大报文长度 QoS>0
2. MQTT 5.0属性

3. MQTT 5.0报文格式
- 连接请求部分报文格式

- 订阅请求部分报文格式

- 断开连接原因码

