内核功能扩展

针对CP7的内核功能扩展需要使用此功能进行相关开发,如用户登入Coolpy7 mqtt borker的身份验证,扩展消息记录到数据库等功能时。

脚本文件

CP7内核扩展支持使用Go std plugin技术进行开发,提供了用户登入和消息推送Hook定义。在CP7内核程序所在路径下的cmd/plugin文件夹下支持auth.go和pubs.go及term.go文件分别是插件源文件,内核启动时会自动加载内核文件当前目录下的plugin目录下的所有.so文件,.so文件通过cmd目录下的make.sh编译并自动生成.so文件到指定文件夹

用户登入身份验证扩展功能,auth.go示例

package main
import (
"compress/gzip"
"crypto/tls"
"encoding/json"
"io"
"io/ioutil"
"log"
"net/http"
"strings"
//"github.com/dgrijalva/jwt-go"
//"github.com/pquerna/ffjson/ffjson"
)
//告诉内核此插件为用户身份验证插件
var CT = "auth"
//告诉内核本插件版本号,当内核重启首次加载多个插件时会生效Ver值最大的相同性质插件
var Ver float32 = 1.1
////jwt key
//var secretKey = "Coolpy7yeah"
//主程序首次启动或热更新后加触发一次
func Setup() {
log.Println("Authentication plugin ok version", Ver)
}
func Loop(cid, username, password, remote string) bool {
log.Println(cid, username, password, remote)
////固定值判断认证登陆信息合法性
//if cid == "system" && username == "premissid" && password == "testpremissid" {
// return true
//}
////jwt token
//token, err := jwt.Parse(secretKey, func(token *jwt.Token) (interface{}, error) {
// return []byte(secretKey), nil
//})
//if err != nil {
// return false
//}
//if claims, ok := token.Claims.(jwt.MapClaims); ok && token.Valid {
// log.Println(claims)
// return true
//} else {
// return false
//}
////http请求验证
////参数: method, url, header key, header value, content type, body
//res, err := httpFunc("GET", "https://192.168.200.200/api/v1/sys/authenticate/" + username, "Authorization", "5ba32f074966b41be9123fe1", "application/json", "")
//if err == nil && res["ok"].(bool) == true {
// return true
//}
//不做作何身份验证直接允许登入
return true
}
//内核逻辑处理异常触发此事件
func Err(cid, username, password, remote, err string) {
log.Println(cid, username, password, remote)
}
//http请求
func httpFunc(method, url, headerKey, headerValue, contentType, body string) (map[string]interface{}, error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{Transport: tr}
reqest, err := http.NewRequest(method, url, strings.NewReader(body))
if err != nil {
return nil, err
}
if headerKey != "" {
reqest.Header.Add(headerKey, headerValue)
}
if contentType != "" {
reqest.Header.Set("Content-Type", contentType)
} else {
reqest.Header.Set("Content-Type", "application/json;charset=utf-8")
}
response, err := client.Do(reqest)
defer response.Body.Close()
if err != nil {
return nil, err
}
if response.StatusCode == 200 {
resBody := make([]byte, 0)
switch response.Header.Get("Content-Encoding") {
case "gzip":
reader, _ := gzip.NewReader(response.Body)
defer reader.Close()
for {
buf := make([]byte, 1024)
n, err := reader.Read(buf)
if err != nil && err != io.EOF {
return nil, err
}
if n == 0 {
break
}
resBody = append(resBody, buf...)
}
default:
resBody, err = ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}
}
var obj map[string]interface{}
err = json.Unmarshal(resBody, &obj)
if err != nil {
return nil, err
}
return obj, nil
}
return nil, err
}

推送消息扩展功能,pubs.go示例

