MQTT 代码示例

Python 示例 (使用 paho-mqtt)

安装依赖:pip install "paho-mqtt<2.0"(以下示例按 paho-mqtt 1.x 的回调签名编写;若使用 2.x,请参考官方迁移指南,在创建 Client 时指定 callback_api_version)

1. 基础连接与发布

import paho.mqtt.client as mqtt
import json
import time

# MQTT connection settings used by the script below
BROKER = "broker.emqx.io"  # NOTE(review): looks like a public test broker — confirm before real use
PORT = 1883  # passed to client.connect(); the TLS example in this document uses 8883 instead
CLIENT_ID = "python-client-001"  # passed to mqtt.Client(client_id=...)
USERNAME = "your_username"  # placeholder, passed to client.username_pw_set()
PASSWORD = "your_password"  # placeholder, passed to client.username_pw_set()

# Callback: result of the connection attempt
def on_connect(client, userdata, flags, rc):
    """Report the connection result and subscribe when it succeeded.

    Args:
        client: Client instance the callback was registered on.
        userdata: Unused.
        flags: Unused.
        rc: Connection return code; 0 is treated as success.
    """
    if rc != 0:
        print(f"❌ 连接失败,返回码:{rc}")
        return

    print(f"✅ 已连接到 Broker: {BROKER}")
    # Subscribe only once the connection has been accepted
    client.subscribe("sensors/temperature", qos=1)

# Callback: a message arrived
def on_message(client, userdata, msg):
    """Print the topic, QoS and text payload of a received message.

    The payload is decoded as UTF-8 with ``errors="replace"`` so that a
    binary or mis-encoded payload is shown with U+FFFD placeholders instead
    of raising ``UnicodeDecodeError`` inside the callback.

    Args:
        client: Unused.
        userdata: Unused.
        msg: Message object exposing ``topic``, ``qos`` and a bytes ``payload``.
    """
    text = msg.payload.decode("utf-8", errors="replace")
    print(f"📩 收到消息 - 主题:{msg.topic}")
    print(f"   QoS: {msg.qos}, 内容:{text}")

# Callback: subscription confirmation
def on_subscribe(client, userdata, mid, granted_qos):
    """Print the QoS value(s) reported for a confirmed subscription.

    Only ``granted_qos`` is used; the other parameters are ignored.
    """
    confirmation = f"✅ 订阅成功,QoS: {granted_qos}"
    print(confirmation)

# Create the client
client = mqtt.Client(client_id=CLIENT_ID)
client.username_pw_set(USERNAME, PASSWORD)

# Register callbacks
client.on_connect = on_connect
client.on_message = on_message
client.on_subscribe = on_subscribe

# Connect to the broker
client.connect(BROKER, PORT, keepalive=60)

# Start the background network loop BEFORE publishing: QoS 1 acknowledgements
# are only processed while the loop is running, so waiting for a publish
# without it would never complete.
client.loop_start()

try:
    # Publish five sample readings, one per second
    for i in range(5):
        payload = json.dumps({
            "temperature": 25.5 + i * 0.1,
            "humidity": 60,
            "timestamp": time.time()
        })
        result = client.publish("sensors/temperature", payload, qos=1)
        # wait_for_publish() returns None; the status code and message id are
        # attributes of the info object returned by publish(). Only wait when
        # the publish was accepted, because waiting on a failed publish raises.
        if result.rc == mqtt.MQTT_ERR_SUCCESS:
            result.wait_for_publish()
        print(f"📤 已发布消息 #{i+1}, mid={result.mid}, status={result.rc}")
        time.sleep(1)

    # Keep the process alive so subscribed messages keep being printed
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    print("\n👋 正在断开连接...")
finally:
    # Disconnect first so the loop thread can send DISCONNECT and exit,
    # then stop the loop thread.
    client.disconnect()
    client.loop_stop()

2. 使用 TLS 加密连接

import paho.mqtt.client as mqtt
import ssl

# TLS connection settings
BROKER = "broker.example.com"
PORT = 8883  # TLS port
CA_CERTS = "ca.crt"  # passed to tls_set(ca_certs=...)
CLIENT_CERT = "client.crt"  # passed to tls_set(certfile=...)
CLIENT_KEY = "client.key"  # passed to tls_set(keyfile=...)

def on_connect(client, userdata, flags, rc):
    """Print whether the secure connection attempt succeeded.

    Only ``rc`` is used: 0 is reported as success, any other value is
    printed as the failure code.
    """
    message = "✅ 安全连接已建立" if rc == 0 else f"❌ 连接失败:{rc}"
    print(message)

client = mqtt.Client(client_id="secure-client")

