规则引擎

通过简易的规则编码,实现消息转义,转发等功能。此功能从Coolpy7之v7.3.5.0版本开始支持。

规则执行脚本引擎使用的是https://github.com/bilibili/gengine

语法参考:https://github.com/bilibili/gengine/wiki

内核方式

参数

返回值类型

说明

Core.Publish

message

string

转发消息到其他主题

Core.Http

url,method,header,body,timeout

string

转发到第三方http接口

Core.String

[]byte

string

字节数组转换为字符串

Core.Bytes

string

[]byte

字符串转为字节数组

Core.MakeMap

map[string]interface{}

实例化一个map[string]interface{}对象

Core.MakeMessage

实例化一个mqtt消息对象

Core.FromJson

[]byte

反序列化json字符串

Core.ToJson

map[string]interface{}

序列化对象为json字符串

Core.FromMsgPack

[]byte

反序列化MessagePack包

Core.ToMsgPack

map[string]interface{}

序列化对象为MessagePack字符串

Core.AddCron

map,cronString,ruleNames

int

添加一个计划任务

Core.DelCron

int

删除一个计划任务

Core.Bits.And

x1,x2 byte

byte

位与操作

Core.Bits.Or

x1,x2 byte

byte

位或操作

Core.Bits.Xor

x1,x2 byte

byte

位异或操作

Core.Bits.Not

x1,x2 byte

byte

位非操作

Core.Bits.Left

data []byte, bits int

[]byte

位左移操作

Core.Bits.Right

data []byte, bits int

[]byte

位右移操作

AddCron中的cronString参数为cron表达式,可自由设置年月时分秒周等触发器时间规则。祥情请参阅 https://github.com/robfig/cron/blob/v3/doc.go

生成工具:https://cron.qqe2.com

功能模式

  1. 编写规则脚本

  2. 通过API提交到Coolpy7内核

  3. 推送消息到内核验证脚本

规则示例

//规则名称为rule1,规则备注,执行优先级
rule "rule1" "rule-describtion" salience  1
begin
  //过滤主题为aaa的消息才进行处理
  if Message.Topic == "a/b/c" {
    //反序列化消息中的载体
    payload = Core.FromJson(Message.Payload)
    //判断消息载体是否为空
    if isNil(payload) {
        println("parse error")
        return
    }
    //实例化一个map用于转换消息体格式
    res = Core.MakeMap()
    //把原消息的msg内容付值到新消息体的m键中
    res["m"] = payload["msg"]
    //序列化新的消息体为json
    reMsg = Core.ToJson(res)
    //传送序列化是否失败
    if isNil(reMsg) {
        println("format error")
        return
    }
    //打印序列化结果于服务端
    println(Core.String(reMsg))
    //实例化mqtt消息体
    newMsg = Core.MakeMessage()
    //设置新的接收主题
    newMsg.Topic = "bbb/#"
    //消息质量控制
    newMsg.QOS = 0
    newMsg.Retain = false
    //把新的载体付值到新消息载体中
    newMsg.Payload = reMsg
    //通过内核发布新消息
    err = Core.Publish(newMsg)
    if err != "" {
        println(err)
    }
  }
end

接口说明

接口

HTTP

说明

/api/rules

POST

添加一个规则到内核

/api/rules

GET

获取所有内核规则集合记录(支持分页)

/api/rule/:ruleId

GET

获取指定内核规则脚本

/api/rule?ruleid=xxx

DELETE

删除一个规则记录(:ruleid为规则名,不指定时清空所有规则)

接口身份证验证方式需调用/api/login后取到token后放到http的header中,如下是消息格式转义后转发到另一个主题的示例:

