一、安装
搭建一个mqtt服务器,这里我们采用mosquitto
1、 下载地址:https://mosquitto.org/download/
2、 选择windows:https://mosquitto.org/files/binary/win64/mosquitto-2.0.18-install-windows-x64.exe
3、 右键安装即可
默认的安装路径在这里:C:\Program Files\mosquitto
二、修改配置文件
1、 进入mosquitto的安装目录,找到配置文件
C:\Program Files\mosquitto\ mosquitto.conf
2、 修改mosquitto指定的监听端口
listener 8899

3、 关闭mosquitto的匿名登录功能
allow_anonymous false

4、 修改配置用户的配置文件路径(这里要用绝对路径)
password_file D:\other\mosquitto\pwd.emample

三、启动
1、 进入windows的service。启动mqtt broker服务

2、命令行启动:mosquitto.exe -c mosquitto.conf -v
四、使用
1、用python写一个订阅者的代码
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
|
import paho.mqtt.client as mqttimport sslcontext = ssl.create_default_context()# 加载证书context.load_default_certs()# 将证书转换为常量cert_data = ssl.DER_cert_to_PEM_cert(context.get_ca_certs()[0])broker = '127.0.0.1'port = 7788topic = "AIOT"# 当客户端和服务端连接成功后执行的回调函数def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}") # 当连接成功后,客户端订阅一个主题,并消费该主题的消息 client.subscribe(topic=topic)# 当客户端订阅的主题收到消息的时候执行的回调函数def on_message(client, userdata, msg): # client 那个客户端 # userdata 用户数据 # msg 实际的消息对象 print(msg.topic+" "+str(msg.payload))# 实例化一个客户端,实现mqtt协议版本3.1、3.1.1、5.0的客户端类,是一个主要的与MQTT服务端通信的类client = mqtt.Client()# 实例化的时候可以传递参数进去# client_id="",# 唯一的客户端ip字符串,当前连接mqtt服务器时候使用,如果cliendid长度为0,则会自动随机生产一个clientid,这种情况下,clean_session参数必须是true# clean_session=None,# 决定clint类型的参数,如果为true,当client失去连接时候中间人将删除所有关于这个client的信息,如果为false,则说明这是一个永久的client,当失去连接的时候,发布的消息和队列里的消息# 都会被保持。# 注意,当失去连接的时候clinet永远不会抛弃自己需要发布的消息,当调用connetc()/reconnect()会重新发送这些消息# userdata=None,# 用户定义的任何类型的数据将被传输为userdata,后面可以使用user_data_set()来更新这个参数# protocol=MQTTv311,# client使用的mqtt版本# transport="tcp",# 设置通信机制,websocket或者tcp、一个是websocket传输,一个tcp传输# reconnect_on_failure=True# 连接失败是否重新连接# 客户端绑定回调方法,其实这里定义的很多回调函数client.on_connect = on_connectclient.on_message = on_message# client.on_disconnect## client.on_publish# 客户端连接mqtt broker服务器client.connect(bind_address=broker, bind_port=port, keepalive=60)# client.connect_async()# 客户端和mqtt broker服务器断开连接client.disconnect()# 永久执行,保持和mqtt broker服务端的长期通信client.loop_forever() |
我们当然也可以用命令行的方式启动订阅者,可以通过help来查询使用方法
mosquitto_sub.exe –help

2、发布者代码
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
|
import paho.mqtt.client as mqttimport timebroker = '127.0.0.1'port = 8899topic = "AIOT"def on_connect(client, userdata, flags, rc): print(f"Connected with result code {rc}")client = mqtt.Client()client.on_connect = on_connectclient.connect(broker, port, 60)for i in range(60): client.publish(topic, payload=i, qos=0, retain=False) print(f"send {i} to a/b{topic}") time.sleep(1)client.loop_forever() |
当然我们可以使用命令行的方式启用生产者,可以通过help来查询使用方法
mosquitto_pub.exe –help

3、通过证书连接mqtt broker
|
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
|
import pahoimport sslmqttc = paho.mqtt.client.Client('my_client')...# 构建一个SSL上下文SSL_CTX = { 'ssl_port': 8884, 'ca': "/ ca.crt", 'client_cert': '/client/client.crt', 'client_key':" /client/client.key", 'cert_reqs': None, 'tls_version': None, 'ciphers': None, 'insecure': False # 关闭insecure选项}# 设置TLS参数mqttc.tls_set(SSL_CTX['ca'], certfile=SSL_CTX['client_cert'], keyfile=SSL_CTX['client_key'], cert_reqs=SSL_CTX['cert_reqs'], tls_version=SSL_CTX['tls_version'], ciphers=SSL_CTX['ciphers'])# ca_certs=None, certfile=None, keyfile=None, cert_reqs=None, tls_version=None, ciphers=None, keyfile_password=Non# 要求验证服务端证书中域名与mqtt连接创建时输入的broker域名一致mqttc.tls_insecure_set(SSL_CTX['insecure']) |