simple code
This commit is contained in:
parent
dfbc34a975
commit
e42584a9b9
@ -51,18 +51,11 @@ start_link() ->
|
|||||||
init([]) ->
|
init([]) ->
|
||||||
%% 建立到emqx服务器的连接
|
%% 建立到emqx服务器的连接
|
||||||
Opts = iot_config:emqt_opts(<<"publisher">>),
|
Opts = iot_config:emqt_opts(<<"publisher">>),
|
||||||
case emqtt:start_link(Opts) of
|
{ok, ConnPid} = emqtt:start_link(Opts),
|
||||||
{ok, ConnPid} ->
|
|
||||||
lager:debug("[iot_mqtt_publisher] connect success, pid: ~p", [ConnPid]),
|
lager:debug("[iot_mqtt_publisher] connect success, pid: ~p", [ConnPid]),
|
||||||
{ok, _} = emqtt:connect(ConnPid),
|
{ok, _} = emqtt:connect(ConnPid),
|
||||||
{ok, #state{conn_pid = ConnPid}};
|
|
||||||
ignore ->
|
{ok, #state{conn_pid = ConnPid}}.
|
||||||
lager:debug("[iot_mqtt_publisher] connect emqx get ignore"),
|
|
||||||
{stop, ignore};
|
|
||||||
{error, Reason} ->
|
|
||||||
lager:debug("[iot_mqtt_publisher] connect emqx get error: ~p", [Reason]),
|
|
||||||
{stop, Reason}
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
@ -129,7 +122,6 @@ handle_info(Info, State = #state{}) ->
|
|||||||
State :: #state{}) -> term()).
|
State :: #state{}) -> term()).
|
||||||
terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
|
terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
|
||||||
ok = emqtt:disconnect(ConnPid),
|
ok = emqtt:disconnect(ConnPid),
|
||||||
ok = emqtt:stop(ConnPid),
|
|
||||||
lager:debug("[iot_mqtt_publisher] terminate with reason: ~p", [Reason]),
|
lager:debug("[iot_mqtt_publisher] terminate with reason: ~p", [Reason]),
|
||||||
ok;
|
ok;
|
||||||
terminate(Reason, _State) ->
|
terminate(Reason, _State) ->
|
||||||
|
|||||||
@ -102,8 +102,8 @@ handle_info({disconnect, ReasonCode, Properties}, State = #state{}) ->
|
|||||||
lager:debug("[iot_mqtt_host_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
|
lager:debug("[iot_mqtt_host_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
|
||||||
{stop, disconnected, State};
|
{stop, disconnected, State};
|
||||||
%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行
|
%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行
|
||||||
handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State = #state{conn_pid = _ConnPid}) ->
|
handle_info({publish, #{payload := Payload, qos := Qos, topic := Topic}}, State = #state{conn_pid = _ConnPid}) ->
|
||||||
lager:debug("[iot_mqtt_subscriber] Recv a publish packet: ~p, qos: ~p", [Message, Qos]),
|
lager:debug("[iot_mqtt_subscriber] Recv a publish from topic: ~p, qos: ~p", [Topic, Qos]),
|
||||||
%% 将消息分发到对应的host进程去处理
|
%% 将消息分发到对应的host进程去处理
|
||||||
case Topic of
|
case Topic of
|
||||||
<<"host/upstream/", UUID/binary>> ->
|
<<"host/upstream/", UUID/binary>> ->
|
||||||
@ -116,14 +116,14 @@ handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload, qo
|
|||||||
{ok, NewHostPid} ->
|
{ok, NewHostPid} ->
|
||||||
iot_host:handle(NewHostPid, Payload);
|
iot_host:handle(NewHostPid, Payload);
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:warning("[iot_mqtt_subscriber] try start_new_host get error: ~p, assoc packet: ~p", [Reason, Message])
|
lager:warning("[iot_mqtt_subscriber] try start_new_host uuid: ~p, get error: ~p", [UUID, Reason])
|
||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
_ ->
|
_ ->
|
||||||
lager:warning("[iot_mqtt_subscriber] invalid topic: ~p, packet: ~p, qos: ~p", [Topic, Message, Qos])
|
lager:warning("[iot_mqtt_subscriber] invalid topic: ~p, qos: ~p", [Topic, Qos])
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
|
handle_info({puback, Packet}, State = #state{}) ->
|
||||||
lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]),
|
lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
@ -138,7 +138,7 @@ handle_info(Info, State = #state{}) ->
|
|||||||
%% with Reason. The return value is ignored.
|
%% with Reason. The return value is ignored.
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||||
State :: #state{}) -> term()).
|
State :: #state{}) -> term()).
|
||||||
terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
|
terminate(Reason, #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
|
||||||
%% 取消topic的订阅
|
%% 取消topic的订阅
|
||||||
TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics),
|
TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics),
|
||||||
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames),
|
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames),
|
||||||
|
|||||||
@ -1,217 +0,0 @@
|
|||||||
%%%-------------------------------------------------------------------
|
|
||||||
%%% @author licheng5
|
|
||||||
%%% @copyright (C) 2023, <COMPANY>
|
|
||||||
%%% @doc
|
|
||||||
%%%
|
|
||||||
%%% @end
|
|
||||||
%%% Created : 27. 2月 2023 16:00
|
|
||||||
%%%-------------------------------------------------------------------
|
|
||||||
-module(iot_rule_parser).
|
|
||||||
-author("licheng5").
|
|
||||||
|
|
||||||
%% API
|
|
||||||
-export([parse/1, test/0]).
|
|
||||||
-export([parse_bracket/1]).
|
|
||||||
|
|
||||||
|
|
||||||
-record(or_expr, {
|
|
||||||
expr = []
|
|
||||||
}).
|
|
||||||
|
|
||||||
-record(and_expr, {
|
|
||||||
expr = []
|
|
||||||
}).
|
|
||||||
|
|
||||||
test() ->
|
|
||||||
%Rule = <<"SELECT * FROM service.data WHERE id > 0 AND (name = 'anlicheng' OR name = 'test')">>,
|
|
||||||
%parse(Rule),
|
|
||||||
%Tokens = parse_condition(<<"id > 0 OR id < 3 AND (name = 'anlicheng' OR (name = 'test' AND (y = 1 OR x = 1))) AND (age > 1)">>),
|
|
||||||
Tokens = parse_condition(<<"id < 3 OR (name = 'anlicheng' OR (name = 'test' AND (y = 1 OR x = 1))) AND (age > 1 OR y = 1)">>),
|
|
||||||
% generate(Tokens, #{<<"name">> => <<"anlicheng">>, <<"id">> => 1234}),
|
|
||||||
ok.
|
|
||||||
|
|
||||||
scan(Tokens) ->
|
|
||||||
ExprList = scan_or(Tokens),
|
|
||||||
ORChildExpr = lists:map(fun(E) ->
|
|
||||||
case E of
|
|
||||||
{simple, Expr} ->
|
|
||||||
AndChildExpr = scan_and(Expr),
|
|
||||||
#and_expr{expr = AndChildExpr};
|
|
||||||
{complex, Expr} ->
|
|
||||||
AndChildExpr = scan_and(Expr),
|
|
||||||
#and_expr{expr = AndChildExpr}
|
|
||||||
end
|
|
||||||
end, ExprList),
|
|
||||||
#or_expr{expr = ORChildExpr}.
|
|
||||||
|
|
||||||
scan_or(Tokens) ->
|
|
||||||
lager:debug("tokens: ~p", [Tokens]),
|
|
||||||
scan_or(Tokens, [], [], 0, 0).
|
|
||||||
%% 扫描完成并且最后一个表达式为空
|
|
||||||
scan_or([], [], Acc, _, _) ->
|
|
||||||
lists:reverse(Acc);
|
|
||||||
%% 扫描完成并且最后一个表达式不为空
|
|
||||||
scan_or([], Expr, Acc, _, Deep) ->
|
|
||||||
lists:reverse([{get_tag(Deep), lists:reverse(Expr)}|Acc]);
|
|
||||||
%% 遇到OR关键词, 并且此时的层级为0
|
|
||||||
scan_or([32, $O, $R, 32|Tokens], Expr, Acc, 0, Deep) ->
|
|
||||||
scan_or(Tokens, [], [{get_tag(Deep), lists:reverse(Expr)}|Acc], 0, 0);
|
|
||||||
%% 扫描到左括号 && Level > 0; 此时的Expr需要更多的字符
|
|
||||||
scan_or([Token|Tokens], Expr, Acc, Level, Deep) when Token == $( ->
|
|
||||||
scan_or(Tokens, [Token|Expr], Acc, Level + 1, Deep + 1);
|
|
||||||
%% 扫描到右括号 && Level > 1; 此时的Expr表达式的内部嵌套的子串扫描完成,Level的值减1
|
|
||||||
scan_or([Token|Tokens], Expr, Acc, Level, Deep) when Token == $) ->
|
|
||||||
scan_or(Tokens, [Token|Expr], Acc, Level - 1, Deep);
|
|
||||||
%% 普通字符
|
|
||||||
scan_or([Token|Tokens], Expr, Acc, Level, Deep) ->
|
|
||||||
scan_or(Tokens, [Token|Expr], Acc, Level, Deep).
|
|
||||||
|
|
||||||
scan_and(Tokens) ->
|
|
||||||
scan_and(Tokens, [], [], 0, 0).
|
|
||||||
%% 扫描完成并且最后一个表达式为空
|
|
||||||
scan_and([], [], Acc, _, _) ->
|
|
||||||
lists:reverse(Acc);
|
|
||||||
%% 扫描完成并且最后一个表达式不为空
|
|
||||||
scan_and([], Expr, Acc, _, Deep) ->
|
|
||||||
lists:reverse([{get_tag(Deep), lists:reverse(Expr)}|Acc]);
|
|
||||||
%% 遇到OR关键词, 并且此时的层级为0
|
|
||||||
scan_and([32, $A, $N, $D, 32|Tokens], Expr, Acc, 0, Deep) ->
|
|
||||||
scan_and(Tokens, [], [{get_tag(Deep), lists:reverse(Expr)}|Acc], 0, 0);
|
|
||||||
%% 扫描到左括号 && Level > 0; 此时的Expr需要更多的字符
|
|
||||||
scan_and([Token|Tokens], Expr, Acc, Level, Deep) when Token == $( ->
|
|
||||||
scan_and(Tokens, [Token|Expr], Acc, Level + 1, Deep + 1);
|
|
||||||
%% 扫描到右括号 && Level > 1; 此时的Expr表达式的内部嵌套的子串扫描完成,Level的值减1
|
|
||||||
scan_and([Token|Tokens], Expr, Acc, Level, Deep) when Token == $) ->
|
|
||||||
scan_and(Tokens, [Token|Expr], Acc, Level - 1, Deep);
|
|
||||||
%% 普通字符
|
|
||||||
scan_and([Token|Tokens], Expr, Acc, Level, Deep) ->
|
|
||||||
scan_and(Tokens, [Token|Expr], Acc, Level, Deep).
|
|
||||||
|
|
||||||
parse(Rule) ->
|
|
||||||
{ok, MP} = re:compile("SELECT (.*) FROM (.*) WHERE (.*)"),
|
|
||||||
case re:run(Rule, MP, [{capture, all, binary}]) of
|
|
||||||
nomatch ->
|
|
||||||
lager:debug("not match rule");
|
|
||||||
{match, Captured} ->
|
|
||||||
lager:debug("match rule: ~p", [Captured])
|
|
||||||
end,
|
|
||||||
ok.
|
|
||||||
|
|
||||||
parse_condition(Condition) when is_binary(Condition) ->
|
|
||||||
Tokens = unicode:characters_to_list(Condition),
|
|
||||||
X = scan(Tokens),
|
|
||||||
lager:debug("E is: ~p", [X]),
|
|
||||||
X.
|
|
||||||
|
|
||||||
parse_token(Tokens) ->
|
|
||||||
parse_token(Tokens, [], [], 0).
|
|
||||||
%% 扫描完成并且最后一个表达式为空
|
|
||||||
parse_token([], [], Acc, 0) ->
|
|
||||||
lists:reverse(Acc);
|
|
||||||
%% 扫描完成并且最后一个表达式不为空
|
|
||||||
parse_token([], Expr, Acc, 0) ->
|
|
||||||
lists:reverse([{simple, lists:reverse(Expr)}|Acc]);
|
|
||||||
%% 扫描到左括号 && Level == 0; 此时的Expr为简单表达式
|
|
||||||
parse_token([Token|Tokens], Expr, Acc, 0) when Token == $( ->
|
|
||||||
parse_token(Tokens, [], [{simple, lists:reverse(Expr)}|Acc], 1);
|
|
||||||
%% 扫描到左括号 && Level > 0; 此时的Expr需要更多的字符
|
|
||||||
parse_token([Token|Tokens], Expr, Acc, Level) when Token == $( ->
|
|
||||||
parse_token(Tokens, [Token|Expr], Acc, Level + 1);
|
|
||||||
%% 扫描到右括号 && Level == 1; 此时的Expr表达式的字符串扫描完毕; 递归处理子表达式
|
|
||||||
parse_token([Token|Tokens], Expr, Acc, 1) when Token == $) ->
|
|
||||||
parse_token(Tokens, [], [{complex, parse_token(lists:reverse(Expr))}|Acc], 0);
|
|
||||||
%% 扫描到右括号 && Level > 1; 此时的Expr表达式的内部嵌套的子串扫描完成,Level的值减1
|
|
||||||
parse_token([Token|Tokens], Expr, Acc, Level) when Token == $) ->
|
|
||||||
parse_token(Tokens, [Token|Expr], Acc, Level - 1);
|
|
||||||
%% 普通字符
|
|
||||||
parse_token([Token|Tokens], Expr, Acc, Level) ->
|
|
||||||
parse_token(Tokens, [Token|Expr], Acc, Level).
|
|
||||||
|
|
||||||
trim([$(|Tokens]) ->
|
|
||||||
trim0(Tokens, [], 0);
|
|
||||||
trim(Tokens) ->
|
|
||||||
Tokens.
|
|
||||||
trim0([$)], Acc, 1) ->
|
|
||||||
{ok, Acc};
|
|
||||||
trim0([$(|Tokens], Acc, Level) ->
|
|
||||||
trim0(Tokens, [$(|Acc], Level + 1);
|
|
||||||
trim0([$)|Tokens], Acc, Level) ->
|
|
||||||
trim0(Tokens, [$)|Acc], Level - 1);
|
|
||||||
trim0([H|Tokens], Acc, Level) ->
|
|
||||||
trim0(Tokens, [H|Acc], Level).
|
|
||||||
|
|
||||||
%% 处理表达式的优先级关系
|
|
||||||
generate([{simple, Expr}|ExprList], Data) ->
|
|
||||||
Tokens = string:tokens(Expr, "OR"),
|
|
||||||
Tokens1 = lists:map(fun(Token) -> string:tokens(Token, "AND") end, Tokens),
|
|
||||||
|
|
||||||
lager:debug("tokens: ~p", [Tokens1]),
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
ok.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
get_tag(0) ->
|
|
||||||
simple;
|
|
||||||
get_tag(_) ->
|
|
||||||
complex.
|
|
||||||
|
|
||||||
-record(bracket, {
|
|
||||||
items = []
|
|
||||||
}).
|
|
||||||
|
|
||||||
%% 括号解析
|
|
||||||
parse_bracket(Tokens) ->
|
|
||||||
parse_bracket(Tokens, [], []).
|
|
||||||
|
|
||||||
parse_bracket([], [], S) ->
|
|
||||||
#bracket{items = lists:reverse(S)};
|
|
||||||
parse_bracket([], Acc, S) ->
|
|
||||||
#bracket{items = lists:reverse([lists:reverse(Acc)|S])};
|
|
||||||
parse_bracket([$(|Tokens], Acc, S) ->
|
|
||||||
{ChildElements, RestTokens} = parse_bracket0(Tokens),
|
|
||||||
Bracket = parse_bracket(ChildElements),
|
|
||||||
parse_bracket(RestTokens, [], [Bracket, lists:reverse(Acc)|S]);
|
|
||||||
parse_bracket([H|Tokens], Acc, S) ->
|
|
||||||
parse_bracket(Tokens, [H|Acc], S).
|
|
||||||
|
|
||||||
%% 截取配对的括号
|
|
||||||
parse_bracket0(Tokens) ->
|
|
||||||
parse_bracket0(Tokens, [], 1).
|
|
||||||
parse_bracket0([$)|Tokens], Acc, Level) when Level - 1 == 0 ->
|
|
||||||
{lists:reverse(Acc), Tokens};
|
|
||||||
parse_bracket0([$)|Tokens], Acc, Level) ->
|
|
||||||
parse_bracket0(Tokens, [$)|Acc], Level - 1);
|
|
||||||
parse_bracket0([$(|Tokens], Acc, Level) ->
|
|
||||||
parse_bracket0(Tokens, [$(|Acc], Level + 1);
|
|
||||||
parse_bracket0([H|Tokens], Acc, Level) ->
|
|
||||||
parse_bracket0(Tokens, [H|Acc], Level).
|
|
||||||
|
|
||||||
%% 处理运算符号解析出表达式和操作符号
|
|
||||||
%{bracket,["id < 3 OR ",
|
|
||||||
% {bracket,["name = 'anlicheng' OR ",
|
|
||||||
% {bracket,["name = 'test' AND ",
|
|
||||||
% {bracket,["y = 1 OR x = 1"]}]}]},
|
|
||||||
% " OR ",
|
|
||||||
% {bracket,["1 = 1"]}]}
|
|
||||||
|
|
||||||
syntax(Bracket = #bracket{items = Items}) ->
|
|
||||||
lists:map(fun(E) ->
|
|
||||||
case E of
|
|
||||||
B = #bracket{} ->
|
|
||||||
syntax(B);
|
|
||||||
Expr ->
|
|
||||||
string:split(Expr, "AND", all)
|
|
||||||
|
|
||||||
end
|
|
||||||
end, Items),
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
ok.
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
|
|
||||||
Loading…
x
Reference in New Issue
Block a user