From 360d47d60c95b67be4b025489f55142c5d4f7798 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Wed, 14 Jun 2023 19:41:32 +0800 Subject: [PATCH] =?UTF-8?q?=E5=A4=84=E7=90=86=E5=93=8D=E5=BA=94=E6=95=B0?= =?UTF-8?q?=E6=8D=AE?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .../src/http_handler/http_host_handler.erl | 6 +- apps/iot/src/iot_host.erl | 29 +++- apps/iot/src/iot_mqtt_reply_subscriber.erl | 156 ------------------ apps/iot/src/iot_sup.erl | 9 - apps/iot/src/mocker/host_mocker.erl | 37 ++++- docs/host-mqtt-jiaohu.md | 4 +- 6 files changed, 63 insertions(+), 178 deletions(-) delete mode 100644 apps/iot/src/iot_mqtt_reply_subscriber.erl diff --git a/apps/iot/src/http_handler/http_host_handler.erl b/apps/iot/src/http_handler/http_host_handler.erl index 7502698..8293de6 100644 --- a/apps/iot/src/http_handler/http_host_handler.erl +++ b/apps/iot/src/http_handler/http_host_handler.erl @@ -111,12 +111,12 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := lager:debug("[host_handler] activate host_id: ~p, failed with reason: ~p", [UUID, Reason]), {ok, 200, iot_util:json_error(400, <<"host not found">>)}; {ok, Pid} when is_pid(Pid) -> - lager:debug("[host_handler] reload host_id: ~p, success", [UUID]), + lager:debug("[host_handler] activate host_id: ~p, start", [UUID]), %% 下发指令 - {ok, Assoc, ReplyTopic} = iot_mqtt_reply_subscriber:make_assoc(UUID), + {ok, Assoc, Aes, ReplyTopic} = iot_host:make_assoc(Pid), Reply = case Auth of true -> - #{<<"auth">> => true, <<"aes">> => iot_host:get_aes(Pid), <<"reply">> => #{<<"topic">> => ReplyTopic, <<"assoc">> => Assoc}}; + #{<<"auth">> => true, <<"aes">> => Aes, <<"reply">> => #{<<"topic">> => ReplyTopic, <<"assoc">> => Assoc}}; false -> #{<<"auth">> => false, <<"aes">> => <<"">>, <<"reply">> => #{ <<"topic">> => ReplyTopic, <<"assoc">> => Assoc}} end, diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index f0230e5..28da419 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -18,7 +18,7 @@ %% API -export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/1]). --export([get_metric/1, build_command/3, downstream_topic/1, upstream_topic/1, get_aes/1]). +-export([get_metric/1, build_command/3, downstream_topic/1, upstream_topic/1, get_aes/1, make_assoc/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -46,7 +46,12 @@ is_activated :: boolean(), %% 会话状态 - has_session = false :: boolean() + has_session = false :: boolean(), + + %% 任务的自增id + increment_id = 1, + %% 关联数据 + assoc_map = #{} }). %%%=================================================================== @@ -84,6 +89,10 @@ reload(Pid) when is_pid(Pid) -> get_aes(Pid) when is_pid(Pid) -> gen_server:call(Pid, get_aes). +-spec make_assoc(Pid :: pid()) -> {ok, Assoc :: binary(), Aes :: binary(), Topic :: binary()}. +make_assoc(Pid) when is_pid(Pid) -> + gen_server:call(Pid, {make_assoc, self()}). + %% 激活主机 -spec activate(Pid :: pid()) -> ok. activate(Pid) when is_pid(Pid) -> @@ -142,6 +151,12 @@ handle_call(get_metric, _From, State = #state{metrics = Metrics}) -> handle_call(get_aes, _From, State = #state{aes = Aes}) -> {reply, {ok, Aes}, State}; +handle_call({make_assoc, ReceivePid}, _From, State = #state{uuid = UUID, increment_id = IncrementId, assoc_map = AssocMap, aes = Aes}) -> + BinIncrementId = erlang:integer_to_binary(IncrementId), + Assoc = <>, + + {reply, {ok, Assoc, Aes, upstream_topic(UUID)}, State#state{assoc_map = maps:put(Assoc, ReceivePid, AssocMap), increment_id = IncrementId + 1}}; + %% 重新加载主机信息 handle_call(reload, _From, State = #state{uuid = UUID}) -> %% 重新加载主机信息 @@ -294,6 +309,16 @@ handle_message(#{<<"method">> := <<"ping">>, <<"params">> := CipherMetric}, Stat lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), State#state{metrics = Metrics}; +%% 处理客户端激活的响应, 完整格式为: {"code": 0|1, "message": "", "assoc": string} +handle_message(Msg = #{<<"code">> := _Code, <<"assoc">> := Assoc}, State = #state{assoc_map = AssocMap}) -> + case maps:take(Assoc, AssocMap) of + error -> + {noreply, State}; + {ReceiverPid, NAssocMap} -> + ReceiverPid ! {host_reply, Assoc, Msg}, + {noreply, State#state{assoc_map = NAssocMap}} + end; + handle_message(Message, State = #state{uuid = UUID, has_session = HasSession}) -> lager:warning("[iot_host] host_id uuid: ~p, get a unknown message: ~p, session: ~p", [UUID, Message, HasSession]), State. diff --git a/apps/iot/src/iot_mqtt_reply_subscriber.erl b/apps/iot/src/iot_mqtt_reply_subscriber.erl deleted file mode 100644 index 4be7b2c..0000000 --- a/apps/iot/src/iot_mqtt_reply_subscriber.erl +++ /dev/null @@ -1,156 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% 1. 需要考虑集群部署的相关问题,上行的数据可能在集群中共享 -%%% 2. host进程不能直接去监听topic,这样涉及到新增和下线的很多问题 -%%% @end -%%% Created : 12. 3月 2023 21:27 -%%%------------------------------------------------------------------- --module(iot_mqtt_reply_subscriber). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_server). - -%% API --export([start_link/0]). --export([make_assoc/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - -%% 需要订阅的主题信息 --define(Topic, <<"host/reply">>). - --record(state, { - conn_pid :: pid(), - %% 关联数据 - assoc_map = #{}, - %% 任务的自增id - increment_id = 1 -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -make_assoc(UUID) when is_binary(UUID) -> - gen_server:call(?MODULE, {make_assoc, UUID, self()}). - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - %% 建立到emqx服务器的连接 - Opts = iot_config:emqt_opts(<<"reply-subscriber">>), - {ok, ConnPid} = emqtt:start_link(Opts), - %% 监听和host相关的全部事件 - {ok, _} = emqtt:connect(ConnPid), - lager:debug("[iot_mqtt_reply_subscriber] connect success, pid: ~p", [ConnPid]), - SubscribeResult = emqtt:subscribe(ConnPid, [{?Topic, 1}]), - - lager:debug("[iot_mqtt_reply_subscriber] subscribe topics: ~p, result is: ~p", [?Topic, SubscribeResult]), - - {ok, #state{conn_pid = ConnPid}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call({make_assoc, UUID, ReceivePid}, _From, State = #state{increment_id = IncrementId, assoc_map = AssocMap}) -> - BinIncrementId = erlang:integer_to_binary(IncrementId), - Assoc = <>, - - {reply, {ok, Assoc, ?Topic}, State#state{assoc_map = maps:put(Assoc, ReceivePid, AssocMap)}}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({disconnect, ReasonCode, Properties}, State = #state{}) -> - lager:debug("[iot_mqtt_reply_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), - {stop, disconnected, State}; -handle_info({publish, #{payload := Payload, qos := Qos, topic := ?Topic}}, State = #state{assoc_map = AssocMap}) -> - lager:debug("[iot_mqtt_reply_subscriber] Recv a reply packet: ~p, qos: ~p", [Payload, Qos]), - case catch jiffy:decode(Payload, [return_maps]) of - Msg = #{<<"assoc">> := Assoc} -> - case maps:take(Assoc, AssocMap) of - error -> - {noreply, State}; - {ReceiverPid, NAssocMap} -> - ReceiverPid ! {host_reply, Assoc, Msg}, - {noreply, State#state{assoc_map = NAssocMap}} - end; - _ -> - {noreply, State} - end; - -handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> - lager:debug("[iot_mqtt_reply_subscriber] receive puback packet: ~p", [Packet]), - {noreply, State}; - -handle_info(Info, State = #state{}) -> - lager:debug("[iot_mqtt_reply_subscriber] get info: ~p", [Info]), - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> - %% 取消topic的订阅 - {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, ?Topic), - - ok = emqtt:disconnect(ConnPid), - lager:debug("[iot_mqtt_reply_subscriber] terminate with reason: ~p", [Reason]), - ok; -terminate(Reason, _State) -> - lager:debug("[iot_mqtt_reply_subscriber] terminate with reason: ~p", [Reason]), - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 348db54..f048218 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -46,15 +46,6 @@ init([]) -> modules => ['iot_mqtt_subscriber'] }, - #{ - id => 'iot_mqtt_reply_subscriber', - start => {'iot_mqtt_reply_subscriber', start_link, []}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['iot_mqtt_reply_subscriber'] - }, - #{ id => 'iot_mqtt_sys_subscriber', start => {'iot_mqtt_sys_subscriber', start_link, []}, diff --git a/apps/iot/src/mocker/host_mocker.erl b/apps/iot/src/mocker/host_mocker.erl index ea072e6..f37a63d 100644 --- a/apps/iot/src/mocker/host_mocker.erl +++ b/apps/iot/src/mocker/host_mocker.erl @@ -153,7 +153,7 @@ handle_info({disconnect, ReasonCode, Properties}, State = #state{}) -> {stop, disconnected, State}; %% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 handle_info({publish, #{payload := Payload, qos := Qos, topic := FromTopic}}, - State = #state{topic = Topic, pub_key = PubKey, pri_key = PrivateKey}) -> + State = #state{topic = Topic, pub_key = PubKey, pri_key = PrivateKey, aes = Aes0}) -> lager:debug("[host_mocker] Recv a publish packet: ~p, qos: ~p, from topic: ~p", [Payload, Qos, FromTopic]), case Payload of @@ -182,8 +182,23 @@ handle_info({publish, #{payload := Payload, qos := Qos, topic := FromTopic}}, end, {noreply, State}; - <> -> + %% 10是通过rsa加密的数据,里面保存了aes;后续的所有通讯都是通过aes加密的 + <<10:8, Command0/binary>> -> Command = iot_cipher_rsa:decode(Command0, PrivateKey), + case jiffy:decode(Command, [return_maps]) of + #{<<"aes">> := Aes, <<"a">> := true} -> + %% 启动周期ping + % erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker), + + %% 数据收集 + % erlang:start_timer(?TICKER_INTERVAL + 1000, self(), data_ticker), + {noreply, State#state{aes = Aes}}; + _ -> + lager:debug("[host_mocker] auth failed") + end; + + <> -> + Command = iot_cipher_aes:decrypt(Aes0, Command0), CommandJson = jiffy:decode(Command, [return_maps]), lager:debug("[host_mocker] get command: ~p, json: ~p, type: ~p", [Command, CommandJson, Type]), @@ -308,12 +323,20 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== -handle_command(10, #{<<"aes">> := Aes, <<"a">> := true}, State) -> - %% 启动周期ping - erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker), +handle_command(8, #{<<"aes">> := Aes, <<"auth">> := true, <<"reply">> := #{<<"topic">> := Topic, <<"assoc">> := Assoc}}, State) -> + Msg = jiffy:encode(#{ + <<"code">> => 1, + <<"message">> => "", + <<"assoc">> => Assoc + }, [force_utf8]), - %% 数据收集 - erlang:start_timer(?TICKER_INTERVAL + 1000, self(), data_ticker), + {ok, Ref} = iot_mqtt_publisher:publish(Topic, Msg, 1), + receive + {ok, Ref, PacketId} -> + lager:debug("[host_mocker] send reply success, packet_id: ~p", [PacketId]); + {error, Reason} -> + lager:debug("[host_mocker] send reply failed, reason: ~p", [Reason]) + end, State#state{aes = Aes}. diff --git a/docs/host-mqtt-jiaohu.md b/docs/host-mqtt-jiaohu.md index e0d201a..08b98ba 100644 --- a/docs/host-mqtt-jiaohu.md +++ b/docs/host-mqtt-jiaohu.md @@ -74,6 +74,8 @@ t = 8 ```json +// 数据采用之前协商的aes加密 + { "auth": true/false, // true表示授权,此时aes的值不为空;false表示取消授权 "aes": "", @@ -87,7 +89,7 @@ t = 8 { "code": 0 | 1, // 0表示操作失败,1表示成功 - messge: "", + message: "", assoc: "下发给你值" }