MQTT 代码示例
Python 示例 (使用 paho-mqtt)
安装依赖:pip install paho-mqtt(以下示例按 paho-mqtt 1.x 的回调签名编写;若使用 2.x,创建客户端时需传入 mqtt.CallbackAPIVersion.VERSION1 作为第一个参数)
1. 基础连接与发布
import paho.mqtt.client as mqtt
import json
import time
# MQTT connection settings: broker host/port and the client identifier.
BROKER = "broker.emqx.io"
PORT = 1883
CLIENT_ID = "python-client-001"
# Credentials handed to username_pw_set(); the values here read as
# placeholders and should be replaced with real ones.
USERNAME = "your_username"
PASSWORD = "your_password"
# Callback: the broker answered the connection request.
def on_connect(client, userdata, flags, rc):
    """Report the connection result and subscribe on success.

    A return code of 0 is treated as success: the broker name is printed
    and the client subscribes to ``sensors/temperature`` with QoS 1.
    Any other return code is printed as a failure and nothing else happens.
    """
    if rc != 0:
        print(f"❌ 连接失败,返回码:{rc}")
        return

    print(f"✅ 已连接到 Broker: {BROKER}")
    # Subscribe to the topic
    client.subscribe("sensors/temperature", qos=1)
# Callback: a message was received.
def on_message(client, userdata, msg):
    """Print the topic, QoS and text content of a received message.

    The payload is decoded as UTF-8 with ``errors="replace"`` so that a
    payload that is not valid UTF-8 is still displayed instead of raising
    ``UnicodeDecodeError`` from inside the callback.
    """
    text = msg.payload.decode("utf-8", errors="replace")
    print(f"📩 收到消息 - 主题:{msg.topic}")
    print(f" QoS: {msg.qos}, 内容:{text}")
# Callback: the broker acknowledged a subscription request.
def on_subscribe(client, userdata, mid, granted_qos):
    """Report the outcome of a subscription request.

    ``granted_qos`` is expected to hold one entry per requested topic
    filter. In MQTT 3.1.1 an entry of 0x80 means the broker rejected that
    filter, so such an acknowledgement is reported as a failure instead of
    being printed as a success.
    """
    rejected = any(
        isinstance(code, int) and code >= 0x80 for code in granted_qos
    )
    if rejected:
        print(f"❌ 订阅被拒绝,返回码: {granted_qos}")
    else:
        print(f"✅ 订阅成功,QoS: {granted_qos}")
# Create the client and attach the credentials.
client = mqtt.Client(client_id=CLIENT_ID)
client.username_pw_set(USERNAME, PASSWORD)

# Register the callbacks.
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe

# Connect to the broker.
client.connect(BROKER, PORT, keepalive=60)

# Start the background network loop BEFORE publishing: a QoS 1 publish is
# only sent and acknowledged while the loop is running, so waiting for it
# without a running loop would never finish.
client.loop_start()

try:
    # Publish five sample readings, one per second.
    for i in range(5):
        payload = json.dumps({
            "temperature": 25.5 + i * 0.1,
            "humidity": 60,
            "timestamp": time.time(),
        })
        result = client.publish("sensors/temperature", payload, qos=1)
        # wait_for_publish() does not return a (status, mid) pair; the
        # status code and message id are read from the info object itself.
        result.wait_for_publish()
        status, mid = result.rc, result.mid
        print(f"📤 已发布消息 #{i+1}, mid={mid}, status={status}")
        time.sleep(1)

    # Keep running so that incoming messages continue to be handled.
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\n👋 正在断开连接...")
finally:
    # Always release the network thread and the connection, including when
    # publishing fails with an exception.
    client.loop_stop()
    client.disconnect()
2. 使用 TLS 加密连接
import paho.mqtt.client as mqtt
import ssl
# TLS connection settings.
BROKER = "broker.example.com"
PORT = 8883 # TLS port
# Paths of the CA certificate, client certificate and client private key
# used by the TLS configuration further down.
CA_CERTS = "ca.crt"
CLIENT_CERT = "client.crt"
CLIENT_KEY = "client.key"
def on_connect(client, userdata, flags, rc):
    """Print whether the secure connection was established.

    A return code of 0 is reported as success; any other value is printed
    as a connection failure together with the code.
    """
    if rc != 0:
        print(f"❌ 连接失败:{rc}")
        return
    print("✅ 安全连接已建立")
client = mqtt.Client(client_id="secure-client")

