Coolpy7从版本号:V7.2.1.1开始使用本新方式进行相关内核功能扩展。选用CoAP协议进行相关的服务端,CoAP协议使用UDP底层通信,没有头阻塞问题,性能更优于http等 基于TCP的其他协议。CoAP协议类似简化版的http协议,在消息体积方面更有优势,更节省带宽。
当相关事件发生时内核触发Coolpy7 Extend Service相对应相关的事件处理函数。
Coolpy7 Extend Service支持服务多个Coolpy7,更适合以微服务形式进行部署。
本演练说明如果操作构建一个Coolpy7 core对一个扩展服务
假设一个Coolpy7 Core服务位于192.168.101.1,一个扩展服务位于192.168.101.4。可以通过以下操作过程进行联调开发。
1.Coolpy7 Extend Service (此操作前请先本当前系统安装好Golang环境)
# 进入golang环境目录cd $GOPATH/src# 下载服务器端git clone https://github.com/Coolpy7/coolpy7_extend_service.git && cd coolpy7_extend_service# 安装依赖包go get# 启动coolpy7_extend_service 启动参数# l 当前服务Host地址 (默认为:5683即本地5683端口,此参数一般默认即可,无需配置, 使用UDP连接请开启相关防火墙设置)# ht 内核扩展功能服务token,(必须与客户端配置一致)go run coolpy7_extend_service.go -ht=coolpy7# 启动成功后会打印如下信息,即说明服务端已正常启动,host于5683端口,请确保相关防火墙配置可用2020/01/15 10:55:33 coolpy7 extend server on udp port :5683
2.Coolpy7 Core
# 下载服务器端git clone https://github.com/Coolpy7/Coolpy7/releases/download/7.2.1.1/go_build_Coolpy7_go_linux.zip# 解压文件unzip go_build_Coolpy7_go_linux.zip# 提权chmod -R 777 go_build_Coolpy7_go_linux# 启动Coolpy7 启动参数# as参数,启动扩展服务功能,(关闭默认的禁用连接身份认证功能)# ha参数,扩展服务器所在地址,本例为192.168.101.4:5683# ht参数,服务器连接验证密钥,必须与扩展服务启动参数中的ht参数一致,否则无法与之通信./go_build_Coolpy7_go_linux -as=false -ha=192.168.101.4:5683 -ht=coolpy7# 启动时测试连接扩展服务器结果显示2020/01/15 11:30:04 connected to coolpy7 extend service 192.168.101.4:5683# 启动成功后会打印如下信息,即说明服务端已正常启动,host于1883端口,请确保相关防火墙配置可用2020/01/15 11:30:04 Coolpy7 v7.2.1.1 tcp [::]:1883 plugin build golang v1.13.1
扩展服务绑定端口后,内核的ha参数可用多种方式进行连接,如:通过域名连接[xxxx.com:5683],通过内外网IP[xxx.xxx.xxx.xxx:5683] 等方式。
package mainimport ("bytes""encoding/json""flag"//"github.com/dgrijalva/jwt-go""github.com/jacoblai/go-coap""log""net""os""os/signal""syscall")var ctoken []byte//jwt key//var secretKey = "Coolpy2020"func main() {var (addr = flag.String("l", ":5683", "绑定Host地址")token = flag.String("ht", "coolpy7", "内核扩展功能服务token,(必须与客户端配置一致)"))flag.Parse()ctoken = []byte(*token)mux := coap.NewServeMux()//身份验证mux.Handle("/auth", tokenAuth(coap.FuncHandler(handleAuth)))//订阅mux.Handle("/sub", tokenAuth(coap.FuncHandler(handleSub)))//取消订阅mux.Handle("/unsub", tokenAuth(coap.FuncHandler(handleUnSub)))//消息mux.Handle("/pub", tokenAuth(coap.FuncHandler(handlePub)))//客户端离线mux.Handle("/term", tokenAuth(coap.FuncHandler(handleTerm)))go func() {if err := coap.ListenAndServe("udp", *addr, mux); err != nil {log.Fatal(err)}}()log.Println("coolpy7 extend server on udp port", *addr)signalChan := make(chan os.Signal, 1)cleanupDone := make(chan bool)signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)go func() {for range signalChan {log.Println("safe quit")cleanupDone <- true}}()<-cleanupDone}func response(m *coap.Message, payload []byte) *coap.Message {res := &coap.Message{Type: coap.Acknowledgement,Code: coap.Content,MessageID: m.MessageID,Token: m.Token,Payload: payload,}res.SetOption(coap.ContentFormat, coap.AppJSON)return res}//token难中间件func tokenAuth(next coap.Handler) coap.Handler {return coap.FuncHandler(func(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {//判断token是否合法, != 0即为非法if bytes.Compare(m.Token, ctoken) != 0 {msg := make(map[string]interface{})msg["ok"] = falsemsg["err"] = "token error"payload, _ := json.Marshal(&msg)res := &coap.Message{Type: coap.Acknowledgement,Code: coap.Content,MessageID: m.MessageID,Token: m.Token,Payload: payload,}res.SetOption(coap.ContentFormat, coap.AppJSON)return nil}//通过后执行进行服务下一层中间件return next.ServeCOAP(l, a, m)})}//用户身份验证处理函数func handleAuth(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {var msg map[string]interface{}err := json.Unmarshal(m.Payload, &msg)if err != nil {log.Println(err)return nil}if !msg["ok"].(bool) {//错误通知log.Println(msg)} else {//请求消息if m.IsConfirmable() {msg := make(map[string]interface{})////固定值判断认证登陆信息合法性//if msg["cid"].(string) == "system" && msg["username"].(string) == "premissid" && msg["password"].(string) == "testpremissid" {// msg["ok"] = true//}////jwt token//token, err := jwt.Parse(msg["password"].(string), func(token *jwt.Token) (interface{}, error) {// return []byte(secretKey), nil//})//if err != nil {// msg["ok"] = false//}//if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {// log.Println(claims)// msg["ok"] = true//} else {// msg["ok"] = false//}//默认允许所有请求直接允许登陆//允许登陆设置为true,反之设置为falsemsg["ok"] = truepayload, _ := json.Marshal(&msg)//回复内核return response(m, payload)}}return nil}//订阅主题处理函数//每个用户消息推送都会触发此事件//cid:客户端身份标识clientid, topic:主题,qos: 消息质量//返回值ok:当为true时,允许此操作,false为禁止此次订阅并强制断开客户端连接func handleSub(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {var msg map[string]interface{}err := json.Unmarshal(m.Payload, &msg)if err != nil {log.Println(err)return nil}if !msg["ok"].(bool) {//错误通知log.Println(msg)} else {//订阅通知消息if m.IsConfirmable() {msg := make(map[string]interface{})//允许订阅设置为true,反之设置为falsemsg["ok"] = truepayload, _ := json.Marshal(&msg)//回复内核return response(m, payload)}}return nil}//每个用户消息推送都会触发此事件//cid:客户端身份标识clientid, topic:主题,qos: 消息质量//返回值:无返回指令func handleUnSub(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {var msg map[string]interface{}err := json.Unmarshal(m.Payload, &msg)if err != nil {log.Println(err)return nil}if !msg["ok"].(bool) {//错误通知log.Println(msg)} else {//取消订阅通知消息log.Println(msg)}return nil}//消息推送处理函数//每个用户消息推送都会触发此事件//cid:客户端身份标识clientid, topic:主题,qos: 消息质量, payload:消息内容//返回值ok:当为true时告知内核有消息处理返回//返回值rep:告知内核是否替换原消息内容发送,当为true时,内核会取消息中的payload替换原消息进和地发送//返回值topic:替换原topic,当rep为true时生效//返回值payload:替换原payload,当rep为true时生效func handlePub(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {var msg map[string]interface{}err := json.Unmarshal(m.Payload, &msg)if err != nil {log.Println(err)return nil}if !msg["ok"].(bool) {//错误通知log.Println(msg)} else {//请求消息if m.IsConfirmable() {msg := make(map[string]interface{})//允许登陆设置为true,反之设置为falsemsg["ok"] = true//要求内核替换消息内容设置为truemsg["rep"] = false////设置需要替换的消息内容//msg["topic"] = msg["topic"]//msg["payload"] = `{"test":"value"}`payload, _ := json.Marshal(&msg)//回复内核return response(m, payload)}}return nil}//用户断开连接或意外离线处理函数//cid:客户端身份标识clientid, err:退出原因func handleTerm(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {var msg map[string]interface{}err := json.Unmarshal(m.Payload, &msg)if err != nil {log.Println(err)return nil}log.Println(msg)return nil}
扩展服务开源地址:https://github.com/Coolpy7/coolpy7_extend_service
性能测试源代码:https://github.com/jacoblai/go-coap/blob/master/example/client/bench_test.go
goos: darwingoarch: amd64BenchmarkQpsBenchmarkQps-8 96454 12558 ns/op 360 B/op 9 allocs/opPASS