package main
import (
"bytes"
"compress/gzip"
"crypto/tls"
"encoding/json"
"io"
"io/ioutil"
"log"
"net/http"
)
//告诉内核此插件为消息过滤插件
var CT = "pubs"
//告诉内核本插件版本号,当内核重启首次加载多个插件时会生效Ver值最大的相同性质插件
var Ver float32 = 1.1
//主程序首次启动或热更新后加触发一次
func Setup() {
log.Println("Publish plugin ok version", Ver)
}
//每个用户消息推送都会触发此事件
//cid:客户端身份标识clientid, topic:主题,qos: 消息质量, payload:消息内容
//返回值1:当为true时,返回值2会替换原消息内容发送
//返回值3:内核判断是否断开当前客户端
func Loop(cid, topic string, qos uint8, payload []byte) (bool, []byte, error) {
//log.Println(cid, topic, qos, string(payload))
////http请求验证
////参数: method, url, header key, header value, content type, body
//_, err := httpFunc("POST", "https://192.168.200.200/api/v1/sys/chatLog", "Authorization", "5ba32f074966b41be9123fe1", "application/json", payload)
//if err == nil {
// return false, nil, nil
//}
////此示例演示接收到一个json消息后
////增加两个键,然后打印出来
////通过返回替换模式返回新的payload给内核替换原来的payload
//var res map[string]interface{}
//err := ffjson.Unmarshal(payload, &res)
//if err != nil {
// return false, nil, nil
//}
////添加两个键
//res["createat"] = time.Now().Local()
//res["id"] = objectid.NewObjectId().Hex()
////打印
//log.Println(res)
////json序列化后返回给内核
//nPayload, _ := ffjson.Marshal(&res)
////返回true即内核替换payload为nPayload
//return true, nPayload, nil
return false, nil, nil
}
//内核逻辑处理异常触发此事件
func Err(cid, topic string, qos uint8, payload []byte, err string) {
log.Println("pubs err event")
log.Println(cid, topic, qos, string(payload), err)
}
//http请求
func httpFunc(method, url, headerKey, headerValue, contentType string, body []byte) (map[string]interface{}, error) {
tr := &http.Transport{
TLSClientConfig: &tls.Config{InsecureSkipVerify: true},
}
client := &http.Client{Transport: tr}
reqest, err := http.NewRequest(method, url, bytes.NewReader(body))
if err != nil {
return nil, err
}
if headerKey != "" {
reqest.Header.Add(headerKey, headerValue)
}
if contentType != "" {
reqest.Header.Set("Content-Type", contentType)
} else {
reqest.Header.Set("Content-Type", "application/json;charset=utf-8")
}
response, err := client.Do(reqest)
defer response.Body.Close()
if err != nil {
return nil, err
}
if response.StatusCode == 200 {
resBody := make([]byte, 0)
switch response.Header.Get("Content-Encoding") {
case "gzip":
reader, _ := gzip.NewReader(response.Body)
defer reader.Close()
for {
buf := make([]byte, 1024)
n, err := reader.Read(buf)
if err != nil && err != io.EOF {
return nil, err
}
if n == 0 {
break
}
resBody = append(resBody, buf...)
}
default:
resBody, err = ioutil.ReadAll(response.Body)
if err != nil {
return nil, err
}
}
var obj map[string]interface{}
err = json.Unmarshal(resBody, &obj)
if err != nil {
return nil, err
}
return obj, nil
}
return nil, err
}

客户端订阅主题扩展功能,subs.go示例

package main
import (
"log"
)
//告诉内核此插件为订阅过滤插件
var CT = "subs"
//告诉内核本插件版本号,当内核重启首次加载多个插件时会生效Ver值最大的相同性质插件
var Ver float32 = 1.1
//主程序首次启动或热更新后加触发一次
func Setup() {
log.Println("Subscribe plugin ok version", Ver)
}
//每个用户消息推送都会触发此事件
//cid:客户端身份标识clientid, topic:主题,qos: 消息质量
//返回值1:当为true时,返回值2会替换原订阅主题
//返回值3:内核判断是否断开当前客户端
func Loop(cid, topic string, qos uint8) (bool, string, error) {
log.Println(cid, topic, qos)
return false, "", nil
}
//内核逻辑处理异常触发此事件
func Err(cid, topic string, qos uint8, err string) {
log.Println("subs err event")
log.Println(cid, topic, qos, err)
}

客户端取消订阅主题扩展功能,unsubs.go示例

package main
import (
"log"
)
//告诉内核此插件为取消订阅过滤插件
var CT = "unsubs"
//告诉内核本插件版本号,当内核重启首次加载多个插件时会生效Ver值最大的相同性质插件
var Ver float32 = 1.1
//主程序首次启动或热更新后加触发一次
func Setup() {
log.Println("UnSubscribe plugin ok version", Ver)
}
//每个用户消息推送都会触发此事件
//cid:客户端身份标识clientid, topic:主题,qos: 消息质量
//返回值1:当为true时,返回值2会替换原订阅主题
//返回值3:内核判断是否断开当前客户端
func Loop(cid, topic string) (bool, string, error) {
log.Println(cid, topic)
return false, "", nil
}
//内核逻辑处理异常触发此事件
func Err(cid, topic string, err string) {
log.Println("unsubs err event")
log.Println(cid, topic, err)
}

客户端正常或异常退出扩展功能,term.go示例

