2022年 12月 19日

Python paho-mqtt库测试MQTT

一、MQTT库安装

pip install paho-mqtt

二、代码

  1. # coding=utf-8
  2. import paho.mqtt.client as mqtt
  3. import time,os,requests,json,threading
  4. cafile = r"E:\Python\rootCA.PEM" #身份认证文件
  5. certfile = r"E:\Python\ClientCA_11111.PEM"
  6. keyfile = r"E:\Python\ClientKey_11111.PEM"
  7. host = "xxxxxx" #主机地址
  8. port = 8883 #端口号
  9. data={"state":{"desired":{"WIFI_AP":"on"}}} #发布信息
  10. sub_topic="@askey/dvr/xxxxxx/status" #订阅主题
  11. pub_topic="@askey/dvr/xxxxxx/status" #发布主题
  12. client = mqtt.Client() #创建一个mqtt对象
  13. client.tls_set(cafile, certfile, keyfile) #加密身份认证
  14. client.connect(host, port, 60) #连接mqtt服务器
  15. client.loop_start() #后台运行一个线程来自动调用loop()
  16. def on_connect(client, userdata, flags, rc):
  17. if rc == 0:
  18. print('连接成功') #0代表连接成功
  19. client.subscribe(sub_topic) #订阅消息
  20. else:
  21. print('Connect Error status {0}'.format(rc)) #连接失败并显示错误代码
  22. def on_message(client, userdata, msg):
  23. print("主题:"+msg.topic + " 消息:" + str(msg.payload.decode('utf-8'))) #接收消息后处理函数
  24. def mqtt_subscribe():
  25. client.on_connect = client.subscribe(sub_topic) # 设置连接上服务器回调函数
  26. client.on_message = on_message # 设置接收到服务器消息回调函数
  27. #client.loop_forever() # 守护连接状态
  28. def mqtt_publish():
  29. while True:
  30. client.publish(pub_topic, payload=str(data), qos=1) # 发布消息
  31. time.sleep(2)
  32. if __name__ == '__main__':
  33. p = threading.Thread(target=mqtt_publish, name="Thread_pub" , args=()).start() #发布主题线程
  34. s = threading.Thread(target=mqtt_subscribe, name="Thread_sub", args=()).start() #订阅主题线程