# Configure TLS with mutual authentication (CA bundle plus client cert/key).
# PROTOCOL_TLS_CLIENT negotiates the highest TLS version both sides support;
# the previous ssl.PROTOCOL_TLSv1_2 constant is deprecated and ruled out
# TLS 1.3.
client.tls_set(
    ca_certs=CA_CERTS,
    certfile=CLIENT_CERT,
    keyfile=CLIENT_KEY,
    tls_version=ssl.PROTOCOL_TLS_CLIENT
)

# Optional: disable hostname verification (testing only)
# client.tls_insecure_set(True)

client.on_connect = on_connect
client.connect(BROKER, PORT, keepalive=60)
# Blocks, processing network traffic on the calling thread
client.loop_forever()

3. 遗嘱消息(Last Will)

import paho.mqtt.client as mqtt
import json

BROKER, PORT = "broker.emqx.io", 1883

# Last Will payload: a JSON status document marking the device as offline
will_payload = json.dumps(dict(status="offline", reason="unexpected_disconnect"))

client = mqtt.Client(client_id="device-001")

# Register the will on the client; it is set up before connect() is called
client.will_set(
    "devices/device-001/status",
    will_payload,
    qos=1,
    retain=True,  # retained message
)

def on_connect(client, userdata, flags, rc):
    """Announce the device as online once the broker accepts the connection.

    On success a retained "online" status is published to the same topic
    the Last Will uses. A rejected connection is now reported instead of
    being silently ignored.

    Args:
        client: Client instance used to publish the status message.
        userdata: Unused.
        flags: Unused.
        rc: Connection return code; 0 is treated as success.
    """
    if rc != 0:
        print(f"❌ 连接失败:{rc}")
        return

    print("✅ 连接成功,遗嘱消息已设置")
    # Publish the retained online status
    client.publish(
        "devices/device-001/status",
        json.dumps({"status": "online"}),
        qos=1,
        retain=True
    )

# Attach the callback, connect, then block while processing network traffic
client.on_connect = on_connect
client.connect(host=BROKER, port=PORT)
client.loop_forever()

4. 完整的生产者 - 消费者示例

"""
MQTT 生产者 - 消费者完整示例
模拟 IoT 设备数据上报与处理
"""
import paho.mqtt.client as mqtt
import json
import random
import time
from datetime import datetime
from typing import Optional

class MQTTDataPublisher:
    """Publish sensor telemetry to an MQTT broker.

    The network loop runs in a background thread (``loop_start``), so
    ``connect`` returns before the broker has accepted the session. The
    ``connected`` flag is maintained by the connect/disconnect callbacks and
    should be checked before publishing.
    """

    def __init__(self, broker: str, port: int, client_id: str):
        """Store the connection settings; no network activity happens here."""
        self.broker = broker
        self.port = port
        self.client_id = client_id
        self.client: Optional[mqtt.Client] = None
        # Set to True by _on_connect and back to False by _on_disconnect
        self.connected = False

    def connect(self):
        """Create the client, start connecting and launch the network loop."""
        self.client = mqtt.Client(client_id=self.client_id)
        self.client.on_connect = self._on_connect
        self.client.on_disconnect = self._on_disconnect
        self.client.connect(self.broker, self.port, keepalive=60)
        self.client.loop_start()

    def _on_connect(self, client, userdata, flags, rc):
        """Record a successful connection or report the failure code."""
        if rc == 0:
            self.connected = True
            print(f"✅ {self.client_id} 已连接")
        else:
            print(f"❌ 连接失败:{rc}")

    def _on_disconnect(self, client, userdata, rc):
        """Mark the publisher as disconnected."""
        self.connected = False
        print(f"🔌 {self.client_id} 已断开")

    def publish_sensor_data(self, device_id: str, temp: float, humidity: float,
                            timeout: float = 5.0) -> bool:
        """Publish one telemetry sample at QoS 1 and wait for delivery.

        Args:
            device_id: Device identifier; also used in the topic name.
            temp: Temperature reading.
            humidity: Humidity reading.
            timeout: Maximum number of seconds to wait for the publish to
                complete. Requires a paho-mqtt release whose
                ``wait_for_publish`` accepts a timeout.

        Returns:
            True when the message was confirmed as published within
            ``timeout``; False when not connected, when the publish was
            rejected, or when the wait timed out.

        Note:
            This blocks the caller, so do not call it from inside an MQTT
            callback: the acknowledgement is processed by the loop thread.
        """
        if not self.connected or self.client is None:
            return False

        payload = {
            "device_id": device_id,
            "temperature": temp,
            "humidity": humidity,
            "timestamp": datetime.now().isoformat()
        }

        topic = f"sensors/{device_id}/telemetry"
        result = self.client.publish(topic, json.dumps(payload), qos=1)
        # is_published() is False right after publish() for QoS 1 because the
        # acknowledgement has not arrived yet, so wait for it (bounded).
        try:
            result.wait_for_publish(timeout)
        except (RuntimeError, ValueError):
            # Raised when the message was never queued or sent
            return False
        return result.is_published()

    def disconnect(self):
        """Disconnect from the broker and stop the background loop."""
        if self.client:
            # Disconnect first so the loop thread can finish the shutdown
            # and fire the disconnect callback, then stop the thread.
            self.client.disconnect()
            self.client.loop_stop()