# Configure TLS through an explicit SSLContext instead of pinning
# tls_version=ssl.PROTOCOL_TLSv1_2: that constant is deprecated and limits
# the connection to exactly TLS 1.2. Setting minimum_version keeps TLS 1.2
# as the floor while still allowing newer protocol versions.
tls_context = ssl.create_default_context(cafile=CA_CERTS)
tls_context.minimum_version = ssl.TLSVersion.TLSv1_2
tls_context.load_cert_chain(certfile=CLIENT_CERT, keyfile=CLIENT_KEY)
client.tls_set_context(tls_context)

# Optional: disable hostname verification (for testing only)
# client.tls_insecure_set(True)

client.on_connect = on_connect
client.connect(BROKER, PORT, keepalive=60)
client.loop_forever()
3. 遗嘱消息(Last Will)
import paho.mqtt.client as mqtt
import json
BROKER = "broker.emqx.io"
PORT = 1883

# Payload of the last-will message registered on the client below.
will_payload = json.dumps(
    {"status": "offline", "reason": "unexpected_disconnect"}
)

client = mqtt.Client(client_id="device-001")

# Register the last will on the device status topic (QoS 1, retained).
client.will_set(
    "devices/device-001/status",
    payload=will_payload,
    qos=1,
    retain=True,
)
def on_connect(client, userdata, flags, rc):
    """Announce the device as online after a successful connection.

    When ``rc`` is 0, prints a confirmation and publishes a retained QoS 1
    ``{"status": "online"}`` message to the device status topic. Non-zero
    return codes are ignored.
    """
    if rc != 0:
        return

    print("✅ 连接成功,遗嘱消息已设置")
    # Publish the online status
    online_payload = json.dumps({"status": "online"})
    client.publish(
        "devices/device-001/status",
        online_payload,
        qos=1,
        retain=True,
    )
# Register the connect callback, open the connection to the broker, and hand
# control to the client's network loop via loop_forever().
client.on_connect = on_connect
client.connect(BROKER, PORT)
client.loop_forever()
4. 完整的生产者 - 消费者示例
"""
MQTT 生产者 - 消费者完整示例
模拟 IoT 设备数据上报与处理
"""
import paho.mqtt.client as mqtt
import json
import random
import time
from datetime import datetime
from typing import Optional
class MQTTDataPublisher:
    """Publish sensor readings to an MQTT broker.

    The connection is opened with :meth:`connect`, which also starts the
    client's background network loop. ``connected`` mirrors the state
    reported by the connect/disconnect callbacks.
    """

    def __init__(self, broker: str, port: int, client_id: str):
        self.broker = broker
        self.port = port
        self.client_id = client_id
        self.client: Optional[mqtt.Client] = None
        # Set by _on_connect / cleared by _on_disconnect and disconnect().
        self.connected = False

    def connect(self):
        """Create the client, connect to the broker and start the loop."""
        self.client = mqtt.Client(client_id=self.client_id)
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.connect(self.broker, self.port, keepalive=60)
        self.client.loop_start()

    def _on_connect(self, client, userdata, flags, rc):
        """Record a successful connection (rc == 0) or print the failure."""
        if rc == 0:
            self.connected = True
            print(f"✅ {self.client_id} 已连接")
        else:
            print(f"❌ 连接失败:{rc}")

    def _on_disconnect(self, client, userdata, rc):
        """Mark the publisher as disconnected."""
        self.connected = False
        print(f"🔌 {self.client_id} 已断开")

    def publish_sensor_data(
        self,
        device_id: str,
        temp: float,
        humidity: float,
        timeout: float = 5.0,
    ) -> bool:
        """Publish one reading to ``sensors/<device_id>/telemetry`` (QoS 1).

        Args:
            device_id: Identifier placed in the payload and the topic.
            temp: Temperature value for the payload.
            humidity: Humidity value for the payload.
            timeout: Maximum number of seconds to wait for the publish to
                complete.

        Returns:
            True when the message was confirmed as published within
            ``timeout``; False when not connected, when the publish was
            rejected, or when the wait timed out.
        """
        if not self.connected or self.client is None:
            return False

        payload = {
            "device_id": device_id,
            "temperature": temp,
            "humidity": humidity,
            "timestamp": datetime.now().isoformat(),
        }
        topic = f"sensors/{device_id}/telemetry"
        result = self.client.publish(topic, json.dumps(payload), qos=1)

        # Checking is_published() right after publish() would report False
        # for a QoS 1 message that is still in flight, so wait (bounded) for
        # the publish to complete before asking.
        try:
            result.wait_for_publish(timeout)
        except (ValueError, RuntimeError):
            # Raised by the client library when the message could not be
            # queued or sent at all.
            return False
        return result.is_published()

    def disconnect(self):
        """Disconnect from the broker and stop the background loop."""
        if self.client:
            # Disconnect while the loop is still running so it can process
            # the disconnect, then stop the loop thread.
            self.client.disconnect()
            self.client.loop_stop()
            # Do not rely on the disconnect callback having run.
            self.connected = False
