内核功能扩展
针对CP7的内核功能扩展需要使用此功能进行相关开发,如用户登入Coolpy7 mqtt borker的身份验证,扩展消息记录到数据库等功能时。微服务作为一种更适合Coolpy7的技术架构,经过我时间的技术选型和性能测试,最终选定了CoAP作为扩展服务协议。

概述

Coolpy7从版本号:V7.2.1.1开始使用本新方式进行相关内核功能扩展。选用CoAP协议进行相关的服务端,CoAP协议使用UDP底层通信,没有头阻塞问题,性能更优于http等 基于TCP的其他协议。CoAP协议类似简化版的http协议,在消息体积方面更有优势,更节省带宽。

业务说明示意图

当相关事件发生时内核触发Coolpy7 Extend Service相对应相关的事件处理函数。

技术架构说明示意图

Coolpy7 Extend Service支持服务多个Coolpy7,更适合以微服务形式进行部署。

实例演练

本演练说明如果操作构建一个Coolpy7 core对一个扩展服务
假设一个Coolpy7 Core服务位于192.168.101.1,一个扩展服务位于192.168.101.4。可以通过以下操作过程进行联调开发。
1.Coolpy7 Extend Service (此操作前请先本当前系统安装好Golang环境)
1
# 进入golang环境目录
2
cd $GOPATH/src
3
4
# 下载服务器端
5
git clone https://github.com/Coolpy7/coolpy7_extend_service.git && cd coolpy7_extend_service
6
7
# 安装依赖包
8
go get
9
10
# 启动coolpy7_extend_service 启动参数
11
# l 当前服务Host地址 (默认为:5683即本地5683端口,此参数一般默认即可,无需配置, 使用UDP连接请开启相关防火墙设置)
12
# ht 内核扩展功能服务token,(必须与客户端配置一致)
13
go run coolpy7_extend_service.go -ht=coolpy7
14
15
# 启动成功后会打印如下信息,即说明服务端已正常启动,host于5683端口,请确保相关防火墙配置可用
16
2020/01/15 10:55:33 coolpy7 extend server on udp port :5683
Copied!
2.Coolpy7
1
# 下载服务器端
2
wget https://github.com/Coolpy7/Coolpy7/raw/master/go_build_Coolpy7_go_linux.zip
3
# 解压文件
4
unzip go_build_Coolpy7_go_linux.zip
5
# 提权
6
chmod -R 777 go_build_Coolpy7_go_linux
7
# 启动Coolpy7 启动参数
8
# as参数,用户身份验证默认开关,0为不验证,1为通过jwt验证,必须设置-jsk启动参数,2为使用扩展服务
9
# ha参数,扩展服务器所在地址,本例为192.168.101.4:5683
10
# ht参数,服务器连接验证密钥,必须与扩展服务启动参数中的ht参数一致,否则无法与之通信
11
./go_build_Coolpy7_go_linux -as=2 -ha=192.168.101.4:5683 -ht=coolpy7
12
13
# 启动时测试连接扩展服务器结果显示
14
2020/01/15 11:30:04 connected to coolpy7 extend service 192.168.101.4:5683
15
# 启动成功后会打印如下信息,即说明服务端已正常启动,host于1883端口,请确保相关防火墙配置可用
16
2020/01/15 11:30:04 Coolpy7 v7.2.1.1 tcp [::]:1883 plugin build golang v1.13.1
Copied!
扩展服务绑定端口后,内核的ha参数可用多种方式进行连接,如:通过域名连接[xxxx.com:5683],通过内外网IP[xxx.xxx.xxx.xxx:5683] 等方式。

扩展服务源代码

