请注意从Coolpy7的V7.2.1.1开始,将放弃使用插件功能进行内核功能扩展。因为有部分用户在使用过程中因为插件功能无法正常工作。今后版本将使用CoAP协议进行相关功能的扩展开发工作。祥细操作办法请参阅本手册(内核扩展功能新版)
CP7内核扩展支持使用Go std plugin技术进行开发,提供了用户登入和消息推送Hook定义。在CP7内核程序所在路径下的cmd/plugin文件夹下支持auth.go和pubs.go及term.go文件分别是插件源文件,内核启动时会自动加载内核文件当前目录下的plugin目录下的所有.so文件,.so文件通过cmd目录下的make.sh编译并自动生成.so文件到指定文件夹
用户登入身份验证扩展功能,auth.go示例
package mainimport ("compress/gzip""crypto/tls""encoding/json""io""io/ioutil""log""net/http""strings"//"github.com/dgrijalva/jwt-go"//"github.com/pquerna/ffjson/ffjson")//告诉内核此插件为用户身份验证插件var CT = "auth"//告诉内核本插件版本号,当内核重启首次加载多个插件时会生效Ver值最大的相同性质插件var Ver float32 = 1.1////jwt key//var secretKey = "Coolpy7yeah"//主程序首次启动或热更新后加触发一次func Setup() {log.Println("Authentication plugin ok version", Ver)}func Loop(cid, username, password, remote string) bool {log.Println(cid, username, password, remote)////固定值判断认证登陆信息合法性//if cid == "system" && username == "premissid" && password == "testpremissid" {// return true//}////jwt token//token, err := jwt.Parse(secretKey, func(token *jwt.Token) (interface{}, error) {// return []byte(secretKey), nil//})//if err != nil {// return false//}//if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {// log.Println(claims)// return true//} else {// return false//}////http请求验证////参数: method, url, header key, header value, content type, body//res, err := httpFunc("GET", "https://192.168.200.200/api/v1/sys/authenticate/" + username, "Authorization", "5ba32f074966b41be9123fe1", "application/json", "")//if err == nil && res["ok"].(bool) == true {// return true//}//不做作何身份验证直接允许登入return true}//内核逻辑处理异常触发此事件func Err(cid, username, password, remote, err string) {log.Println(cid, username, password, remote)}//http请求func httpFunc(method, url, headerKey, headerValue, contentType, body string) (map[string]interface{}, error) {tr := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true},}client := &http.Client{Transport: tr}reqest, err := http.NewRequest(method, url, strings.NewReader(body))if err != nil {return nil, err}if headerKey != "" {reqest.Header.Add(headerKey, headerValue)}if contentType != "" {reqest.Header.Set("Content-Type", contentType)} else {reqest.Header.Set("Content-Type", "application/json;charset=utf-8")}response, err := client.Do(reqest)defer response.Body.Close()if err != nil {return nil, err}if response.StatusCode == 200 {resBody := make([]byte, 0)switch response.Header.Get("Content-Encoding") {case "gzip":reader, _ := gzip.NewReader(response.Body)defer reader.Close()for {buf := make([]byte, 1024)n, err := reader.Read(buf)if err != nil && err != io.EOF {return nil, err}if n == 0 {break}resBody = append(resBody, buf...)}default:resBody, err = ioutil.ReadAll(response.Body)if err != nil {return nil, err}}var obj map[string]interface{}err = json.Unmarshal(resBody, &obj)if err != nil {return nil, err}return obj, nil}return nil, err}
推送消息扩展功能,pubs.go示例
package mainimport ("bytes""compress/gzip""crypto/tls""encoding/json""io""io/ioutil""log""net/http")//告诉内核此插件为消息过滤插件var CT = "pubs"//告诉内核本插件版本号,当内核重启首次加载多个插件时会生效Ver值最大的相同性质插件var Ver float32 = 1.1//主程序首次启动或热更新后加触发一次func Setup() {log.Println("Publish plugin ok version", Ver)}//每个用户消息推送都会触发此事件//cid:客户端身份标识clientid, topic:主题,qos: 消息质量, payload:消息内容//返回值1:当为true时,返回值2会替换原消息内容发送//返回值3:内核判断是否断开当前客户端func Loop(cid, topic string, qos uint8, payload []byte) (bool, []byte, error) {//log.Println(cid, topic, qos, string(payload))////http请求验证////参数: method, url, header key, header value, content type, body//_, err := httpFunc("POST", "https://192.168.200.200/api/v1/sys/chatLog", "Authorization", "5ba32f074966b41be9123fe1", "application/json", payload)//if err == nil {// return false, nil, nil//}////此示例演示接收到一个json消息后////增加两个键,然后打印出来////通过返回替换模式返回新的payload给内核替换原来的payload//var res map[string]interface{}//err := ffjson.Unmarshal(payload, &res)//if err != nil {// return false, nil, nil//}////添加两个键//res["createat"] = time.Now().Local()//res["id"] = objectid.NewObjectId().Hex()////打印//log.Println(res)////json序列化后返回给内核//nPayload, _ := ffjson.Marshal(&res)////返回true即内核替换payload为nPayload//return true, nPayload, nilreturn false, nil, nil}//内核逻辑处理异常触发此事件func Err(cid, topic string, qos uint8, payload []byte, err string) {log.Println("pubs err event")log.Println(cid, topic, qos, string(payload), err)}//http请求func httpFunc(method, url, headerKey, headerValue, contentType string, body []byte) (map[string]interface{}, error) {tr := &http.Transport{TLSClientConfig: &tls.Config{InsecureSkipVerify: true},}client := &http.Client{Transport: tr}reqest, err := http.NewRequest(method, url, bytes.NewReader(body))if err != nil {return nil, err}if headerKey != "" {reqest.Header.Add(headerKey, headerValue)}if contentType != "" {reqest.Header.Set("Content-Type", contentType)} else {reqest.Header.Set("Content-Type", "application/json;charset=utf-8")}response, err := client.Do(reqest)defer response.Body.Close()if err != nil {return nil, err}if response.StatusCode == 200 {resBody := make([]byte, 0)switch response.Header.Get("Content-Encoding") {case "gzip":reader, _ := gzip.NewReader(response.Body)defer reader.Close()for {buf := make([]byte, 1024)n, err := reader.Read(buf)if err != nil && err != io.EOF {return nil, err}if n == 0 {break}resBody = append(resBody, buf...)}default:resBody, err = ioutil.ReadAll(response.Body)if err != nil {return nil, err}}var obj map[string]interface{}err = json.Unmarshal(resBody, &obj)if err != nil {return nil, err}return obj, nil}return nil, err}
客户端订阅主题扩展功能,subs.go示例
package mainimport ("log")//告诉内核此插件为订阅过滤插件var CT = "subs"//告诉内核本插件版本号,当内核重启首次加载多个插件时会生效Ver值最大的相同性质插件var Ver float32 = 1.1//主程序首次启动或热更新后加触发一次func Setup() {log.Println("Subscribe plugin ok version", Ver)}//每个用户消息推送都会触发此事件//cid:客户端身份标识clientid, topic:主题,qos: 消息质量//返回值1:当为true时,返回值2会替换原订阅主题//返回值3:内核判断是否断开当前客户端func Loop(cid, topic string, qos uint8) (bool, string, error) {log.Println(cid, topic, qos)return false, "", nil}//内核逻辑处理异常触发此事件func Err(cid, topic string, qos uint8, err string) {log.Println("subs err event")log.Println(cid, topic, qos, err)}
客户端取消订阅主题扩展功能,unsubs.go示例
package mainimport ("log")//告诉内核此插件为取消订阅过滤插件var CT = "unsubs"//告诉内核本插件版本号,当内核重启首次加载多个插件时会生效Ver值最大的相同性质插件var Ver float32 = 1.1//主程序首次启动或热更新后加触发一次func Setup() {log.Println("UnSubscribe plugin ok version", Ver)}//每个用户消息推送都会触发此事件//cid:客户端身份标识clientid, topic:主题,qos: 消息质量//返回值1:当为true时,返回值2会替换原订阅主题//返回值3:内核判断是否断开当前客户端func Loop(cid, topic string) (bool, string, error) {log.Println(cid, topic)return false, "", nil}//内核逻辑处理异常触发此事件func Err(cid, topic string, err string) {log.Println("unsubs err event")log.Println(cid, topic, err)}
客户端正常或异常退出扩展功能,term.go示例
package mainimport "log"//告诉内核此插件为用户连接断开插件var CT = "term"//告诉内核本插件版本号,当内核重启首次加载多个插件时会生效Ver值最大的相同性质插件var Ver float32 = 1.1//主程序首次启动或热更新后加触发一次func Setup() {log.Println("Terminate plugin ok version", Ver)}//每个用户断开连接都会触发此事件//cid:客户端身份标识clientid, error:退出原因func Loop(cid, err string) {log.Println(cid, err)}
Go plugin有两个特性,其一是热更新加载相同文件名字的插件不会生效,其二是编译时pluginpath参数不允许一样。缺一不可完成热更新插件功能。所以编译时指令可以参考cmd/plugin/make.sh文件,内容如下:
#!/usr/bin/env bashgo build -ldflags "-pluginpath=plugin/cp7-auth-$(date +%s)" -buildmode=plugin -o ../../bin/plugin/cp7_auth_$(date +%s).so ./auth/cp7_auth.gogo build -ldflags "-pluginpath=plugin/cp7-pubs-$(date +%s)" -buildmode=plugin -o ../../bin/plugin/cp7_pubs_$(date +%s).so ./pubs/cp7_pubs.gogo build -ldflags "-pluginpath=plugin/cp7-subs-$(date +%s)" -buildmode=plugin -o ../../bin/plugin/cp7_subs_$(date +%s).so ./subs/cp7_subs.gogo build -ldflags "-pluginpath=plugin/cp7-unsubs-$(date +%s)" -buildmode=plugin -o ../../bin/plugin/cp7_unsubs_$(date +%s).so ./unsubs/cp7_unsubs.gogo build -ldflags "-pluginpath=plugin/cp7-term-$(date +%s)" -buildmode=plugin -o ../../bin/plugin/cp7_term_$(date +%s).so ./term/cp7_term.go
1.Coolpy7内核启动加载插件逻辑
由于插件热更要求通常情况下会在plugin目录不断增加相同性质的插件文件,并只发现.so文件类型,所以加载时会以插件源代码中的Ver变量作为版本号参考,相关性质的插件只加载Ver值最大的插件一次。
2.插件热更新
内核会以15秒为时间间隔监测plugin文件夹下的“新增”文件(复制文件时请尽量于少此时间操作完成),并只发现.so文件类型。所以当新插件复制到plugin文件夹并热更新加载生效时会判断Ver变量大于或等于当前已加载插件Ver值加载生效,请做好插件版本控制。
经测试go plugin编译指令需要到目标特定操作系统并安装gcc后再执行以下编译操作。由于目前CP7的Linux支持特殊性,建议扩展开发内核功能尽量使用如Ubuntu等带UI的Linux发行版。
Ubuntu为例安装gcc指令
$ sudo apt install gcc
# 下载服务器端git clone https://github.com/Coolpy7/Coolpy7.git && cd Coolpy7# 此时你可以据自己需要分别开发auth(身份验证插件)、pubs(消息到达插件)、term(连接断开插件)# 进入插件目录cd cmd/plugin# 执行编译脚本生成.so插件到plugin插件文件夹./make.sh# 完成后plugin文件夹下会产生三个.so文件# 正常启动Coolpy7内核程序会自动加载相关插件, 如下显示即插件生效2019/02/17 23:06:35 Authentication plugin ok version 1.12019/02/17 23:06:35 Publish plugin ok version 1.12019/02/17 23:06:35 Subscribe plugin ok version 1.12019/02/17 23:06:35 Terminate plugin ok version 1.12019/02/17 23:06:35 UnSubscribe plugin ok version 1.12019/02/17 23:06:35 Coolpy7 tcp is listening on [::]:1883
当热更新插件的时候请把Ver参数的数值递增,可以防止内核热更发现不了新插件导至热更无效加载
CP7经过多种扩展内核功能技术方案进行了大量技术选型工作,从一开始选用js引擎测试不支持多线程后再选用lua测试发现性能还是对内核有所损耗,所以最终选择了go原生的plugin插件功能进行技术开发扩展
本测试用例为载加插件后测试不做任何处理返回消息的publish操作压力测试,测试代码使用Coolpy7_benchmark性能测试工具包,地址:https://github.com/Coolpy7/coolpy7_benchmark/tree/master/test_pubsum1max
测试结果
Sent: 6193 msgs - Received: 4380 msgs (Buffered: 1813 msgs) (Average Throughput: 4380 msg/s)Sent: 4132 msgs - Received: 4374 msgs (Buffered: 1571 msgs) (Average Throughput: 4377 msg/s)Sent: 5167 msgs - Received: 4896 msgs (Buffered: 1841 msgs) (Average Throughput: 4550 msg/s)Sent: 4652 msgs - Received: 4652 msgs (Buffered: 1840 msgs) (Average Throughput: 4576 msg/s)Sent: 4647 msgs - Received: 4734 msgs (Buffered: 1753 msgs) (Average Throughput: 4607 msg/s)Sent: 4128 msgs - Received: 4389 msgs (Buffered: 1492 msgs) (Average Throughput: 4571 msg/s)Sent: 5163 msgs - Received: 4929 msgs (Buffered: 1726 msgs) (Average Throughput: 4622 msg/s)Sent: 5168 msgs - Received: 5087 msgs (Buffered: 1807 msgs) (Average Throughput: 4680 msg/s)Sent: 4646 msgs - Received: 4992 msgs (Buffered: 1460 msgs) (Average Throughput: 4715 msg/s)Sent: 5167 msgs - Received: 5032 msgs (Buffered: 1595 msgs) (Average Throughput: 4746 msg/s)Sent: 5164 msgs - Received: 4924 msgs (Buffered: 1835 msgs) (Average Throughput: 4762 msg/s)Sent: 4645 msgs - Received: 4736 msgs (Buffered: 1744 msgs) (Average Throughput: 4760 msg/s)Sent: 4646 msgs - Received: 4776 msgs (Buffered: 1614 msgs) (Average Throughput: 4761 msg/s)Sent: 5168 msgs - Received: 4942 msgs (Buffered: 1840 msgs) (Average Throughput: 4774 msg/s)Sent: 5161 msgs - Received: 5057 msgs (Buffered: 1944 msgs) (Average Throughput: 4793 msg/s)Sent: 4654 msgs - Received: 5074 msgs (Buffered: 1523 msgs) (Average Throughput: 4811 msg/s)Sent: 5163 msgs - Received: 5148 msgs (Buffered: 1538 msgs) (Average Throughput: 4830 msg/s)Sent: 5163 msgs - Received: 4982 msgs (Buffered: 1718 msgs) (Average Throughput: 4839 msg/s)Sent: 4133 msgs - Received: 4068 msgs (Buffered: 1783 msgs) (Average Throughput: 4798 msg/s)Sent: 4132 msgs - Received: 3979 msgs (Buffered: 1936 msgs) (Average Throughput: 4757 msg/s)Sent: 4129 msgs - Received: 4401 msgs (Buffered: 1664 msgs) (Average Throughput: 4740 msg/s)Sent: 5101 msgs - Received: 4926 msgs (Buffered: 1838 msgs) (Average Throughput: 4749 msg/s)Sent: 4713 msgs - Received: 5028 msgs (Buffered: 1522 msgs) (Average Throughput: 4761 msg/s)Sent: 5165 msgs - Received: 4818 msgs (Buffered: 1869 msgs) (Average Throughput: 4763 msg/s)Sent: 4645 msgs - Received: 4951 msgs (Buffered: 1562 msgs) (Average Throughput: 4771 msg/s)Sent: 5168 msgs - Received: 4798 msgs (Buffered: 1931 msgs) (Average Throughput: 4772 msg/s)Sent: 4641 msgs - Received: 4908 msgs (Buffered: 1664 msgs) (Average Throughput: 4777 msg/s)Sent: 5170 msgs - Received: 4975 msgs (Buffered: 1858 msgs) (Average Throughput: 4784 msg/s)Sent: 4578 msgs - Received: 5001 msgs (Buffered: 1435 msgs) (Average Throughput: 4791 msg/s)
从以上测试结果观测,加载插件后单个客户端最大消息收发量在4700个/秒左右,性能损耗基本接近golang原生性能指标。
由于go plugin有自身的一些特殊性,所以应该尽量在最终运行Coolpy7内核环境中执行make.sh进行插件的编译工作,因为go plugin对go env的配置有一与加载程序的一致性要求,并且与golang的版本一致性要求,如果当前编译的版本不同会报编译错误等等问题。像:plugin: not implemented等错误提示一般与此有关。
A1: 由于Coolpy7编译的golang版本与插件的.so文件编译的golang版本不一致导至,如果你是全新安装的golang环境,只需在当前环境下把旧的.so插件删除掉,然后重新执行mak.sh编译所有插件生成新的.so文件即可正常使用
A2: 修改plugin包底层实现并重新编译 打开runtime/plugin.go, 注释以下代码
for _, pkghash := range md.pkghashes {if pkghash.linktimehash != *pkghash.runtimehash {return "", nil, pkghash.modulename}}
重新执行make.sh