class MQTTDataConsumer:
    """Subscribe to telemetry topics and print/inspect incoming readings."""

    def __init__(self, broker: str, port: int, client_id: str):
        self.broker = broker
        self.port = port
        self.client_id = client_id
        self.client: Optional[mqtt.Client] = None
        # Topic filters subscribed to every time a connection is established.
        self._topics: list = []

    def connect(self, topics: list):
        """Connect to the broker, subscribe to ``topics`` and run the loop.

        The subscriptions themselves are issued from the connect callback so
        that they are renewed whenever the client reconnects. This call
        does not return while ``loop_forever()`` is running.
        """
        self._topics = list(topics)
        self.client = mqtt.Client(client_id=self.client_id)
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message
        self.client.connect(self.broker, self.port, keepalive=60)
        self.client.loop_forever()

    def _on_connect(self, client, userdata, flags, rc):
        """Report the connection result and subscribe on success."""
        if rc != 0:
            print(f"❌ 连接失败:{rc}")
            return

        print(f"✅ 消费者 {self.client_id} 已连接")
        # Subscribe to the requested topics
        for topic in self._topics:
            client.subscribe(topic, qos=1)
            print(f"📬 订阅主题:{topic}")

    def _on_message(self, client, userdata, msg):
        """Decode a received message, print its fields and process it.

        Payloads that are not valid UTF-8, not valid JSON, or not a JSON
        object are reported and skipped instead of raising from the
        callback.
        """
        try:
            data = json.loads(msg.payload.decode())
        except (UnicodeDecodeError, json.JSONDecodeError):
            data = None

        # The field lookups below need a JSON object (dict).
        if not isinstance(data, dict):
            print(f"❌ 无法解析消息:{msg.payload}")
            return

        print("\n📩 收到数据:")
        print(f" 主题:{msg.topic}")
        print(f" 设备:{data.get('device_id')}")
        print(f" 温度:{data.get('temperature')}°C")
        print(f" 湿度:{data.get('humidity')}%")
        print(f" 时间:{data.get('timestamp')}")
        # Additional data handling can be added here
        self._process_data(data)

    def _process_data(self, data: dict):
        """Print a warning when the temperature is above 30 or below 10.

        A missing temperature defaults to 0; a temperature that is not a
        number (for example ``None`` or a string) is ignored.
        """
        temp = data.get('temperature', 0)
        # Comparing None or a string with a number would raise TypeError.
        if isinstance(temp, bool) or not isinstance(temp, (int, float)):
            return
        if temp > 30:
            print(f" ⚠️ 高温告警:{temp}°C")
        elif temp < 10:
            print(f" ⚠️ 低温告警:{temp}°C")
# Usage example
if __name__ == "__main__":
    BROKER = "broker.emqx.io"
    PORT = 1883

    # Create the publisher
    publisher = MQTTDataPublisher(BROKER, PORT, "publisher-001")
    publisher.connect()

    # Create the consumer (run it in another thread or process, because
    # its connect() call runs the network loop and does not return)
    # consumer = MQTTDataConsumer(BROKER, PORT, "consumer-001")
    # consumer.connect(["sensors/+/telemetry"])

    try:
        # publish_sensor_data() returns False while the publisher is not
        # yet marked as connected, so wait (at most 10 s) for the connect
        # callback; otherwise the first reading would be silently dropped.
        deadline = time.monotonic() + 10
        while not publisher.connected and time.monotonic() < deadline:
            time.sleep(0.1)

        # Publish simulated readings
        for i in range(10):
            publisher.publish_sensor_data(
                device_id=f"sensor-{i % 3 + 1}",
                temp=random.uniform(20, 35),
                humidity=random.uniform(40, 80),
            )
            time.sleep(2)
    except KeyboardInterrupt:
        print("\n👋 停止发布")
    finally:
        publisher.disconnect()