From f269525ba9a285c81d9544db7e7b938a108dfc92 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Mon, 19 Jun 2023 11:50:14 +0800 Subject: [PATCH] change iot_host gen_server to gen_statem --- apps/iot/src/iot_host.erl | 413 +++++++++++++++++-------------------- apps/iot/src/iot_host2.erl | 400 ----------------------------------- 2 files changed, 193 insertions(+), 620 deletions(-) delete mode 100644 apps/iot/src/iot_host2.erl diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 5be92cc..ad70a47 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -2,30 +2,29 @@ %%% @author aresei %%% @copyright (C) 2023, %%% @doc -%%% TODO -%%% 1. 指令下发是需要微服务名称的,微服务的名称里面是带copy的完整名称: (modbus:12345) +%%% %%% @end -%%% Created : 12. 3月 2023 21:27 +%%% Created : 19. 6月 2023 10:32 %%%------------------------------------------------------------------- -module(iot_host). -author("aresei"). -include("iot.hrl"). --behaviour(gen_server). - -%% 心跳包的间隔周期, 值需要比host上传的间隔大一些才行 --define(TICKER_INTERVAL, 5000 * 2). +-behaviour(gen_statem). %% API -export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]). -export([get_metric/1, aes_encode/3, downstream_topic/1, upstream_topic/1, get_aes/1, make_assoc/1, rsa_encode/3]). -export([has_session/1]). -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_statem callbacks +-export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]). -define(SERVER, ?MODULE). +%% 心跳包的间隔周期, 值需要比host上传的间隔大一些才行 +-define(TICKER_INTERVAL, 5000 * 2). + -record(state, { host_id :: integer(), %% 从数据库里面读取到的数据 @@ -44,12 +43,6 @@ %% 是否获取到了ping请求 is_answered = false :: boolean(), - %% 标识当前主机是否已经注册,每次会话时主机需要重新协商会话; 通过status的值判断是否已经激活,值为:-1时,表示未激活 - is_activated :: boolean(), - - %% 会话状态 - has_session = false :: boolean(), - %% 任务的自增id increment_id = 1, %% 关联数据 @@ -59,7 +52,6 @@ %%%=================================================================== %%% API %%%=================================================================== - -spec get_pid(UUID :: binary()) -> undefined | pid(). get_pid(UUID) when is_binary(UUID) -> Name = get_name(UUID), @@ -81,60 +73,60 @@ upstream_topic(UUID) when is_binary(UUID) -> %% 处理消息 -spec handle(Pid :: pid(), Payload :: binary() | map()) -> no_return(). handle(Pid, Payload) when is_pid(Pid), is_binary(Payload); is_map(Payload) -> - gen_server:cast(Pid, {handle, Payload}). + gen_statem:cast(Pid, {handle, Payload}). %% 重新加载主机的基本信息 -spec reload(Pid :: pid()) -> ok | {error, Reason :: any()}. reload(Pid) when is_pid(Pid) -> - gen_server:call(Pid, reload). + gen_statem:call(Pid, reload). -spec get_aes(Pid :: pid()) -> Aes :: binary(). get_aes(Pid) when is_pid(Pid) -> - gen_server:call(Pid, get_aes). + gen_statem:call(Pid, get_aes). -spec make_assoc(Pid :: pid()) -> {ok, Assoc :: binary()}. make_assoc(Pid) when is_pid(Pid) -> - gen_server:call(Pid, {make_assoc, self()}). + gen_statem:call(Pid, {make_assoc, self()}). %% 激活主机, true 表示激活; false表示关闭激活 -spec activate(Pid :: pid(), Auth :: boolean()) -> ok. activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> - gen_server:call(Pid, {activate, Auth}). + gen_statem:call(Pid, {activate, Auth}). -spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}. get_metric(Pid) when is_pid(Pid) -> - gen_server:call(Pid, get_metric). + gen_statem:call(Pid, get_metric). -spec has_session(Pid :: pid()) -> boolean(). has_session(Pid) when is_pid(Pid) -> - gen_server:call(Pid, has_session). + gen_statem:call(Pid, has_session). %% 基于rsa加密的指令都是不需要会话存在的 -spec rsa_encode(Pid :: pid(), CommandType :: integer(), PlainText :: binary()) -> {ok, EncText :: binary()} | {error, Reason :: binary()}. rsa_encode(Pid, CommandType, PlainText) when is_pid(Pid), is_integer(CommandType), is_binary(PlainText) -> - gen_server:call(Pid, {rsa_encode, CommandType, PlainText}). + gen_statem:call(Pid, {rsa_encode, CommandType, PlainText}). -spec aes_encode(Pid :: pid(), CommandType :: integer(), Params :: binary()) -> {ok, Command :: binary()} | {error, Reason :: any()}. aes_encode(Pid, CommandType, Params) when is_pid(Pid), is_integer(CommandType), is_binary(Params) -> - gen_server:call(Pid, {aes_encode, CommandType, Params}). + gen_statem:call(Pid, {aes_encode, CommandType, Params}). -%% @doc Spawns the server and registers the local name (unique) --spec(start_link(Name :: atom(), UUID :: binary()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. start_link(Name, UUID) when is_atom(Name), is_binary(UUID) -> - gen_server:start_link({local, Name}, ?MODULE, [UUID], []). + gen_statem:start_link({local, Name}, ?MODULE, [UUID], []). %%%=================================================================== -%%% gen_server callbacks +%%% gen_statem callbacks %%%=================================================================== %% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. init([UUID]) -> case host_bo:get_host_by_uuid(UUID) of {ok, #{<<"status">> := Status, <<"id">> := HostId}} -> @@ -144,92 +136,88 @@ init([UUID]) -> %% 告知主机端需要重新授权 gen_server:cast(self(), need_auth), - {ok, #state{host_id = HostId, uuid = UUID, aes = Aes, is_activated = (Status /= ?HOST_STATUS_INACTIVE), status = Status, has_session = false}}; + StateName = case Status =:= ?HOST_STATUS_INACTIVE of + true -> denied; + false -> activated + end, + + {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, status = Status}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), ignore end. + %% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(get_metric, _From, State = #state{metrics = Metrics, has_session = true}) -> - {reply, {ok, Metrics}, State}; -handle_call(get_metric, _From, State) -> - {reply, {ok, #{}}, State}; +%% @doc This function is called by a gen_statem when it needs to find out +%% the callback mode of the callback module. +callback_mode() -> + handle_event_function. -handle_call(get_aes, _From, State = #state{aes = Aes}) -> - {reply, {ok, Aes}, State}; +%% @private +%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or +%% (2) when gen_statem terminates abnormally. +%% This callback is optional. +format_status(_Opt, [_PDict, _StateName, _State]) -> + Status = some_term, + Status. -%% 获取当前是否存在会话 -handle_call(has_session, _From, State = #state{has_session = HasSession}) -> - {reply, HasSession, State}; +%% @private +%% @doc If callback_mode is handle_event_function, then whenever a +%% gen_statem receives an event from call/2, cast/2, or as a normal +%% process message, this function is called. +handle_event({call, From}, get_metric, session, State = #state{metrics = Metrics}) -> + {keep_state, State, [{reply, From, {ok, Metrics}}]}; +handle_event({call, From}, get_metric, _, State) -> + {keep_state, State, [{reply, From, {ok, #{}}}]}; -%% 实现rsa加密 -%% 基于rsa加密的指令都是不需要会话存在的 -handle_call({rsa_encode, _, _}, _From, State = #state{pub_key = <<>>}) -> - {reply, {error, <<"会话未建立"/utf8>>}, State}; -handle_call({rsa_encode, CommandType, PlainText}, _From, State = #state{pub_key = PubKey}) -> +handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) -> + {keep_state, State, [{reply, From, {ok, Aes}}]}; + +handle_event({call, From}, has_session, StateName, State) -> + HasSession = (StateName =:= session) orelse false, + {keep_state, State, [{reply, From, HasSession}]}; + +%% 基于rsa加密 +handle_event({call, From}, {rsa_encode, CommandType, PlainText}, session, State = #state{pub_key = PubKey}) -> Reply = iot_cipher_rsa:encode(PlainText, PubKey), + {keep_state, State, [{reply, From, {ok, <>}}]}; +handle_event({call, From}, {rsa_encode, _, _}, _, State) -> + {keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]}; - {reply, {ok, <>}, State}; +%% 基于aes加密 +handle_event({call, From}, {aes_encode, CommandType, Command}, session, State = #state{aes = AES}) -> + EncCommand = iot_cipher_aes:encrypt(AES, Command), + {keep_state, State, [{reply, From, {ok, <>}}]}; +handle_event({call, From}, {aes_encode, _, _}, _, State) -> + {keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]}; -handle_call({make_assoc, ReceivePid}, _From, State = #state{uuid = UUID, increment_id = IncrementId, assoc_map = AssocMap}) -> - BinIncrementId = erlang:integer_to_binary(IncrementId), - Assoc = <>, - - {reply, {ok, Assoc}, State#state{assoc_map = maps:put(Assoc, ReceivePid, AssocMap), increment_id = IncrementId + 1}}; - -%% 重新加载主机信息 -handle_call(reload, _From, State = #state{uuid = UUID}) -> +handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) -> %% 重新加载主机信息 case host_bo:get_host_by_uuid(UUID) of - {ok, Host = #{<<"status">> := Status, <<"id">> := HostId}} -> + {ok, Host = #{<<"status">> := Status}} -> lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]), - {reply, ok, State#state{host_id = HostId, is_activated = (Status /= ?HOST_STATUS_INACTIVE), status = Status}}; + case StateName == denied andalso Status =/= ?HOST_STATUS_INACTIVE of + true -> + {next_state, activated, State#state{status = Status}, [{reply, From, ok}]}; + false -> + {keep_state, State#state{status = Status}, [{reply, From, ok}]} + end; undefined -> lager:debug("[iot_host] reload host uuid: ~p, failed", [UUID]), - {reply, {error, <<"host not found">>}, State} + {keep_state, State, [{reply, From, {error, <<"host not found">>}}]} end; -% 处理主机的激活 -handle_call({activate, Auth}, _From, State = #state{uuid = UUID, is_activated = IsActivated}) -> - lager:debug("[iot_host] host uuid: ~p, new activate status: ~p, before status is: ~p", [UUID, Auth, IsActivated]), - %% 关闭授权时,认为会话时关闭状态 - case Auth of - true -> - {reply, ok, State#state{is_activated = true}}; - false -> - {reply, ok, State#state{is_activated = false, has_session = false, pub_key = <<>>}} - end; +%% 关闭授权 +handle_event({call, From}, {activate, false}, _, State) -> + {next_state, denied, State#state{pub_key = <<>>}, [{reply, From, ok}]}; +%% 开启授权 +handle_event({call, From}, {activate, true}, denied, State) -> + {next_state, activated, State, [{reply, From, ok}]}; +handle_event({call, From}, {activate, true}, _, State) -> + {keep_state, State, [{reply, From, ok}]}; -%% 创建命令 -handle_call({aes_encode, _, _}, _From, State = #state{has_session = false}) -> - {reply, {error, <<"会话未建立"/utf8>>}, State}; -handle_call({aes_encode, CommandType, Command}, _From, State = #state{aes = AES, has_session = true}) -> - EncCommand = iot_cipher_aes:encrypt(AES, Command), - - {reply, {ok, <>}, State}; - -handle_call(Info, _From, State) -> - lager:debug("[iot_host] handle info: ~p", [Info]), - - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(need_auth, State = #state{uuid = UUID}) -> +handle_event(cast, need_auth, _StateName, State = #state{uuid = UUID}) -> Reply = jiffy:encode(#{<<"auth">> => false, <<"aes">> => <<"">>}, [force_utf8]), {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<8:8, Reply/binary>>, 1), receive @@ -238,130 +226,86 @@ handle_cast(need_auth, State = #state{uuid = UUID}) -> after 5000 -> lager:debug("[iot_host] host_id uuid: ~p, publish need_auth reply get error is: timeout", [UUID]) end, - {noreply, State}; -handle_cast({handle, Payload}, State) -> - %% 处理消息 - NState = handle_message(Payload, State), - {noreply, NState#state{is_answered = true}}. + {keep_state, State}; -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -%% 超时,并且没有收到任何消息 -handle_info({timeout, _, ping_ticker}, State = #state{uuid = UUID, is_answered = IsAnswered, status = Status}) -> - erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker), - %% 需要考虑到主机未激活的情况,主机未激活,返回: keep_status - NextStatus = if - not IsAnswered andalso Status == ?HOST_STATUS_ONLINE -> - {next_status, ?HOST_STATUS_OFFLINE}; - IsAnswered andalso Status == ?HOST_STATUS_OFFLINE -> - {next_status, ?HOST_STATUS_ONLINE}; - true -> - keep_status - end, - - case NextStatus of - keep_status -> - {noreply, State#state{is_answered = false}}; - {next_status, NStatus} -> - case host_bo:change_status(UUID, NStatus) of - {ok, _} -> - {noreply, State#state{status = NStatus, is_answered = false}}; - {error, Reason} -> - lager:warning("[iot_host] change host status of uuid: ~p, error: ~p", [UUID, Reason]), - {noreply, State#state{is_answered = false}} - end - end; - -handle_info(Info, State = #state{uuid = UUID}) -> - lager:warning("[iot_host] host uuid: ~p, get unknown info: ~p", [UUID, Info]), - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(Reason, #state{uuid = UUID}) -> - lager:debug("[iot_host] terminate with reason: ~p", [Reason]), - ChangeResult = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE), - lager:debug("[iot_host] change host: ~p, status result is: ~p", [UUID, ChangeResult]), - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -%% 基于topic分离了,因此数据里面不需要uuid -handle_message(Payload, State) when is_binary(Payload) -> +%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理 +%% 收到消息则认为主机端已经发送了心跳包 +handle_event(cast, {handle, Payload}, _StateName, State) -> Message = catch jiffy:decode(Payload, [return_maps]), lager:debug("[iot_host] get message: ~p", [Message]), - handle_message(Message, State); + gen_statem:cast(self(), {handle_message, Message}), -%% create_session操作, 需要等待mqtt服务的响应 -handle_message(#{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}, - State = #state{is_activated = IsActivated, uuid = UUID, aes = Aes}) -> + {keep_state, State#state{is_answered = true}}; +handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, denied, State = #state{uuid = UUID}) -> lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), - Reply = case IsActivated of - true -> - #{<<"a">> => true, <<"aes">> => Aes}; - false -> - #{<<"a">> => false, <<"aes">> => <<"">>} - end, + Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), + + {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), + receive + {ok, Ref, PacketId} -> + lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]) + after 10000 -> + lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]) + end, + {keep_state, State}; + +handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, _StateName, State = #state{uuid = UUID, aes = Aes}) -> + lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), + Reply = #{<<"a">> => true, <<"aes">> => Aes}, EncReply = iot_cipher_rsa:encode(Reply, PubKey), {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), receive {ok, Ref, PacketId} -> lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]), - State#state{pub_key = PubKey, has_session = true} + {next_state, session, State#state{pub_key = PubKey}} after 10000 -> lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]), - State + {keep_state, State} end; -%% 数据上传 -handle_message(#{<<"method">> := <<"data">>, <<"params">> := Data}, State = #state{has_session = true, aes = AES, uuid = UUID}) -> +handle_event(cast, {handle_message, #{<<"method">> := <<"data">>, <<"params">> := Data}}, session, State = #state{uuid = UUID, aes = AES}) -> PlainData = iot_cipher_aes:decrypt(AES, base64:decode(Data)), case catch jiffy:decode(PlainData, [return_maps]) of Infos when is_list(Infos) -> lager:debug("[iot_host] the data is: ~p", [Infos]), - insert_metrics(UUID, Infos); + %% 记录数据, TODO 转换点位信息, Fields里面包含了 <<"device_id">> 信息 + lists:foreach(fun(Info = #{<<"service_name">> := ServiceName, <<"fields">> := FieldsList, <<"tags">> := Tags}) when is_binary(ServiceName) -> + Timestamp = maps:get(<<"at">>, Info, iot_util:timestamp()), + + NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}, + %% 微服务名前缀作为measurement来保存数据 + [Measurement | _] = binary:split(ServiceName, <<":">>), + + Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList), + Precision = influx_client:get_precision(Timestamp), + + poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end) + end, Infos); Other -> lager:debug("[iot_message_handler] the metric is invalid json: ~p", [Other]) end, - State; + {keep_state, State}; -%% 处理服务器的ping -handle_message(#{<<"method">> := <<"ping">>, <<"params">> := CipherMetric}, State = #state{has_session = true, uuid = UUID, aes = AES}) -> +handle_event(cast, {handle_message, #{<<"method">> := <<"ping">>, <<"params">> := CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) -> MetricsInfo = iot_cipher_aes:decrypt(AES, base64:decode(CipherMetric)), Metrics = jiffy:decode(MetricsInfo, [return_maps]), lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), - State#state{metrics = Metrics}; + {keep_state, State#state{metrics = Metrics}}; -%% 处理微服务的信息上报 -handle_message(#{<<"method">> := <<"inform">>, <<"params">> := Info0}, State = #state{has_session = true, host_id = HostId, aes = AES}) -> +handle_event(cast, {handle_message, #{<<"method">> := <<"inform">>, <<"params">> := Info0}}, session, State = #state{host_id = HostId, aes = AES}) -> Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)), case catch jiffy:decode(Info, [return_maps]) of #{<<"at">> := At, <<"services">> := ServiceInforms} -> lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) -> %% props 主机id:场景id:微服务id - [_, SceneId, MicroId] = binary:split(Props, <<":">>, [global]), + [_, SceneId0, MicroId0] = binary:split(Props, <<":">>, [global]), + SceneId = binary_to_integer(SceneId0), + MicroId = binary_to_integer(MicroId0), + micro_inform_log:insert(#{ <<"host_id">> => HostId, <<"scene_id">> => SceneId, @@ -376,52 +320,81 @@ handle_message(#{<<"method">> := <<"inform">>, <<"params">> := Info0}, State = # Error -> lager:warning("[iot_host] inform get error: ~p", [Error]) end, - State; + {keep_state, State}; -%% 处理命令的上报结果 -handle_message(#{<<"method">> := <<"feedback_result">>, <<"params">> := Info0}, State = #state{has_session = true, aes = AES}) -> +handle_event(cast, {handle_message, #{<<"method">> := <<"feedback_result">>, <<"params">> := Info0}}, session, State = #state{aes = AES}) -> Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)), case catch jiffy:decode(Info, [return_maps]) of #{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} -> - scene_feedback:insert(#{ - <<"task_id">> => TaskId, - <<"task_type">> => Type, - <<"code">> => Code, - <<"reason">> => Reason, - <<"error">> => Error, - <<"created_at">> => Time - }); + scene_feedback:insert(#{ + <<"task_id">> => TaskId, + <<"task_type">> => Type, + <<"code">> => Code, + <<"reason">> => Reason, + <<"error">> => Error, + <<"created_at">> => Time + }); Other -> lager:warning("[iot_host] feedback_result error: ~p", [Other]) end, - State; + {keep_state, State}; %% 处理客户端激活的响应, 完整格式为: {"code": 0|1, "message": "", "assoc": string} -handle_message(Msg = #{<<"code">> := _Code, <<"assoc">> := Assoc}, State = #state{assoc_map = AssocMap}) -> +handle_event(cast, {handle_message, Msg = #{<<"code">> := _Code, <<"assoc">> := Assoc}}, _, State = #state{assoc_map = AssocMap}) -> case maps:take(Assoc, AssocMap) of error -> - State; + {keep_state, State}; {ReceiverPid, NAssocMap} -> ReceiverPid ! {host_reply, Assoc, Msg}, - State#state{assoc_map = NAssocMap} + {keep_state, State#state{assoc_map = NAssocMap}} end; -handle_message(Message, State = #state{uuid = UUID, has_session = HasSession}) -> - not HasSession andalso gen_server:cast(self(), need_auth), - lager:warning("[iot_host] host_id uuid: ~p, get a unknown message: ~p, session: ~p", [UUID, Message, HasSession]), - State. +handle_event(info, {timeout, _, ping_ticker}, _StateName, State = #state{uuid = UUID, is_answered = IsAnswered, status = Status}) -> + erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker), + %% 需要考虑到主机未激活的情况,主机未激活,返回: keep_status + NextStatus = if + not IsAnswered andalso Status == ?HOST_STATUS_ONLINE -> + {change_status, ?HOST_STATUS_OFFLINE}; + IsAnswered andalso Status == ?HOST_STATUS_OFFLINE -> + {change_status, ?HOST_STATUS_ONLINE}; + true -> + keep_status + end, -%% 记录数据, TODO 转换点位信息, Fields里面包含了 <<"device_id">> 信息 -insert_metrics(UUID, Infos) when is_binary(UUID), is_list(Infos) -> - [insert_metrics0(UUID, Info) || Info <- Infos]. -insert_metrics0(UUID, Info = #{<<"service_name">> := ServiceName, <<"fields">> := FieldsList, <<"tags">> := Tags}) when is_binary(ServiceName) -> - Timestamp = maps:get(<<"at">>, Info, iot_util:timestamp()), + case NextStatus of + keep_status -> + {keep_state, State#state{is_answered = false}}; + {change_status, NStatus} -> + case host_bo:change_status(UUID, NStatus) of + {ok, _} -> + {keep_state, State#state{status = NStatus, is_answered = false}}; + {error, Reason} -> + lager:warning("[iot_host] change host status of uuid: ~p, error: ~p", [UUID, Reason]), + {keep_state, State#state{is_answered = false}} + end + end; - NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}, - %% TODO 微服务名前缀作为measurement来保存数据 - [Measurement | _] = binary:split(ServiceName, <<":">>), +handle_event(EventType, EventContent, StateName, State) -> + lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]), - Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList), - Precision = influx_client:get_precision(Timestamp), + {keep_state, State}. - poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end). \ No newline at end of file +%% @private +%% @doc This function is called by a gen_statem when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_statem terminates with +%% Reason. The return value is ignored. +terminate(Reason, _StateName, _State = #state{uuid = UUID}) -> + lager:debug("[iot_host] terminate with reason: ~p", [Reason]), + ChangeResult = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE), + lager:debug("[iot_host] change host: ~p, status result is: ~p", [UUID, ChangeResult]), + ok. + +%% @private +%% @doc Convert process state when code is changed +code_change(_OldVsn, StateName, State = #state{}, _Extra) -> + {ok, StateName, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== \ No newline at end of file diff --git a/apps/iot/src/iot_host2.erl b/apps/iot/src/iot_host2.erl deleted file mode 100644 index c33d814..0000000 --- a/apps/iot/src/iot_host2.erl +++ /dev/null @@ -1,400 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 19. 6月 2023 10:32 -%%%------------------------------------------------------------------- --module(iot_host2). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_statem). - -%% API --export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]). --export([get_metric/1, aes_encode/3, downstream_topic/1, upstream_topic/1, get_aes/1, make_assoc/1, rsa_encode/3]). --export([has_session/1]). - -%% gen_statem callbacks --export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]). - --define(SERVER, ?MODULE). - -%% 心跳包的间隔周期, 值需要比host上传的间隔大一些才行 --define(TICKER_INTERVAL, 5000 * 2). - --record(state, { - host_id :: integer(), - %% 从数据库里面读取到的数据 - uuid :: binary(), - %% 当前的状态 - status :: integer(), - - %% rsa公钥 - pub_key = <<>> :: binary(), - %% aes的key, 后续通讯需要基于这个加密 - aes = <<>> :: binary(), - - %% 主机的相关信息 - metrics = #{} :: map(), - - %% 是否获取到了ping请求 - is_answered = false :: boolean(), - - %% 任务的自增id - increment_id = 1, - %% 关联数据 - assoc_map = #{} -}). - -%%%=================================================================== -%%% API -%%%=================================================================== --spec get_pid(UUID :: binary()) -> undefined | pid(). -get_pid(UUID) when is_binary(UUID) -> - Name = get_name(UUID), - whereis(Name). - --spec get_name(UUID :: binary()) -> atom(). -get_name(UUID) when is_binary(UUID) -> - binary_to_atom(<<"iot_host:", UUID/binary>>). - -%% 获取主机的下行主题 --spec downstream_topic(UUID :: binary()) -> Topic :: binary(). -downstream_topic(UUID) when is_binary(UUID) -> - <<"host/downstream/", UUID/binary>>. - --spec upstream_topic(UUID :: binary()) -> Topic :: binary(). -upstream_topic(UUID) when is_binary(UUID) -> - <<"host/upstream/", UUID/binary>>. - -%% 处理消息 --spec handle(Pid :: pid(), Payload :: binary() | map()) -> no_return(). -handle(Pid, Payload) when is_pid(Pid), is_binary(Payload); is_map(Payload) -> - gen_statem:cast(Pid, {handle, Payload}). - -%% 重新加载主机的基本信息 --spec reload(Pid :: pid()) -> ok | {error, Reason :: any()}. -reload(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, reload). - --spec get_aes(Pid :: pid()) -> Aes :: binary(). -get_aes(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, get_aes). - --spec make_assoc(Pid :: pid()) -> {ok, Assoc :: binary()}. -make_assoc(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, {make_assoc, self()}). - -%% 激活主机, true 表示激活; false表示关闭激活 --spec activate(Pid :: pid(), Auth :: boolean()) -> ok. -activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> - gen_statem:call(Pid, {activate, Auth}). - --spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}. -get_metric(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, get_metric). - --spec has_session(Pid :: pid()) -> boolean(). -has_session(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, has_session). - -%% 基于rsa加密的指令都是不需要会话存在的 --spec rsa_encode(Pid :: pid(), CommandType :: integer(), PlainText :: binary()) -> - {ok, EncText :: binary()} | {error, Reason :: binary()}. -rsa_encode(Pid, CommandType, PlainText) when is_pid(Pid), is_integer(CommandType), is_binary(PlainText) -> - gen_statem:call(Pid, {rsa_encode, CommandType, PlainText}). - --spec aes_encode(Pid :: pid(), CommandType :: integer(), Params :: binary()) -> - {ok, Command :: binary()} | {error, Reason :: any()}. -aes_encode(Pid, CommandType, Params) when is_pid(Pid), is_integer(CommandType), is_binary(Params) -> - gen_statem:call(Pid, {aes_encode, CommandType, Params}). - - -%% @doc Creates a gen_statem process which calls Module:init/1 to -%% initialize. To ensure a synchronized start-up procedure, this -%% function does not return until Module:init/1 has returned. -start_link(Name, UUID) when is_atom(Name), is_binary(UUID) -> - gen_statem:start_link({local, Name}, ?MODULE, [UUID], []). - -%%%=================================================================== -%%% gen_statem callbacks -%%%=================================================================== - -%% @private -%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or -%% gen_statem:start_link/[3,4], this function is called by the new -%% process to initialize. -init([UUID]) -> - case host_bo:get_host_by_uuid(UUID) of - {ok, #{<<"status">> := Status, <<"id">> := HostId}} -> - %% 启动心跳定时器 - erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker), - Aes = list_to_binary(iot_util:rand_bytes(32)), - %% 告知主机端需要重新授权 - gen_server:cast(self(), need_auth), - - StateName = case Status =:= ?HOST_STATUS_INACTIVE of - true -> denied; - false -> activated - end, - - {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, status = Status}}; - undefined -> - lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), - ignore - end. - - -%% @private -%% @doc This function is called by a gen_statem when it needs to find out -%% the callback mode of the callback module. -callback_mode() -> - handle_event_function. - -%% @private -%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or -%% (2) when gen_statem terminates abnormally. -%% This callback is optional. -format_status(_Opt, [_PDict, _StateName, _State]) -> - Status = some_term, - Status. - -%% @private -%% @doc If callback_mode is handle_event_function, then whenever a -%% gen_statem receives an event from call/2, cast/2, or as a normal -%% process message, this function is called. -handle_event({call, From}, get_metric, session, State = #state{metrics = Metrics}) -> - {keep_state, State, [{reply, From, {ok, Metrics}}]}; -handle_event({call, From}, get_metric, _, State) -> - {keep_state, State, [{reply, From, {ok, #{}}}]}; - -handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) -> - {keep_state, State, [{reply, From, {ok, Aes}}]}; - -handle_event({call, From}, has_session, StateName, State) -> - HasSession = (StateName =:= session) orelse false, - {keep_state, State, [{reply, From, HasSession}]}; - -%% 基于rsa加密 -handle_event({call, From}, {rsa_encode, CommandType, PlainText}, session, State = #state{pub_key = PubKey}) -> - Reply = iot_cipher_rsa:encode(PlainText, PubKey), - {keep_state, State, [{reply, From, {ok, <>}}]}; -handle_event({call, From}, {rsa_encode, _, _}, _, State) -> - {keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]}; - -%% 基于aes加密 -handle_event({call, From}, {aes_encode, CommandType, Command}, session, State = #state{aes = AES}) -> - EncCommand = iot_cipher_aes:encrypt(AES, Command), - {keep_state, State, [{reply, From, {ok, <>}}]}; -handle_event({call, From}, {aes_encode, _, _}, _, State) -> - {keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]}; - -handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) -> - %% 重新加载主机信息 - case host_bo:get_host_by_uuid(UUID) of - {ok, Host = #{<<"status">> := Status}} -> - lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]), - case StateName == denied andalso Status =/= ?HOST_STATUS_INACTIVE of - true -> - {next_state, activated, State#state{status = Status}, [{reply, From, ok}]}; - false -> - {keep_state, State#state{status = Status}, [{reply, From, ok}]} - end; - undefined -> - lager:debug("[iot_host] reload host uuid: ~p, failed", [UUID]), - {keep_state, State, [{reply, From, {error, <<"host not found">>}}]} - end; - -%% 关闭授权 -handle_event({call, From}, {activate, false}, _, State) -> - {next_state, denied, State#state{pub_key = <<>>}, [{reply, From, ok}]}; -%% 开启授权 -handle_event({call, From}, {activate, true}, denied, State) -> - {next_state, activated, State, [{reply, From, ok}]}; -handle_event({call, From}, {activate, true}, _, State) -> - {keep_state, State, [{reply, From, ok}]}; - -handle_event(cast, need_auth, _StateName, State = #state{uuid = UUID}) -> - Reply = jiffy:encode(#{<<"auth">> => false, <<"aes">> => <<"">>}, [force_utf8]), - {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<8:8, Reply/binary>>, 1), - receive - {ok, Ref, PacketId} -> - lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish need_auth reply success", [UUID, PacketId]) - after 5000 -> - lager:debug("[iot_host] host_id uuid: ~p, publish need_auth reply get error is: timeout", [UUID]) - end, - {keep_state, State}; - -%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理 -%% 收到消息则认为主机端已经发送了心跳包 -handle_event(cast, {handle, Payload}, _StateName, State) -> - Message = catch jiffy:decode(Payload, [return_maps]), - lager:debug("[iot_host] get message: ~p", [Message]), - gen_statem:cast(self(), {handle_message, Message}), - - {keep_state, State#state{is_answered = true}}; - -handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, denied, State = #state{uuid = UUID}) -> - lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), - Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, - EncReply = iot_cipher_rsa:encode(Reply, PubKey), - - {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), - receive - {ok, Ref, PacketId} -> - lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]) - after 10000 -> - lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]) - end, - {keep_state, State}; - -handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, _StateName, State = #state{uuid = UUID, aes = Aes}) -> - lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), - Reply = #{<<"a">> => true, <<"aes">> => Aes}, - EncReply = iot_cipher_rsa:encode(Reply, PubKey), - - {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), - receive - {ok, Ref, PacketId} -> - lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]), - {next_state, session, State#state{pub_key = PubKey}} - after 10000 -> - lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]), - {keep_state, State} - end; - -handle_event(cast, {handle_message, #{<<"method">> := <<"data">>, <<"params">> := Data}}, session, State = #state{uuid = UUID, aes = AES}) -> - PlainData = iot_cipher_aes:decrypt(AES, base64:decode(Data)), - case catch jiffy:decode(PlainData, [return_maps]) of - Infos when is_list(Infos) -> - lager:debug("[iot_host] the data is: ~p", [Infos]), - %% 记录数据, TODO 转换点位信息, Fields里面包含了 <<"device_id">> 信息 - lists:foreach(fun(Info = #{<<"service_name">> := ServiceName, <<"fields">> := FieldsList, <<"tags">> := Tags}) when is_binary(ServiceName) -> - Timestamp = maps:get(<<"at">>, Info, iot_util:timestamp()), - - NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}, - %% 微服务名前缀作为measurement来保存数据 - [Measurement | _] = binary:split(ServiceName, <<":">>), - - Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList), - Precision = influx_client:get_precision(Timestamp), - - poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end) - end, Infos); - Other -> - lager:debug("[iot_message_handler] the metric is invalid json: ~p", [Other]) - end, - {keep_state, State}; - -handle_event(cast, {handle_message, #{<<"method">> := <<"ping">>, <<"params">> := CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) -> - MetricsInfo = iot_cipher_aes:decrypt(AES, base64:decode(CipherMetric)), - Metrics = jiffy:decode(MetricsInfo, [return_maps]), - - lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), - {keep_state, State#state{metrics = Metrics}}; - -handle_event(cast, {handle_message, #{<<"method">> := <<"inform">>, <<"params">> := Info0}}, session, State = #state{host_id = HostId, aes = AES}) -> - Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)), - case catch jiffy:decode(Info, [return_maps]) of - #{<<"at">> := At, <<"services">> := ServiceInforms} -> - lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) -> - %% props 主机id:场景id:微服务id - [_, SceneId0, MicroId0] = binary:split(Props, <<":">>, [global]), - SceneId = binary_to_integer(SceneId0), - MicroId = binary_to_integer(MicroId0), - - micro_inform_log:insert(#{ - <<"host_id">> => HostId, - <<"scene_id">> => SceneId, - <<"service_name">> => Name, - <<"version">> => Version, - <<"version_copy">> => VersionCopy, - <<"status">> => Status, - <<"created_at">> => At - }), - micro_set_bo:change_status(HostId, SceneId, MicroId, Status) - end, ServiceInforms); - Error -> - lager:warning("[iot_host] inform get error: ~p", [Error]) - end, - {keep_state, State}; - -handle_event(cast, {handle_message, #{<<"method">> := <<"feedback_result">>, <<"params">> := Info0}}, session, State = #state{aes = AES}) -> - Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)), - case catch jiffy:decode(Info, [return_maps]) of - #{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} -> - scene_feedback:insert(#{ - <<"task_id">> => TaskId, - <<"task_type">> => Type, - <<"code">> => Code, - <<"reason">> => Reason, - <<"error">> => Error, - <<"created_at">> => Time - }); - Other -> - lager:warning("[iot_host] feedback_result error: ~p", [Other]) - end, - {keep_state, State}; - -%% 处理客户端激活的响应, 完整格式为: {"code": 0|1, "message": "", "assoc": string} -handle_event(cast, {handle_message, Msg = #{<<"code">> := _Code, <<"assoc">> := Assoc}}, _, State = #state{assoc_map = AssocMap}) -> - case maps:take(Assoc, AssocMap) of - error -> - {keep_state, State}; - {ReceiverPid, NAssocMap} -> - ReceiverPid ! {host_reply, Assoc, Msg}, - {keep_state, State#state{assoc_map = NAssocMap}} - end; - -handle_event(info, {timeout, _, ping_ticker}, _StateName, State = #state{uuid = UUID, is_answered = IsAnswered, status = Status}) -> - erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker), - %% 需要考虑到主机未激活的情况,主机未激活,返回: keep_status - NextStatus = if - not IsAnswered andalso Status == ?HOST_STATUS_ONLINE -> - {change_status, ?HOST_STATUS_OFFLINE}; - IsAnswered andalso Status == ?HOST_STATUS_OFFLINE -> - {change_status, ?HOST_STATUS_ONLINE}; - true -> - keep_status - end, - - case NextStatus of - keep_status -> - {keep_state, State#state{is_answered = false}}; - {change_status, NStatus} -> - case host_bo:change_status(UUID, NStatus) of - {ok, _} -> - {keep_state, State#state{status = NStatus, is_answered = false}}; - {error, Reason} -> - lager:warning("[iot_host] change host status of uuid: ~p, error: ~p", [UUID, Reason]), - {keep_state, State#state{is_answered = false}} - end - end; - -handle_event(EventType, EventContent, StateName, State) -> - lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]), - - {keep_state, State}. - -%% @private -%% @doc This function is called by a gen_statem when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_statem terminates with -%% Reason. The return value is ignored. -terminate(Reason, _StateName, _State = #state{uuid = UUID}) -> - lager:debug("[iot_host] terminate with reason: ~p", [Reason]), - ChangeResult = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE), - lager:debug("[iot_host] change host: ~p, status result is: ~p", [UUID, ChangeResult]), - ok. - -%% @private -%% @doc Convert process state when code is changed -code_change(_OldVsn, StateName, State = #state{}, _Extra) -> - {ok, StateName, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== \ No newline at end of file