Paho.mqtt.golang客户端连接示例

需注意收SetDefaultPublishHandler对应的Handler func不允许有业务代码阻塞,必须通过channel进行缓冲列表,否则会导致连接不稳定。

package main

import (
	"flag"
	"fmt"
	MQTT "github.com/eclipse/paho.mqtt.golang"
	"log"
	"os"
	"os/signal"
	"syscall"
	"time"
)

func main() {
	topic := flag.String("topic", "aaa/bbb", "The topic name to/from which to publish/subscribe")
	broker := flag.String("broker", "tcp://192.168.101.254:1883", "The broker URI. ex: tcp://10.10.1.1:1883")
	password := flag.String("password", "", "The password (optional)")
	user := flag.String("user", "", "The User (optional)")
	id := flag.String("id", "testgoid", "The ClientID (optional)")
	cleansess := flag.Bool("clean", false, "Set Clean Session (default false)")
	qos := flag.Int("qos", 0, "The Quality of Service 0,1,2 (default 0)")
	payload := flag.String("message", "", "The message text to publish (default empty)")
	store := flag.String("store", ":memory:", "The Store Directory (default use memory store)")
	flag.Parse()

	if *topic == "" {
		fmt.Println("Invalid setting for -topic, must not be empty")
		return
	}

	fmt.Printf("Sample Info:\n")
	fmt.Printf("\tbroker:    %s\n", *broker)
	fmt.Printf("\tclientid:  %s\n", *id)
	fmt.Printf("\tuser:      %s\n", *user)
	fmt.Printf("\tpassword:  %s\n", *password)
	fmt.Printf("\ttopic:     %s\n", *topic)
	fmt.Printf("\tmessage:   %s\n", *payload)
	fmt.Printf("\tqos:       %d\n", *qos)
	fmt.Printf("\tcleansess: %v\n", *cleansess)
	fmt.Printf("\tstore:     %s\n", *store)

	opts := MQTT.NewClientOptions()
	opts.AddBroker(*broker)
	opts.SetClientID(*id)
	opts.SetUsername(*user)
	opts.SetPassword(*password)
	opts.SetCleanSession(*cleansess)
	opts.SetKeepAlive(60)
	////自动重连机制,如网络不稳定可开启
	//opts.SetAutoReconnect(true)//启用自动重连功能
	//opts.SetMaxReconnectInterval(30)//每30秒尝试重连
	if *store != ":memory:" {
		opts.SetStore(MQTT.NewFileStore(*store))
	}

	quit := make(chan bool)
	choke := make(chan [2]string)
	opts.SetDefaultPublishHandler(func(client MQTT.Client, msg MQTT.Message) {
		choke <- [2]string{msg.Topic(), string(msg.Payload())}
	})
	go func() {
		for {
			select {
			case incoming := <-choke:
				fmt.Printf("RECEIVED TOPIC: %s MESSAGE: %s\n", incoming[0], incoming[1])
			case <-quit:
				return
			default:

			}
		}
	}()

	client := MQTT.NewClient(opts)
	if token := client.Connect(); token.Wait() && token.Error() != nil {
		panic(token.Error())
	}

	if token := client.Subscribe(*topic, byte(*qos), nil); token.Wait() && token.Error() != nil {
		fmt.Println(token.Error())
		os.Exit(1)
	}

	signalChan := make(chan os.Signal, 1)
	cleanupDone := make(chan bool)
	cleanup := make(chan bool)
	signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
	go func() {
		for range signalChan {
			quit <- true
			go func() {
				go func() {
					client.Disconnect(250)
				}()
				time.Sleep(260 * time.Millisecond)
				cleanup <- true
			}()
			<-cleanup
			log.Println("safe quit")
			cleanupDone <- true
		}
	}()
	<-cleanupDone
}

Last updated