curl --location --request POST 'http://localhost:8081/v1/rules' \
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJDUDciLCJleHAiOjE2MjE1NzcxMjEsImp0aSI6ImFkbWluIiwiaWF0IjoxNjIxNTY5OTIxLCJpc3MiOiJjb29scHkubmV0IiwibmJmIjoxNjIxNTY5OTIxfQ.cE1YusJXYQbrU2Pa9gl6gqqsf93rkHB4ePNiXuDSdeg' \
--header 'Content-Type: text/plain' \
--data-raw '//规则名称为rule2,规则备注,执行优先级
rule "rule2" "rule-describtion" salience  1
begin
  //过滤主题为aaa的消息才进行处理
  if Message.Topic == "aaa" {
    //反序列化消息中的载体
    payload = Core.FromJson(Message.Payload)
    //判断消息载体是否为空
    if isNil(payload) {
        println("parse error")
        return
    }
    //实例化一个map用于转换消息体格式
    res = Core.MakeMap()
    //把原消息的msg内容付值到新消息体的m键中
    res["m"] = payload["msg"]
    //序列化新的消息体为json
    reMsg = Core.ToJson(res)
    //传送序列化是否失败
    if isNil(reMsg) {
        println("format error")
        return
    }
    //打印序列化结果于服务端
    println(Core.String(reMsg))
    //实例化mqtt消息体
    newMsg = Core.MakeMessage()
    //设置新的接收主题
    newMsg.Topic = "bbb/#"
    //消息质量控制
    newMsg.QOS = 0
    newMsg.Retain = false
    //把新的载体付值到新消息载体中
    newMsg.Payload = reMsg
    //通过内核发布新消息
    err = Core.Publish(newMsg)
    if err != "" {
        println(err)
    }
  }
end'

如下是消息转义后以提交到第三方http服务接口 curl示例

curl --location --request POST 'http://localhost:8081/v1/rules' \
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJDUDciLCJleHAiOjE2MjE1NzcxMjEsImp0aSI6ImFkbWluIiwiaWF0IjoxNjIxNTY5OTIxLCJpc3MiOiJjb29scHkubmV0IiwibmJmIjoxNjIxNTY5OTIxfQ.cE1YusJXYQbrU2Pa9gl6gqqsf93rkHB4ePNiXuDSdeg' \
--header 'Content-Type: text/plain' \
--data-raw 'rule "rule1" "rule-describtion" salience  10
begin
  //过滤主题为aaa的消息才进行处理
  if Message.Topic == "aaa" {
    //反序列化消息中的载体
    payload = Core.FromJson(Message.Payload)
    //判断消息载体是否为空
    if isNil(payload) {
        println("parse error")
        return
    }
    //实例化一个map用于转换消息体格式
    res = Core.MakeMap()
    //把原消息的msg内容付值到新消息体的m键中
    res["m"] = payload["msg"]
    //序列化新的消息体为json
    reMsg = Core.ToJson(res)
    //传送序列化是否失败
    if isNil(reMsg) {
        println("format error")
        return
    }
    //打印序列化结果于服务端
    println(Core.String(reMsg))
    //通过http转发消息
    //实例化一个map用于转换消息体格式
    header = Core.MakeMap()
    header["Authorization"] = "Bearer " + "token"
    err = Core.Http("http://localhost:8081/v1/test","POST",header,reMsg,2)
    if err != "" {
        println("format error")
    }
  }
end'

返回值:

{
    "success": true,
    "data": {
        "rule_desc": "rule-describtion",
        "rule_name": "rule2",
        "salience": 1
    }
}

删除一个规则 curl示例

curl --location --request DELETE 'http://localhost:8081/v1/rules/rule1' \
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJDUDciLCJleHAiOjE2MjIwMzA5OTUsImp0aSI6ImFkbWluIiwiaWF0IjoxNjIyMDIzNzk1LCJpc3MiOiJjb29scHkubmV0IiwibmJmIjoxNjIyMDIzNzk1fQ.XeNnwyWWFR-Rz2hByigNQVvTvTuXR7nzFcl7O6fqleQ'

返回值:

{
    "success": true,
    "data": "rule1"
}

获取前100个规则 curl示例

curl --location --request GET 'http://localhost:8081/v1/rules?limit=100' \
--header 'Authorization: Bearer eyJhbGciOiJIUzI1NiIsInR5cCI6IkpXVCJ9.eyJhdWQiOiJDUDciLCJleHAiOjE2MjIwMzA5OTUsImp0aSI6ImFkbWluIiwiaWF0IjoxNjIyMDIzNzk1LCJpc3MiOiJjb29scHkubmV0IiwibmJmIjoxNjIyMDIzNzk1fQ.XeNnwyWWFR-Rz2hByigNQVvTvTuXR7nzFcl7O6fqleQ'

返回值:

{
    "success": true,
    "data": {
        "current": 0,
        "list": [
            "rule2"
        ],
        "pageSize": 100,
        "total": 1
    }
}

Last updated