package main
import "log"
//告诉内核此插件为用户连接断开插件
var CT = "term"
//告诉内核本插件版本号,当内核重启首次加载多个插件时会生效Ver值最大的相同性质插件
var Ver float32 = 1.1
//主程序首次启动或热更新后加触发一次
func Setup() {
log.Println("Terminate plugin ok version", Ver)
}
//每个用户断开连接都会触发此事件
//cid:客户端身份标识clientid, error:退出原因
func Loop(cid, err string) {
log.Println(cid, err)
}

内核加载插件逻辑

Go plugin有两个特性,其一是热更新加载相同文件名字的插件不会生效,其二是编译时pluginpath参数不允许一样。缺一不可完成热更新插件功能。所以编译时指令可以参考cmd/plugin/make.sh文件,内容如下:

#!/usr/bin/env bash
go build -ldflags "-pluginpath=plugin/cp7-auth-$(date +%s)" -buildmode=plugin -o ../../bin/plugin/cp7_auth_$(date +%s).so ./auth/cp7_auth.go
go build -ldflags "-pluginpath=plugin/cp7-pubs-$(date +%s)" -buildmode=plugin -o ../../bin/plugin/cp7_pubs_$(date +%s).so ./pubs/cp7_pubs.go
go build -ldflags "-pluginpath=plugin/cp7-subs-$(date +%s)" -buildmode=plugin -o ../../bin/plugin/cp7_subs_$(date +%s).so ./subs/cp7_subs.go
go build -ldflags "-pluginpath=plugin/cp7-unsubs-$(date +%s)" -buildmode=plugin -o ../../bin/plugin/cp7_unsubs_$(date +%s).so ./unsubs/cp7_unsubs.go
go build -ldflags "-pluginpath=plugin/cp7-term-$(date +%s)" -buildmode=plugin -o ../../bin/plugin/cp7_term_$(date +%s).so ./term/cp7_term.go

1.Coolpy7内核启动加载插件逻辑

由于插件热更要求通常情况下会在plugin目录不断增加相同性质的插件文件,并只发现.so文件类型,所以加载时会以插件源代码中的Ver变量作为版本号参考,相关性质的插件只加载Ver值最大的插件一次。

2.插件热更新

内核会以15秒为时间间隔监测plugin文件夹下的“新增”文件(复制文件时请尽量于少此时间操作完成),并只发现.so文件类型。所以当新插件复制到plugin文件夹并热更新加载生效时会判断Ver变量大于或等于当前已加载插件Ver值加载生效,请做好插件版本控制。

编译为.so文件到plugin文件夹

经测试go plugin编译指令需要到目标特定操作系统并安装gcc后再执行以下编译操作。由于目前CP7的Linux支持特殊性,建议扩展开发内核功能尽量使用如Ubuntu等带UI的Linux发行版。

Ubuntu为例安装gcc指令

$ sudo apt install gcc

# 下载服务器端
git clone https://github.com/Coolpy7/Coolpy7.git && cd Coolpy7
# 此时你可以据自己需要分别开发auth(身份验证插件)、pubs(消息到达插件)、term(连接断开插件)
# 进入插件目录
cd cmd/plugin
# 执行编译脚本生成.so插件到plugin插件文件夹
./make.sh
# 完成后plugin文件夹下会产生三个.so文件
# 正常启动Coolpy7内核程序会自动加载相关插件, 如下显示即插件生效
2019/02/17 23:06:35 Authentication plugin ok version 1.1
2019/02/17 23:06:35 Publish plugin ok version 1.1
2019/02/17 23:06:35 Subscribe plugin ok version 1.1
2019/02/17 23:06:35 Terminate plugin ok version 1.1
2019/02/17 23:06:35 UnSubscribe plugin ok version 1.1
2019/02/17 23:06:35 Coolpy7 tcp is listening on [::]:1883

当热更新插件的时候请把Ver参数的数值递增,可以防止内核热更发现不了新插件导至热更无效加载

插件性能测试报表

CP7经过多种扩展内核功能技术方案进行了大量技术选型工作,从一开始选用js引擎测试不支持多线程后再选用lua测试发现性能还是对内核有所损耗,所以最终选择了go原生的plugin插件功能进行技术开发扩展

本测试用例为载加插件后测试不做任何处理返回消息的publish操作压力测试,测试代码使用Coolpy7_benchmark性能测试工具包,地址:https://github.com/Coolpy7/coolpy7_benchmark/tree/master/test_pubsum1max

测试结果

