规则引擎
通过简易的规则编码,实现消息转义,转发等功能。此功能从Coolpy7之v7.3.5.0版本开始支持。
规则执行脚本引擎使用的是https://github.com/bilibili/gengine
内核方式
参数
返回值类型
说明
Core.Publish
message
string
转发消息到其他主题
Core.Http
url,method,header,body,timeout
string
转发到第三方http接口
Core.String
[]byte
string
字节数组转换为字符串
Core.Bytes
string
[]byte
字符串转为字节数组
Core.MakeMap
map[string]interface{}
实例化一个map[string]interface{}对象
Core.MakeMessage
实例化一个mqtt消息对象
Core.FromJson
[]byte
反序列化json字符串
Core.ToJson
map[string]interface{}
序列化对象为json字符串
Core.FromMsgPack
[]byte
反序列化MessagePack包
Core.ToMsgPack
map[string]interface{}
序列化对象为MessagePack字符串
Core.AddCron
map,cronString,ruleNames
int
添加一个计划任务
Core.DelCron
int
删除一个计划任务
Core.Bits.And
x1,x2 byte
byte
位与操作
Core.Bits.Or
x1,x2 byte
byte
位或操作
Core.Bits.Xor
x1,x2 byte
byte
位异或操作
Core.Bits.Not
x1,x2 byte
byte
位非操作
Core.Bits.Left
data []byte, bits int
[]byte
位左移操作
Core.Bits.Right
data []byte, bits int
[]byte
位右移操作
AddCron中的cronString参数为cron表达式,可自由设置年月时分秒周等触发器时间规则。祥情请参阅 https://github.com/robfig/cron/blob/v3/doc.go
生成工具:https://cron.qqe2.com

功能模式

    1.
    编写规则脚本
    2.
    通过API提交到Coolpy7内核
    3.
    推送消息到内核验证脚本

规则示例

1
//规则名称为rule1,规则备注,执行优先级
2
rule "rule1" "rule-describtion" salience 1
3
begin
4
//过滤主题为aaa的消息才进行处理
5
if Message.Topic == "a/b/c" {
6
//反序列化消息中的载体
7
payload = Core.FromJson(Message.Payload)
8
//判断消息载体是否为空
9
if isNil(payload) {
10
println("parse error")
11
return
12
}
13
//实例化一个map用于转换消息体格式
14
res = Core.MakeMap()
15
//把原消息的msg内容付值到新消息体的m键中
16
res["m"] = payload["msg"]
17
//序列化新的消息体为json
18
reMsg = Core.ToJson(res)
19
//传送序列化是否失败
20
if isNil(reMsg) {
21
println("format error")
22
return
23
}
24
//打印序列化结果于服务端
25
println(Core.String(reMsg))
26
//实例化mqtt消息体
27
newMsg = Core.MakeMessage()
28
//设置新的接收主题
29
newMsg.Topic = "bbb/#"
30
//消息质量控制
31
newMsg.QOS = 0
32
newMsg.Retain = false
33
//把新的载体付值到新消息载体中
34
newMsg.Payload = reMsg
35
//通过内核发布新消息
36
err = Core.Publish(newMsg)
37
if err != "" {
38
println(err)
39
}
40
}
41
end
Copied!

接口说明

接口
HTTP
说明
/api/rules
POST
添加一个规则到内核
/api/rules
GET
获取所有内核规则集合记录(支持分页)
/api/rule/:ruleId
GET
获取指定内核规则脚本
/api/rule?ruleid=xxx
DELETE
删除一个规则记录(:ruleid为规则名,不指定时清空所有规则)

接口身份证验证方式需调用/api/login后取到token后放到http的header中,如下是消息格式转义后转发到另一个主题的示例:

1
curl --location --request POST 'http://localhost:8081/v1/rules' \
2
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJDUDciLCJleHAiOjE2MjE1NzcxMjEsImp0aSI6ImFkbWluIiwiaWF0IjoxNjIxNTY5OTIxLCJpc3MiOiJjb29scHkubmV0IiwibmJmIjoxNjIxNTY5OTIxfQ.cE1YusJXYQbrU2Pa9gl6gqqsf93rkHB4ePNiXuDSdeg' \
3
--header 'Content-Type: text/plain' \
4
--data-raw '//规则名称为rule2,规则备注,执行优先级
5
rule "rule2" "rule-describtion" salience 1
6
begin
7
//过滤主题为aaa的消息才进行处理
8
if Message.Topic == "aaa" {
9
//反序列化消息中的载体
10
payload = Core.FromJson(Message.Payload)
11
//判断消息载体是否为空
12
if isNil(payload) {
13
println("parse error")
14
return
15
}
16
//实例化一个map用于转换消息体格式
17
res = Core.MakeMap()
18
//把原消息的msg内容付值到新消息体的m键中
19
res["m"] = payload["msg"]
20
//序列化新的消息体为json
21
reMsg = Core.ToJson(res)
22
//传送序列化是否失败
23
if isNil(reMsg) {
24
println("format error")
25
return
26
}
27
//打印序列化结果于服务端
28
println(Core.String(reMsg))
29
//实例化mqtt消息体
30
newMsg = Core.MakeMessage()
31
//设置新的接收主题
32
newMsg.Topic = "bbb/#"
33
//消息质量控制
34
newMsg.QOS = 0
35
newMsg.Retain = false
36
//把新的载体付值到新消息载体中
37
newMsg.Payload = reMsg
38
//通过内核发布新消息
39
err = Core.Publish(newMsg)
40
if err != "" {
41
println(err)
42
}
43
}
44
end'
Copied!

如下是消息转义后以提交到第三方http服务接口 curl示例

1
curl --location --request POST 'http://localhost:8081/v1/rules' \
2
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJDUDciLCJleHAiOjE2MjE1NzcxMjEsImp0aSI6ImFkbWluIiwiaWF0IjoxNjIxNTY5OTIxLCJpc3MiOiJjb29scHkubmV0IiwibmJmIjoxNjIxNTY5OTIxfQ.cE1YusJXYQbrU2Pa9gl6gqqsf93rkHB4ePNiXuDSdeg' \
3
--header 'Content-Type: text/plain' \
4
--data-raw 'rule "rule1" "rule-describtion" salience 10
5
begin
6
//过滤主题为aaa的消息才进行处理
7
if Message.Topic == "aaa" {
8
//反序列化消息中的载体
9
payload = Core.FromJson(Message.Payload)
10
//判断消息载体是否为空
11
if isNil(payload) {
12
println("parse error")
13
return
14
}
15
//实例化一个map用于转换消息体格式
16
res = Core.MakeMap()
17
//把原消息的msg内容付值到新消息体的m键中
18
res["m"] = payload["msg"]
19
//序列化新的消息体为json
20
reMsg = Core.ToJson(res)
21
//传送序列化是否失败
22
if isNil(reMsg) {
23
println("format error")
24
return
25
}
26
//打印序列化结果于服务端
27
println(Core.String(reMsg))
28
//通过http转发消息
29
//实例化一个map用于转换消息体格式
30
header = Core.MakeMap()
31
header["Authorization"] = "Bearer " + "token"
32
err = Core.Http("http://localhost:8081/v1/test","POST",header,reMsg,2)
33
if err != "" {
34
println("format error")
35
}
36
}
37
end'
Copied!
返回值:
1
{
2
"success": true,
3
"data": {
4
"rule_desc": "rule-describtion",
5
"rule_name": "rule2",
6
"salience": 1
7
}
8
}
Copied!

删除一个规则 curl示例

1
curl --location --request DELETE 'http://localhost:8081/v1/rules/rule1' \
2
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJDUDciLCJleHAiOjE2MjIwMzA5OTUsImp0aSI6ImFkbWluIiwiaWF0IjoxNjIyMDIzNzk1LCJpc3MiOiJjb29scHkubmV0IiwibmJmIjoxNjIyMDIzNzk1fQ.XeNnwyWWFR-Rz2hByigNQVvTvTuXR7nzFcl7O6fqleQ'
Copied!
返回值:
1
{
2
"success": true,
3
"data": "rule1"
4
}
Copied!

获取前100个规则 curl示例

1
curl --location --request GET 'http://localhost:8081/v1/rules?limit=100' \
2
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJDUDciLCJleHAiOjE2MjIwMzA5OTUsImp0aSI6ImFkbWluIiwiaWF0IjoxNjIyMDIzNzk1LCJpc3MiOiJjb29scHkubmV0IiwibmJmIjoxNjIyMDIzNzk1fQ.XeNnwyWWFR-Rz2hByigNQVvTvTuXR7nzFcl7O6fqleQ'
Copied!
返回值:
1
{
2
"success": true,
3
"data": {
4
"current": 0,
5
"list": [
6
"rule2"
7
],
8
"pageSize": 100,
9
"total": 1
10
}
11
}
Copied!
Last modified 2mo ago
Copy link