Paho.mqtt.golang客户端连接示例
需注意:SetDefaultPublishHandler 对应的 Handler func 中不允许有会阻塞的业务代码,必须通过带缓冲的 channel 把消息转交给其他 goroutine 处理,否则会导致连接不稳定。
1
package main
2
3
import (
4
"flag"
5
"fmt"
6
MQTT "github.com/eclipse/paho.mqtt.golang"
7
"log"
8
"os"
9
"os/signal"
10
"syscall"
11
"time"
12
)
13
14
func main() {
15
topic := flag.String("topic", "aaa/bbb", "The topic name to/from which to publish/subscribe")
16
broker := flag.String("broker", "tcp://192.168.101.254:1883", "The broker URI. ex: tcp://10.10.1.1:1883")
17
password := flag.String("password", "", "The password (optional)")
18
user := flag.String("user", "", "The User (optional)")
19
id := flag.String("id", "testgoid", "The ClientID (optional)")
20
cleansess := flag.Bool("clean", false, "Set Clean Session (default false)")
21
qos := flag.Int("qos", 0, "The Quality of Service 0,1,2 (default 0)")
22
payload := flag.String("message", "", "The message text to publish (default empty)")
23
store := flag.String("store", ":memory:", "The Store Directory (default use memory store)")
24
flag.Parse()
25
26
if *topic == "" {
27
fmt.Println("Invalid setting for -topic, must not be empty")
28
return
29
}
30
31
fmt.Printf("Sample Info:\n")
32
fmt.Printf("\tbroker: %s\n", *broker)
33
fmt.Printf("\tclientid: %s\n", *id)
34
fmt.Printf("\tuser: %s\n", *user)
35
fmt.Printf("\tpassword: %s\n", *password)
36
fmt.Printf("\ttopic: %s\n", *topic)
37
fmt.Printf("\tmessage: %s\n", *payload)
38
fmt.Printf("\tqos: %d\n", *qos)
39
fmt.Printf("\tcleansess: %v\n", *cleansess)
40
fmt.Printf("\tstore: %s\n", *store)
41
42
opts := MQTT.NewClientOptions()
43
opts.AddBroker(*broker)
44
opts.SetClientID(*id)
45
opts.SetUsername(*user)
46
opts.SetPassword(*password)
47
opts.SetCleanSession(*cleansess)
48
opts.SetKeepAlive(60)
49
////自动重连机制,如网络不稳定可开启
50
//opts.SetAutoReconnect(true)//启用自动重连功能
51
//opts.SetMaxReconnectInterval(30)//每30秒尝试重连
52
if *store != ":memory:" {
53
opts.SetStore(MQTT.NewFileStore(*store))
54
}
55
56
quit := make(chan bool)
57
choke := make(chan [2]string)
58
opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
59
choke <- [2]string{msg.Topic(), string(msg.Payload())}
60
})
61
go func() {
62
for {
63
select {
64
case incoming := <-choke:
65
fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
66
case <-quit:
67
return
68
default:
69
70
}
71
}
72
}()
73
74
client := MQTT.NewClient(opts)
75
if token := client.Connect(); token.Wait() && token.Error() != nil {
76
panic(token.Error())
77
}
78
79
if token := client.Subscribe(*topic, byte(*qos), nil); token.Wait() && token.Error() != nil {
80
fmt.Println(token.Error())
81
os.Exit(1)
82
}
83
84
signalChan := make(chan os.Signal, 1)
85
cleanupDone := make(chan bool)
86
cleanup := make(chan bool)
87
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
88
go func() {
89
for range signalChan {
90
quit <- true
91
go func() {
92
go func() {
93
client.Disconnect(250)
94
}()
95
time.Sleep(260 * time.Millisecond)
96
cleanup <- true
97
}()
98
<-cleanup
99
log.Println("safe quit")
100
cleanupDone <- true
101
}
102
}()
103
<-cleanupDone
104
}
Copied!
Last modified 9mo ago
Copy link