Sent: 6193 msgs - Received: 4380 msgs (Buffered: 1813 msgs) (Average Throughput: 4380 msg/s)
Sent: 4132 msgs - Received: 4374 msgs (Buffered: 1571 msgs) (Average Throughput: 4377 msg/s)
Sent: 5167 msgs - Received: 4896 msgs (Buffered: 1841 msgs) (Average Throughput: 4550 msg/s)
Sent: 4652 msgs - Received: 4652 msgs (Buffered: 1840 msgs) (Average Throughput: 4576 msg/s)
Sent: 4647 msgs - Received: 4734 msgs (Buffered: 1753 msgs) (Average Throughput: 4607 msg/s)
Sent: 4128 msgs - Received: 4389 msgs (Buffered: 1492 msgs) (Average Throughput: 4571 msg/s)
Sent: 5163 msgs - Received: 4929 msgs (Buffered: 1726 msgs) (Average Throughput: 4622 msg/s)
Sent: 5168 msgs - Received: 5087 msgs (Buffered: 1807 msgs) (Average Throughput: 4680 msg/s)
Sent: 4646 msgs - Received: 4992 msgs (Buffered: 1460 msgs) (Average Throughput: 4715 msg/s)
Sent: 5167 msgs - Received: 5032 msgs (Buffered: 1595 msgs) (Average Throughput: 4746 msg/s)
Sent: 5164 msgs - Received: 4924 msgs (Buffered: 1835 msgs) (Average Throughput: 4762 msg/s)
Sent: 4645 msgs - Received: 4736 msgs (Buffered: 1744 msgs) (Average Throughput: 4760 msg/s)
Sent: 4646 msgs - Received: 4776 msgs (Buffered: 1614 msgs) (Average Throughput: 4761 msg/s)
Sent: 5168 msgs - Received: 4942 msgs (Buffered: 1840 msgs) (Average Throughput: 4774 msg/s)
Sent: 5161 msgs - Received: 5057 msgs (Buffered: 1944 msgs) (Average Throughput: 4793 msg/s)
Sent: 4654 msgs - Received: 5074 msgs (Buffered: 1523 msgs) (Average Throughput: 4811 msg/s)
Sent: 5163 msgs - Received: 5148 msgs (Buffered: 1538 msgs) (Average Throughput: 4830 msg/s)
Sent: 5163 msgs - Received: 4982 msgs (Buffered: 1718 msgs) (Average Throughput: 4839 msg/s)
Sent: 4133 msgs - Received: 4068 msgs (Buffered: 1783 msgs) (Average Throughput: 4798 msg/s)
Sent: 4132 msgs - Received: 3979 msgs (Buffered: 1936 msgs) (Average Throughput: 4757 msg/s)
Sent: 4129 msgs - Received: 4401 msgs (Buffered: 1664 msgs) (Average Throughput: 4740 msg/s)
Sent: 5101 msgs - Received: 4926 msgs (Buffered: 1838 msgs) (Average Throughput: 4749 msg/s)
Sent: 4713 msgs - Received: 5028 msgs (Buffered: 1522 msgs) (Average Throughput: 4761 msg/s)
Sent: 5165 msgs - Received: 4818 msgs (Buffered: 1869 msgs) (Average Throughput: 4763 msg/s)
Sent: 4645 msgs - Received: 4951 msgs (Buffered: 1562 msgs) (Average Throughput: 4771 msg/s)
Sent: 5168 msgs - Received: 4798 msgs (Buffered: 1931 msgs) (Average Throughput: 4772 msg/s)
Sent: 4641 msgs - Received: 4908 msgs (Buffered: 1664 msgs) (Average Throughput: 4777 msg/s)
Sent: 5170 msgs - Received: 4975 msgs (Buffered: 1858 msgs) (Average Throughput: 4784 msg/s)
Sent: 4578 msgs - Received: 5001 msgs (Buffered: 1435 msgs) (Average Throughput: 4791 msg/s)

从以上测试结果观测,加载插件后单个客户端最大消息收发量在4700个/秒左右,性能损耗基本接近golang原生性能指标。

常见问题汇总

由于go plugin有自身的一些特殊性,所以应该尽量在最终运行Coolpy7内核环境中执行make.sh进行插件的编译工作,因为go plugin对go env的配置有一与加载程序的一致性要求,并且与golang的版本一致性要求,如果当前编译的版本不同会报编译错误等等问题。像:plugin: not implemented等错误提示一般与此有关。

Q: plugin was built with a different version of package

A1: 由于Coolpy7编译的golang版本与插件的.so文件编译的golang版本不一致导至,如果你是全新安装的golang环境,只需在当前环境下把旧的.so插件删除掉,然后重新执行mak.sh编译所有插件生成新的.so文件即可正常使用

A2: 修改plugin包底层实现并重新编译 打开runtime/plugin.go, 注释以下代码

for _, pkghash := range md.pkghashes {
if pkghash.linktimehash != *pkghash.runtimehash {
return "", nil, pkghash.modulename
}
}

重新执行make.sh