diff --git a/apps/iot/src/iot_mqtt_publisher.erl b/apps/iot/src/iot_mqtt_publisher.erl index 437ebb3..2997a1c 100644 --- a/apps/iot/src/iot_mqtt_publisher.erl +++ b/apps/iot/src/iot_mqtt_publisher.erl @@ -51,18 +51,11 @@ start_link() -> init([]) -> %% 建立到emqx服务器的连接 Opts = iot_config:emqt_opts(<<"publisher">>), - case emqtt:start_link(Opts) of - {ok, ConnPid} -> - lager:debug("[iot_mqtt_publisher] connect success, pid: ~p", [ConnPid]), - {ok, _} = emqtt:connect(ConnPid), - {ok, #state{conn_pid = ConnPid}}; - ignore -> - lager:debug("[iot_mqtt_publisher] connect emqx get ignore"), - {stop, ignore}; - {error, Reason} -> - lager:debug("[iot_mqtt_publisher] connect emqx get error: ~p", [Reason]), - {stop, Reason} - end. + {ok, ConnPid} = emqtt:start_link(Opts), + lager:debug("[iot_mqtt_publisher] connect success, pid: ~p", [ConnPid]), + {ok, _} = emqtt:connect(ConnPid), + + {ok, #state{conn_pid = ConnPid}}. %% @private %% @doc Handling call messages @@ -129,7 +122,6 @@ handle_info(Info, State = #state{}) -> State :: #state{}) -> term()). terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> ok = emqtt:disconnect(ConnPid), - ok = emqtt:stop(ConnPid), lager:debug("[iot_mqtt_publisher] terminate with reason: ~p", [Reason]), ok; terminate(Reason, _State) -> diff --git a/apps/iot/src/iot_mqtt_subscriber.erl b/apps/iot/src/iot_mqtt_subscriber.erl index 12f15db..e2864bd 100644 --- a/apps/iot/src/iot_mqtt_subscriber.erl +++ b/apps/iot/src/iot_mqtt_subscriber.erl @@ -102,8 +102,8 @@ handle_info({disconnect, ReasonCode, Properties}, State = #state{}) -> lager:debug("[iot_mqtt_host_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), {stop, disconnected, State}; %% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 -handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State = #state{conn_pid = _ConnPid}) -> - lager:debug("[iot_mqtt_subscriber] Recv a publish packet: ~p, qos: ~p", [Message, Qos]), +handle_info({publish, #{payload := Payload, qos := Qos, topic := Topic}}, State = #state{conn_pid = _ConnPid}) -> + lager:debug("[iot_mqtt_subscriber] Recv a publish from topic: ~p, qos: ~p", [Topic, Qos]), %% 将消息分发到对应的host进程去处理 case Topic of <<"host/upstream/", UUID/binary>> -> @@ -116,14 +116,14 @@ handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload, qo {ok, NewHostPid} -> iot_host:handle(NewHostPid, Payload); {error, Reason} -> - lager:warning("[iot_mqtt_subscriber] try start_new_host get error: ~p, assoc packet: ~p", [Reason, Message]) + lager:warning("[iot_mqtt_subscriber] try start_new_host uuid: ~p, get error: ~p", [UUID, Reason]) end end; _ -> - lager:warning("[iot_mqtt_subscriber] invalid topic: ~p, packet: ~p, qos: ~p", [Topic, Message, Qos]) + lager:warning("[iot_mqtt_subscriber] invalid topic: ~p, qos: ~p", [Topic, Qos]) end, {noreply, State}; -handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> +handle_info({puback, Packet}, State = #state{}) -> lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]), {noreply, State}; @@ -138,7 +138,7 @@ handle_info(Info, State = #state{}) -> %% with Reason. The return value is ignored. -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). -terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> +terminate(Reason, #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> %% 取消topic的订阅 TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics), {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames), diff --git a/apps/iot/src/iot_rule_parser.erl b/apps/iot/src/iot_rule_parser.erl deleted file mode 100644 index bc68107..0000000 --- a/apps/iot/src/iot_rule_parser.erl +++ /dev/null @@ -1,217 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author licheng5 -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 27. 2月 2023 16:00 -%%%------------------------------------------------------------------- --module(iot_rule_parser). --author("licheng5"). - -%% API --export([parse/1, test/0]). --export([parse_bracket/1]). - - --record(or_expr, { - expr = [] -}). - --record(and_expr, { - expr = [] -}). - -test() -> - %Rule = <<"SELECT * FROM service.data WHERE id > 0 AND (name = 'anlicheng' OR name = 'test')">>, - %parse(Rule), - %Tokens = parse_condition(<<"id > 0 OR id < 3 AND (name = 'anlicheng' OR (name = 'test' AND (y = 1 OR x = 1))) AND (age > 1)">>), - Tokens = parse_condition(<<"id < 3 OR (name = 'anlicheng' OR (name = 'test' AND (y = 1 OR x = 1))) AND (age > 1 OR y = 1)">>), - % generate(Tokens, #{<<"name">> => <<"anlicheng">>, <<"id">> => 1234}), - ok. - -scan(Tokens) -> - ExprList = scan_or(Tokens), - ORChildExpr = lists:map(fun(E) -> - case E of - {simple, Expr} -> - AndChildExpr = scan_and(Expr), - #and_expr{expr = AndChildExpr}; - {complex, Expr} -> - AndChildExpr = scan_and(Expr), - #and_expr{expr = AndChildExpr} - end - end, ExprList), - #or_expr{expr = ORChildExpr}. - -scan_or(Tokens) -> - lager:debug("tokens: ~p", [Tokens]), - scan_or(Tokens, [], [], 0, 0). -%% 扫描完成并且最后一个表达式为空 -scan_or([], [], Acc, _, _) -> - lists:reverse(Acc); -%% 扫描完成并且最后一个表达式不为空 -scan_or([], Expr, Acc, _, Deep) -> - lists:reverse([{get_tag(Deep), lists:reverse(Expr)}|Acc]); -%% 遇到OR关键词, 并且此时的层级为0 -scan_or([32, $O, $R, 32|Tokens], Expr, Acc, 0, Deep) -> - scan_or(Tokens, [], [{get_tag(Deep), lists:reverse(Expr)}|Acc], 0, 0); -%% 扫描到左括号 && Level > 0; 此时的Expr需要更多的字符 -scan_or([Token|Tokens], Expr, Acc, Level, Deep) when Token == $( -> - scan_or(Tokens, [Token|Expr], Acc, Level + 1, Deep + 1); -%% 扫描到右括号 && Level > 1; 此时的Expr表达式的内部嵌套的子串扫描完成,Level的值减1 -scan_or([Token|Tokens], Expr, Acc, Level, Deep) when Token == $) -> - scan_or(Tokens, [Token|Expr], Acc, Level - 1, Deep); -%% 普通字符 -scan_or([Token|Tokens], Expr, Acc, Level, Deep) -> - scan_or(Tokens, [Token|Expr], Acc, Level, Deep). - -scan_and(Tokens) -> - scan_and(Tokens, [], [], 0, 0). -%% 扫描完成并且最后一个表达式为空 -scan_and([], [], Acc, _, _) -> - lists:reverse(Acc); -%% 扫描完成并且最后一个表达式不为空 -scan_and([], Expr, Acc, _, Deep) -> - lists:reverse([{get_tag(Deep), lists:reverse(Expr)}|Acc]); -%% 遇到OR关键词, 并且此时的层级为0 -scan_and([32, $A, $N, $D, 32|Tokens], Expr, Acc, 0, Deep) -> - scan_and(Tokens, [], [{get_tag(Deep), lists:reverse(Expr)}|Acc], 0, 0); -%% 扫描到左括号 && Level > 0; 此时的Expr需要更多的字符 -scan_and([Token|Tokens], Expr, Acc, Level, Deep) when Token == $( -> - scan_and(Tokens, [Token|Expr], Acc, Level + 1, Deep + 1); -%% 扫描到右括号 && Level > 1; 此时的Expr表达式的内部嵌套的子串扫描完成,Level的值减1 -scan_and([Token|Tokens], Expr, Acc, Level, Deep) when Token == $) -> - scan_and(Tokens, [Token|Expr], Acc, Level - 1, Deep); -%% 普通字符 -scan_and([Token|Tokens], Expr, Acc, Level, Deep) -> - scan_and(Tokens, [Token|Expr], Acc, Level, Deep). - -parse(Rule) -> - {ok, MP} = re:compile("SELECT (.*) FROM (.*) WHERE (.*)"), - case re:run(Rule, MP, [{capture, all, binary}]) of - nomatch -> - lager:debug("not match rule"); - {match, Captured} -> - lager:debug("match rule: ~p", [Captured]) - end, - ok. - -parse_condition(Condition) when is_binary(Condition) -> - Tokens = unicode:characters_to_list(Condition), - X = scan(Tokens), - lager:debug("E is: ~p", [X]), - X. - -parse_token(Tokens) -> - parse_token(Tokens, [], [], 0). -%% 扫描完成并且最后一个表达式为空 -parse_token([], [], Acc, 0) -> - lists:reverse(Acc); -%% 扫描完成并且最后一个表达式不为空 -parse_token([], Expr, Acc, 0) -> - lists:reverse([{simple, lists:reverse(Expr)}|Acc]); -%% 扫描到左括号 && Level == 0; 此时的Expr为简单表达式 -parse_token([Token|Tokens], Expr, Acc, 0) when Token == $( -> - parse_token(Tokens, [], [{simple, lists:reverse(Expr)}|Acc], 1); -%% 扫描到左括号 && Level > 0; 此时的Expr需要更多的字符 -parse_token([Token|Tokens], Expr, Acc, Level) when Token == $( -> - parse_token(Tokens, [Token|Expr], Acc, Level + 1); -%% 扫描到右括号 && Level == 1; 此时的Expr表达式的字符串扫描完毕; 递归处理子表达式 -parse_token([Token|Tokens], Expr, Acc, 1) when Token == $) -> - parse_token(Tokens, [], [{complex, parse_token(lists:reverse(Expr))}|Acc], 0); -%% 扫描到右括号 && Level > 1; 此时的Expr表达式的内部嵌套的子串扫描完成,Level的值减1 -parse_token([Token|Tokens], Expr, Acc, Level) when Token == $) -> - parse_token(Tokens, [Token|Expr], Acc, Level - 1); -%% 普通字符 -parse_token([Token|Tokens], Expr, Acc, Level) -> - parse_token(Tokens, [Token|Expr], Acc, Level). - -trim([$(|Tokens]) -> - trim0(Tokens, [], 0); -trim(Tokens) -> - Tokens. -trim0([$)], Acc, 1) -> - {ok, Acc}; -trim0([$(|Tokens], Acc, Level) -> - trim0(Tokens, [$(|Acc], Level + 1); -trim0([$)|Tokens], Acc, Level) -> - trim0(Tokens, [$)|Acc], Level - 1); -trim0([H|Tokens], Acc, Level) -> - trim0(Tokens, [H|Acc], Level). - -%% 处理表达式的优先级关系 -generate([{simple, Expr}|ExprList], Data) -> - Tokens = string:tokens(Expr, "OR"), - Tokens1 = lists:map(fun(Token) -> string:tokens(Token, "AND") end, Tokens), - - lager:debug("tokens: ~p", [Tokens1]), - - - - ok. - - - -get_tag(0) -> - simple; -get_tag(_) -> - complex. - --record(bracket, { - items = [] -}). - -%% 括号解析 -parse_bracket(Tokens) -> - parse_bracket(Tokens, [], []). - -parse_bracket([], [], S) -> - #bracket{items = lists:reverse(S)}; -parse_bracket([], Acc, S) -> - #bracket{items = lists:reverse([lists:reverse(Acc)|S])}; -parse_bracket([$(|Tokens], Acc, S) -> - {ChildElements, RestTokens} = parse_bracket0(Tokens), - Bracket = parse_bracket(ChildElements), - parse_bracket(RestTokens, [], [Bracket, lists:reverse(Acc)|S]); -parse_bracket([H|Tokens], Acc, S) -> - parse_bracket(Tokens, [H|Acc], S). - -%% 截取配对的括号 -parse_bracket0(Tokens) -> - parse_bracket0(Tokens, [], 1). -parse_bracket0([$)|Tokens], Acc, Level) when Level - 1 == 0 -> - {lists:reverse(Acc), Tokens}; -parse_bracket0([$)|Tokens], Acc, Level) -> - parse_bracket0(Tokens, [$)|Acc], Level - 1); -parse_bracket0([$(|Tokens], Acc, Level) -> - parse_bracket0(Tokens, [$(|Acc], Level + 1); -parse_bracket0([H|Tokens], Acc, Level) -> - parse_bracket0(Tokens, [H|Acc], Level). - -%% 处理运算符号解析出表达式和操作符号 -%{bracket,["id < 3 OR ", -% {bracket,["name = 'anlicheng' OR ", -% {bracket,["name = 'test' AND ", -% {bracket,["y = 1 OR x = 1"]}]}]}, -% " OR ", -% {bracket,["1 = 1"]}]} - -syntax(Bracket = #bracket{items = Items}) -> - lists:map(fun(E) -> - case E of - B = #bracket{} -> - syntax(B); - Expr -> - string:split(Expr, "AND", all) - - end - end, Items), - - - - ok. - - - -