class MQTTDataConsumer:
    """Subscribe to telemetry topics and print each decoded sample."""

    def __init__(self, broker: str, port: int, client_id: str):
        """Store the connection settings; no network activity happens here."""
        self.broker = broker
        self.port = port
        self.client_id = client_id
        self.client: Optional[mqtt.Client] = None
        # Topic filters to subscribe to each time a connection is established
        self.topics: list = []

    def connect(self, topics: list):
        """Connect, subscribe to ``topics`` and block processing messages.

        Args:
            topics: Topic filters to subscribe to. A single string is
                accepted and treated as one topic.
        """
        # A bare string would otherwise be iterated character by character
        self.topics = [topics] if isinstance(topics, str) else list(topics)

        self.client = mqtt.Client(client_id=self.client_id)
        self.client.on_connect = self._on_connect
        self.client.on_message = self._on_message

        self.client.connect(self.broker, self.port, keepalive=60)

        # Subscriptions are issued from _on_connect, not here
        self.client.loop_forever()

    def _on_connect(self, client, userdata, flags, rc):
        """Report the connection result and subscribe on success.

        Subscribing here rather than right after ``connect()`` means the
        request is sent once the broker has accepted the session, and it is
        repeated whenever the client reconnects.
        """
        if rc != 0:
            print(f"❌ 连接失败:{rc}")
            return

        print(f"✅ 消费者 {self.client_id} 已连接")
        for topic in self.topics:
            client.subscribe(topic, qos=1)
            print(f"📬 订阅主题:{topic}")

    def _on_message(self, client, userdata, msg):
        """Decode a telemetry message, print it and run the alert checks.

        Payloads that are not UTF-8, not valid JSON, or not a JSON object
        are reported and skipped instead of raising inside the callback.
        """
        try:
            data = json.loads(msg.payload.decode("utf-8"))
        except (UnicodeDecodeError, json.JSONDecodeError):
            print(f"❌ 无法解析消息:{msg.payload}")
            return

        # Valid JSON may still be a list, number or string; .get() needs a dict
        if not isinstance(data, dict):
            print(f"❌ 无法解析消息:{msg.payload}")
            return

        print(f"\n📩 收到数据:")
        print(f"   主题:{msg.topic}")
        print(f"   设备:{data.get('device_id')}")
        print(f"   温度:{data.get('temperature')}°C")
        print(f"   湿度:{data.get('humidity')}%")
        print(f"   时间:{data.get('timestamp')}")

        # Additional data handling can be added here
        self._process_data(data)

    def _process_data(self, data: dict):
        """Print an alert when the temperature is above 30 or below 10.

        A missing temperature defaults to 0. Non-numeric values (for example
        JSON ``null`` or a string) are ignored, because comparing them with
        a number would raise ``TypeError``.
        """
        temp = data.get('temperature', 0)
        if isinstance(temp, bool) or not isinstance(temp, (int, float)):
            return
        if temp > 30:
            print(f"   ⚠️  高温告警:{temp}°C")
        elif temp < 10:
            print(f"   ⚠️  低温告警:{temp}°C")


# Usage example
if __name__ == "__main__":
    BROKER = "broker.emqx.io"
    PORT = 1883

    # Create the publisher; connect() starts a background network loop
    publisher = MQTTDataPublisher(BROKER, PORT, "publisher-001")
    publisher.connect()

    # Create the consumer (run it in another thread or process, because its
    # connect() blocks in loop_forever())
    # consumer = MQTTDataConsumer(BROKER, PORT, "consumer-001")
    # consumer.connect(["sensors/+/telemetry"])

    try:
        # connect() returns before the broker has accepted the session, and
        # publish_sensor_data() drops samples while not connected, so wait
        # (bounded) for the connected flag before publishing.
        deadline = time.monotonic() + 10
        while not publisher.connected and time.monotonic() < deadline:
            time.sleep(0.1)

        if not publisher.connected:
            print("❌ 连接超时,未发布任何数据")
        else:
            # Publish simulated readings, cycling through three device ids
            for i in range(10):
                publisher.publish_sensor_data(
                    device_id=f"sensor-{i % 3 + 1}",
                    temp=random.uniform(20, 35),
                    humidity=random.uniform(40, 80)
                )
                time.sleep(2)
    except KeyboardInterrupt:
        print("\n👋 停止发布")
    finally:
        publisher.disconnect()