1
package main
2
3
import (
4
"bytes"
5
"encoding/json"
6
"flag"
7
//"github.com/dgrijalva/jwt-go"
8
"github.com/jacoblai/go-coap"
9
"log"
10
"net"
11
"os"
12
"os/signal"
13
"syscall"
14
)
15
16
var ctoken []byte
17
18
//jwt key
19
//var secretKey = "Coolpy2020"
20
21
func main() {
22
var (
23
addr = flag.String("l", ":5683", "绑定Host地址")
24
token = flag.String("ht", "coolpy7", "内核扩展功能服务token,(必须与客户端配置一致)")
25
)
26
27
flag.Parse()
28
29
ctoken = []byte(*token)
30
31
mux := coap.NewServeMux()
32
//身份验证
33
mux.Handle("/auth", tokenAuth(coap.FuncHandler(handleAuth)))
34
//订阅
35
mux.Handle("/sub", tokenAuth(coap.FuncHandler(handleSub)))
36
//取消订阅
37
mux.Handle("/unsub", tokenAuth(coap.FuncHandler(handleUnSub)))
38
//消息
39
mux.Handle("/pub", tokenAuth(coap.FuncHandler(handlePub)))
40
//客户端离线
41
mux.Handle("/term", tokenAuth(coap.FuncHandler(handleTerm)))
42
43
go func() {
44
if err := coap.ListenAndServe("udp", *addr, mux); err != nil {
45
log.Fatal(err)
46
}
47
}()
48
log.Println("coolpy7 extend server on udp port", *addr)
49
50
signalChan := make(chan os.Signal, 1)
51
cleanupDone := make(chan bool)
52
signal.Notify(signalChan, syscall.SIGINT, syscall.SIGTERM)
53
go func() {
54
for range signalChan {
55
log.Println("safe quit")
56
cleanupDone <- true
57
}
58
}()
59
<-cleanupDone
60
}
61
62
func response(m *coap.Message, payload []byte) *coap.Message {
63
res := &coap.Message{
64
Type: coap.Acknowledgement,
65
Code: coap.Content,
66
MessageID: m.MessageID,
67
Token: m.Token,
68
Payload: payload,
69
}
70
res.SetOption(coap.ContentFormat, coap.AppJSON)
71
return res
72
}
73
74
//token难中间件
75
func tokenAuth(next coap.Handler) coap.Handler {
76
return coap.FuncHandler(func(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
77
//判断token是否合法, != 0即为非法
78
if bytes.Compare(m.Token, ctoken) != 0 {
79
msg := make(map[string]interface{})
80
msg["ok"] = false
81
msg["err"] = "token error"
82
payload, _ := json.Marshal(&msg)
83
res := &coap.Message{
84
Type: coap.Acknowledgement,
85
Code: coap.Content,
86
MessageID: m.MessageID,
87
Token: m.Token,
88
Payload: payload,
89
}
90
res.SetOption(coap.ContentFormat, coap.AppJSON)
91
return nil
92
}
93
//通过后执行进行服务下一层中间件
94
return next.ServeCOAP(l, a, m)
95
})
96
}
97
98
//用户身份验证处理函数
99
func handleAuth(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
100
var msg map[string]interface{}
101
err := json.Unmarshal(m.Payload, &msg)
102
if err != nil {
103
log.Println(err)
104
return nil
105
}
106
if !msg["ok"].(bool) {
107
//错误通知
108
log.Println("auth", msg)
109
} else {
110
//请求消息
111
if m.IsConfirmable() {
112
msg := make(map[string]interface{})
113
114
////固定值判断认证登陆信息合法性
115
//if msg["cid"].(string) == "system" && msg["username"].(string) == "premissid" && msg["password"].(string) == "testpremissid" {
116
// msg["ok"] = true
117
//}
118
119
////jwt token
120
//token, err := jwt.Parse(msg["password"].(string), func(token *jwt.Token) (interface{}, error) {
121
// return []byte(secretKey), nil
122
//})
123
//if err != nil {
124
// msg["ok"] = false
125
//}
126
//if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {
127
// log.Println(claims)
128
// msg["ok"] = true
129
//} else {
130
// msg["ok"] = false
131
//}
132
133
//默认允许所有请求直接允许登陆
134
//允许登陆设置为true,反之设置为false
135
msg["ok"] = true
136
payload, _ := json.Marshal(&msg)
137
//回复内核
138
return response(m, payload)
139
}
140
}
141
return nil
142
}
143
144
//订阅主题处理函数
145
//每个用户消息推送都会触发此事件
146
//cid:客户端身份标识clientid, topic:主题,qos: 消息质量
147
//返回值:无返回指令
148
func handleSub(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
149
var inMsg map[string]interface{}
150
err := json.Unmarshal(m.Payload, &inMsg)
151
if err != nil {
152
log.Println(err)
153
return nil
154
}
155
log.Println(inMsg)
156
return nil
157
}
158
159
//每个用户消息推送都会触发此事件
160
//cid:客户端身份标识clientid, topic:主题,qos: 消息质量
161
//返回值:无返回指令
162
func handleUnSub(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
163
var inMsg map[string]interface{}
164
err := json.Unmarshal(m.Payload, &inMsg)
165
if err != nil {
166
log.Println(err)
167
return nil
168
}
169
log.Println(inMsg)
170
return nil
171
}
172
173
//消息推送处理函数
174
//每个用户消息推送都会触发此事件
175
//cid:客户端身份标识clientid, topic:主题,qos: 消息质量, payload:消息内容
176
//返回值:无返回指令
177
func handlePub(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
178
var inMsg map[string]interface{}
179
err := json.Unmarshal(m.Payload, &inMsg)
180
if err != nil {
181
log.Println(err)
182
return nil
183
}
184
log.Println(inMsg)
185
return nil
186
}
187
188
//用户断开连接或意外离线处理函数
189
//cid:客户端身份标识clientid, err:退出原因
190
func handleTerm(l *net.UDPConn, a *net.UDPAddr, m *coap.Message) *coap.Message {
191
var msg map[string]interface{}
192
err := json.Unmarshal(m.Payload, &msg)
193
if err != nil {
194
log.Println(err)
195
return nil
196
}
197
log.Println("term", msg)
198
return nil
199
}
200
Copied!

性能测试

1
goos: darwin
2
goarch: amd64
3
BenchmarkQps
4
BenchmarkQps-8 96454 12558 ns/op 360 B/op 9 allocs/op
5
PASS
Copied!
Last modified 2mo ago