From 50ee83febab5a95b788f4e5973af41bd43d2a794 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Tue, 20 Jun 2023 17:49:01 +0800 Subject: [PATCH] fix host --- .../src/http_handler/http_host_handler.erl | 6 +- apps/iot/src/iot_host.erl | 89 +++------ apps/iot/src/iot_mqtt_reply_subscriber.erl | 173 ++++++++++++++++++ apps/iot/src/iot_sup.erl | 9 + 4 files changed, 210 insertions(+), 67 deletions(-) create mode 100644 apps/iot/src/iot_mqtt_reply_subscriber.erl diff --git a/apps/iot/src/http_handler/http_host_handler.erl b/apps/iot/src/http_handler/http_host_handler.erl index de02d91..b6d70a9 100644 --- a/apps/iot/src/http_handler/http_host_handler.erl +++ b/apps/iot/src/http_handler/http_host_handler.erl @@ -113,8 +113,7 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := {ok, 200, iot_util:json_error(400, <<"host not found">>)}; {ok, Pid} when is_pid(Pid) -> lager:debug("[host_handler] activate host_id: ~p, start", [UUID]), - {ok, Assoc} = iot_host:make_assoc(Pid), - ReplyTopic = iot_host:upstream_topic(UUID), + {ok, Assoc, ReplyTopic} = iot_mqtt_reply_subscriber:make_assoc(UUID), BinReply = jiffy:encode(#{<<"auth">> => true, <<"reply">> => #{<<"topic">> => ReplyTopic, <<"assoc">> => Assoc}}, [force_utf8]), case iot_mqtt_publisher:publish(iot_host:downstream_topic(UUID), <<8:8, BinReply/binary>>, 2) of @@ -151,8 +150,7 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := case iot_host:has_session(Pid) of true -> lager:debug("[host_handler] activate host_id: ~p, start", [UUID]), - {ok, Assoc} = iot_host:make_assoc(Pid), - ReplyTopic = iot_host:upstream_topic(UUID), + {ok, Assoc, ReplyTopic} = iot_mqtt_reply_subscriber:make_assoc(UUID), BinReply = jiffy:encode(#{<<"auth">> => false, <<"reply">> => #{<<"topic">> => ReplyTopic, <<"assoc">> => Assoc}}, [force_utf8]), case iot_mqtt_publisher:publish(iot_host:downstream_topic(UUID), <<8:8, BinReply/binary>>, 2) of diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index febd3eb..c647695 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -14,7 +14,7 @@ %% API -export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]). --export([get_metric/1, aes_encode/3, downstream_topic/1, upstream_topic/1, get_aes/1, make_assoc/1, rsa_encode/3]). +-export([get_metric/1, aes_encode/3, downstream_topic/1, upstream_topic/1, get_aes/1, rsa_encode/3]). -export([has_session/1]). %% gen_statem callbacks @@ -41,12 +41,7 @@ metrics = #{} :: map(), %% 是否获取到了ping请求 - is_answered = false :: boolean(), - - %% 任务的自增id - increment_id = 1, - %% 关联数据 - assoc_map = #{} + is_answered = false :: boolean() }). %%%=================================================================== @@ -84,10 +79,6 @@ reload(Pid) when is_pid(Pid) -> get_aes(Pid) when is_pid(Pid) -> gen_statem:call(Pid, get_aes). --spec make_assoc(Pid :: pid()) -> {ok, Assoc :: binary()}. -make_assoc(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, {make_assoc, self()}). - %% 激活主机, true 表示激活; false表示关闭激活 -spec activate(Pid :: pid(), Auth :: boolean()) -> ok. activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> @@ -178,12 +169,6 @@ handle_event({call, From}, has_session, StateName, State) -> HasSession = (StateName =:= session) orelse false, {keep_state, State, [{reply, From, HasSession}]}; -handle_event({call, From}, {make_assoc, ReceiverPid}, _, State = #state{uuid = UUID, increment_id = IncrementId, assoc_map = AssocMap}) -> - BinIncrementId = erlang:integer_to_binary(IncrementId), - Assoc = <>, - - {keep_state, State#state{assoc_map = maps:put(Assoc, ReceiverPid, AssocMap), increment_id = IncrementId + 1}, [{reply, From, {ok, Assoc}}]}; - %% 基于rsa加密 handle_event({call, From}, {rsa_encode, CommandType, PlainText}, session, State = #state{pub_key = PubKey}) -> Reply = iot_cipher_rsa:encode(PlainText, PubKey), @@ -237,45 +222,33 @@ handle_event(cast, need_auth, _StateName, State = #state{uuid = UUID}) -> %% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理 %% 收到消息则认为主机端已经发送了心跳包 -handle_event(cast, {handle, <>}, denied, State = #state{uuid = UUID}) -> - case catch jiffy:decode(Params, [return_maps]) of - #{<<"pub_key">> := PubKey} -> - lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), - Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, - EncReply = iot_cipher_rsa:encode(Reply, PubKey), +handle_event(cast, {handle, <>}, denied, State = #state{uuid = UUID}) -> + lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), + Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), - {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), - receive - {ok, Ref, PacketId} -> - lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]) - after 10000 -> - lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]) - end, - {keep_state, State#state{is_answered = true}}; - Msg -> - lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error: ~p", [UUID, Msg]), - {keep_state, State} - end; + {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), + receive + {ok, Ref, PacketId} -> + lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]) + after 10000 -> + lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]) + end, + {keep_state, State#state{is_answered = true}}; -handle_event(cast, {handle, <>}, _StateName, State = #state{uuid = UUID, aes = Aes}) -> - case catch jiffy:decode(Params, [return_maps]) of - #{<<"pub_key">> := PubKey} -> - lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), - Reply = #{<<"a">> => true, <<"aes">> => Aes}, - EncReply = iot_cipher_rsa:encode(Reply, PubKey), +handle_event(cast, {handle, <>}, _StateName, State = #state{uuid = UUID, aes = Aes}) -> + lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), + Reply = #{<<"a">> => true, <<"aes">> => Aes}, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), - {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), - receive - {ok, Ref, PacketId} -> - lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]), - {next_state, session, State#state{pub_key = PubKey, is_answered = true}} - after 10000 -> - lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]), - {keep_state, State#state{is_answered = true}} - end; - Msg -> - lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error: ~p", [UUID, Msg]), - {keep_state, State} + {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), + receive + {ok, Ref, PacketId} -> + lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]), + {next_state, session, State#state{pub_key = PubKey, is_answered = true}} + after 10000 -> + lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]), + {keep_state, State#state{is_answered = true}} end; handle_event(cast, {handle, <>}, session, State = #state{uuid = UUID, aes = AES}) -> @@ -355,16 +328,6 @@ handle_event(cast, {handle, <>}, sessio end, {keep_state, State}; -%% 处理客户端激活的响应, 完整格式为: {"code": 0|1, "message": "", "assoc": string} -handle_event(cast, {handle_message, Msg = #{<<"code">> := _Code, <<"assoc">> := Assoc}}, _, State = #state{assoc_map = AssocMap}) -> - case maps:take(Assoc, AssocMap) of - error -> - {keep_state, State}; - {ReceiverPid, NAssocMap} -> - ReceiverPid ! {host_reply, Assoc, Msg}, - {keep_state, State#state{assoc_map = NAssocMap}} - end; - handle_event(info, {timeout, _, ping_ticker}, _StateName, State = #state{uuid = UUID, is_answered = IsAnswered, status = Status}) -> erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker), %% 需要考虑到主机未激活的情况,主机未激活,返回: keep_status diff --git a/apps/iot/src/iot_mqtt_reply_subscriber.erl b/apps/iot/src/iot_mqtt_reply_subscriber.erl new file mode 100644 index 0000000..df04346 --- /dev/null +++ b/apps/iot/src/iot_mqtt_reply_subscriber.erl @@ -0,0 +1,173 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% 1. 需要考虑集群部署的相关问题,上行的数据可能在集群中共享 +%%% 2. host进程不能直接去监听topic,这样涉及到新增和下线的很多问题 +%%% @end +%%% Created : 12. 3月 2023 21:27 +%%%------------------------------------------------------------------- +-module(iot_mqtt_reply_subscriber). +-author("aresei"). +-include("iot.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/0, make_assoc/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +%% 需要订阅的主题信息 +-define(Topic, <<"system/assoc_reply">>). + +-record(state, { + conn_pid :: pid(), + + %% 发送完成但是还未收到响应的请求 + inflight = #{} :: map(), + + %% 关联数据 + assoc_map = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec make_assoc(UUID :: binary()) -> {ok, Assoc :: binary(), Topic :: binary()}. +make_assoc(UUID) when is_binary(UUID) -> + gen_server:call(?MODULE, {make_assoc, UUID, self()}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + %% 建立到emqx服务器的连接 + Opts = iot_config:emqt_opts(<<"assoc-reply-subscriber">>), + {ok, ConnPid} = emqtt:start_link(Opts), + %% 监听和host相关的全部事件 + {ok, _} = emqtt:connect(ConnPid), + lager:debug("[iot_mqtt_reply_subscriber] connect success, pid: ~p", [ConnPid]), + SubscribeResult = emqtt:subscribe(ConnPid, {?Topic, 1}), + + lager:debug("[iot_mqtt_reply_subscriber] subscribe topics: ~p, result is: ~p", [?Topic, SubscribeResult]), + + {ok, #state{conn_pid = ConnPid}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({make_assoc, UUID, ReceiverPid}, _From, State = #state{conn_pid = _ConnPid, assoc_map = AssocMap}) -> + Rand = list_to_binary(iot_util:rand_bytes(16)), + Assoc = <>, + + {reply, {ok, Assoc, ?Topic}, State#state{assoc_map = maps:put(Assoc, ReceiverPid, AssocMap)}}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({disconnected, ReasonCode, Properties}, State = #state{}) -> + lager:debug("[iot_mqtt_reply_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), + {stop, disconnected, State}; +%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 +handle_info({publish, #{payload := Payload, qos := Qos}}, State = #state{assoc_map = AssocMap}) -> + lager:debug("[iot_mqtt_reply_subscriber] Recv a reply packet: ~p, qos: ~p", [Payload, Qos]), + + %% 处理客户端激活的响应, 完整格式为: {"code": 0|1, "message": "", "assoc": string} + case catch jiffy:decode(Payload, [return_maps]) of + Msg = #{<<"code">> := _Code, <<"assoc">> := Assoc} -> + case maps:take(Assoc, AssocMap) of + error -> + {noreply, State}; + {ReceiverPid, NAssocMap} -> + ReceiverPid ! {host_reply, Assoc, Msg}, + {noreply, State#state{assoc_map = NAssocMap}} + end; + _ -> + {noreply, State} + end; + +handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> + lager:debug("[iot_mqtt_reply_subscriber] receive puback packet: ~p", [Packet]), + {noreply, State}; + +%% 收到任务的反馈信息 +handle_info({ok, Ref, _PacketId}, State = #state{inflight = Inflight}) -> + case maps:take(Ref, Inflight) of + error -> + {noreply, State}; + {{UUID, Msg}, NInflight} -> + lager:debug("[iot_mqtt_reply_subscriber] send message: ~p, to uuid: ~p, success", [Msg, UUID]), + + {noreply, State#state{inflight = NInflight}} + end; + +handle_info(Info, State = #state{}) -> + lager:debug("[iot_mqtt_reply_subscriber] get info: ~p", [Info]), + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> + %% 取消topic的订阅 + TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics), + {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames), + + ok = emqtt:disconnect(ConnPid), + lager:debug("[iot_mqtt_reply_subscriber] terminate with reason: ~p", [Reason]), + ok; +terminate(Reason, _State) -> + lager:debug("[iot_mqtt_reply_subscriber] terminate with reason: ~p", [Reason]), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 4d261ee..9eb821b 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -39,6 +39,15 @@ init([]) -> modules => ['iot_mqtt_subscriber'] }, + #{ + id => 'iot_mqtt_reply_subscriber', + start => {'iot_mqtt_reply_subscriber', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_mqtt_reply_subscriber'] + }, + #{ id => 'iot_mqtt_sys_subscriber', start => {'iot_mqtt_sys_subscriber', start_link, []},