diff --git a/apps/iot/src/iot_host2.erl b/apps/iot/src/iot_host2.erl new file mode 100644 index 0000000..34b5b90 --- /dev/null +++ b/apps/iot/src/iot_host2.erl @@ -0,0 +1,397 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 19. 6月 2023 10:32 +%%%------------------------------------------------------------------- +-module(iot_host2). +-author("aresei"). +-include("iot.hrl"). + +-behaviour(gen_statem). + +%% API +-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]). +-export([get_metric/1, aes_encode/3, downstream_topic/1, upstream_topic/1, get_aes/1, make_assoc/1, rsa_encode/3]). +-export([has_session/1]). + +%% gen_statem callbacks +-export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]). + +-define(SERVER, ?MODULE). + +%% 心跳包的间隔周期, 值需要比host上传的间隔大一些才行 +-define(TICKER_INTERVAL, 5000 * 2). + +-record(state, { + host_id :: integer(), + %% 从数据库里面读取到的数据 + uuid :: binary(), + %% 当前的状态 + status :: integer(), + + %% rsa公钥 + pub_key = <<>> :: binary(), + %% aes的key, 后续通讯需要基于这个加密 + aes = <<>> :: binary(), + + %% 主机的相关信息 + metrics = #{} :: map(), + + %% 是否获取到了ping请求 + is_answered = false :: boolean(), + + %% 任务的自增id + increment_id = 1, + %% 关联数据 + assoc_map = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== +-spec get_pid(UUID :: binary()) -> undefined | pid(). +get_pid(UUID) when is_binary(UUID) -> + Name = get_name(UUID), + whereis(Name). + +-spec get_name(UUID :: binary()) -> atom(). +get_name(UUID) when is_binary(UUID) -> + binary_to_atom(<<"iot_host:", UUID/binary>>). + +%% 获取主机的下行主题 +-spec downstream_topic(UUID :: binary()) -> Topic :: binary(). +downstream_topic(UUID) when is_binary(UUID) -> + <<"host/downstream/", UUID/binary>>. + +-spec upstream_topic(UUID :: binary()) -> Topic :: binary(). +upstream_topic(UUID) when is_binary(UUID) -> + <<"host/upstream/", UUID/binary>>. + +%% 处理消息 +-spec handle(Pid :: pid(), Payload :: binary() | map()) -> no_return(). +handle(Pid, Payload) when is_pid(Pid), is_binary(Payload); is_map(Payload) -> + gen_statem:cast(Pid, {handle, Payload}). + +%% 重新加载主机的基本信息 +-spec reload(Pid :: pid()) -> ok | {error, Reason :: any()}. +reload(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, reload). + +-spec get_aes(Pid :: pid()) -> Aes :: binary(). +get_aes(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, get_aes). + +-spec make_assoc(Pid :: pid()) -> {ok, Assoc :: binary()}. +make_assoc(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, {make_assoc, self()}). + +%% 激活主机, true 表示激活; false表示关闭激活 +-spec activate(Pid :: pid(), Auth :: boolean()) -> ok. +activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> + gen_statem:call(Pid, {activate, Auth}). + +-spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}. +get_metric(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, get_metric). + +-spec has_session(Pid :: pid()) -> boolean(). +has_session(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, has_session). + +%% 基于rsa加密的指令都是不需要会话存在的 +-spec rsa_encode(Pid :: pid(), CommandType :: integer(), PlainText :: binary()) -> + {ok, EncText :: binary()} | {error, Reason :: binary()}. +rsa_encode(Pid, CommandType, PlainText) when is_pid(Pid), is_integer(CommandType), is_binary(PlainText) -> + gen_statem:call(Pid, {rsa_encode, CommandType, PlainText}). + +-spec aes_encode(Pid :: pid(), CommandType :: integer(), Params :: binary()) -> + {ok, Command :: binary()} | {error, Reason :: any()}. +aes_encode(Pid, CommandType, Params) when is_pid(Pid), is_integer(CommandType), is_binary(Params) -> + gen_statem:call(Pid, {aes_encode, CommandType, Params}). + + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. +start_link(Name, UUID) when is_atom(Name), is_binary(UUID) -> + gen_statem:start_link({local, Name}, ?MODULE, [UUID], []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. +init([UUID]) -> + case host_bo:get_host_by_uuid(UUID) of + {ok, #{<<"status">> := Status, <<"id">> := HostId}} -> + %% 启动心跳定时器 + erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker), + Aes = list_to_binary(iot_util:rand_bytes(32)), + %% 告知主机端需要重新授权 + gen_server:cast(self(), need_auth), + + StateName = case Status =:= ?HOST_STATUS_INACTIVE of + true -> denied; + false -> activated + end, + + {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, status = Status}}; + undefined -> + lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), + ignore + end. + + +%% @private +%% @doc This function is called by a gen_statem when it needs to find out +%% the callback mode of the callback module. +callback_mode() -> + handle_event_function. + +%% @private +%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or +%% (2) when gen_statem terminates abnormally. +%% This callback is optional. +format_status(_Opt, [_PDict, _StateName, _State]) -> + Status = some_term, + Status. + +%% @private +%% @doc If callback_mode is handle_event_function, then whenever a +%% gen_statem receives an event from call/2, cast/2, or as a normal +%% process message, this function is called. +handle_event({call, From}, get_metric, session, State = #state{metrics = Metrics}) -> + {keep_state, State, [{reply, From, {ok, Metrics}}]}; +handle_event({call, From}, get_metric, _, State) -> + {keep_state, State, [{reply, From, {ok, #{}}}]}; + +handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) -> + {keep_state, State, [{reply, From, {ok, Aes}}]}; + +handle_event({call, From}, has_session, StateName, State) -> + HasSession = (StateName =:= session) orelse false, + {keep_state, State, [{reply, From, HasSession}]}; + +%% 基于rsa加密 +handle_event({call, From}, {rsa_encode, CommandType, PlainText}, session, State = #state{pub_key = PubKey}) -> + Reply = iot_cipher_rsa:encode(PlainText, PubKey), + {keep_state, State, [{reply, From, {ok, <>}}]}; +handle_event({call, From}, {rsa_encode, _, _}, _, State) -> + {keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]}; + +%% 基于aes加密 +handle_event({call, From}, {aes_encode, CommandType, Command}, session, State = #state{aes = AES}) -> + EncCommand = iot_cipher_aes:encrypt(AES, Command), + {keep_state, State, [{reply, From, {ok, <>}}]}; +handle_event({call, From}, {aes_encode, _, _}, _, State) -> + {keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]}; + +handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) -> + %% 重新加载主机信息 + case host_bo:get_host_by_uuid(UUID) of + {ok, Host = #{<<"status">> := Status}} -> + lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]), + case StateName == denied andalso Status =/= ?HOST_STATUS_INACTIVE of + true -> + {next_state, activated, State#state{status = Status}, [{reply, From, ok}]}; + false -> + {keep_state, State#state{status = Status}, [{reply, From, ok}]} + end; + undefined -> + lager:debug("[iot_host] reload host uuid: ~p, failed", [UUID]), + {keep_state, State, [{reply, From, {error, <<"host not found">>}}]} + end; + +%% 关闭授权 +handle_event({call, From}, {activate, false}, _, State) -> + {next_state, denied, State#state{pub_key = <<>>}, [{reply, From, ok}]}; +%% 开启授权 +handle_event({call, From}, {activate, true}, denied, State) -> + {next_state, activated, State, [{reply, From, ok}]}; +handle_event({call, From}, {activate, true}, _, State) -> + {keep_state, State, [{reply, From, ok}]}; + +handle_event(cast, need_auth, _StateName, State = #state{uuid = UUID}) -> + Reply = jiffy:encode(#{<<"auth">> => false, <<"aes">> => <<"">>}, [force_utf8]), + {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<8:8, Reply/binary>>, 1), + receive + {ok, Ref, PacketId} -> + lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish need_auth reply success", [UUID, PacketId]) + after 5000 -> + lager:debug("[iot_host] host_id uuid: ~p, publish need_auth reply get error is: timeout", [UUID]) + end, + {keep_state, State}; + +%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理 +%% 收到消息则认为主机端已经发送了心跳包 +handle_event(cast, {handle, Payload}, _StateName, State) -> + Message = catch jiffy:decode(Payload, [return_maps]), + lager:debug("[iot_host] get message: ~p", [Message]), + gen_statem:cast(self(), {handle_message, Message}), + + {keep_state, State#state{is_answered = true}}; + +handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, denied, State = #state{uuid = UUID}) -> + lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), + Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), + + {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), + receive + {ok, Ref, PacketId} -> + lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]) + after 10000 -> + lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]) + end, + {keep_state, State}; + +handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, _StateName, State = #state{uuid = UUID, aes = Aes}) -> + lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), + Reply = #{<<"a">> => true, <<"aes">> => Aes}, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), + + {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), + receive + {ok, Ref, PacketId} -> + lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]), + {next_state, session, State#state{pub_key = PubKey}} + after 10000 -> + lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]), + {keep_state, State} + end; + +handle_event(cast, {handle_message, #{<<"method">> := <<"data">>, <<"params">> := Data}}, session, State = #state{uuid = UUID, aes = AES}) -> + PlainData = iot_cipher_aes:decrypt(AES, base64:decode(Data)), + case catch jiffy:decode(PlainData, [return_maps]) of + Infos when is_list(Infos) -> + lager:debug("[iot_host] the data is: ~p", [Infos]), + %% 记录数据, TODO 转换点位信息, Fields里面包含了 <<"device_id">> 信息 + lists:foreach(fun(Info = #{<<"service_name">> := ServiceName, <<"fields">> := FieldsList, <<"tags">> := Tags}) when is_binary(ServiceName) -> + Timestamp = maps:get(<<"at">>, Info, iot_util:timestamp()), + + NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}, + %% 微服务名前缀作为measurement来保存数据 + [Measurement | _] = binary:split(ServiceName, <<":">>), + + Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList), + Precision = influx_client:get_precision(Timestamp), + + poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end) + end, Infos); + Other -> + lager:debug("[iot_message_handler] the metric is invalid json: ~p", [Other]) + end, + {keep_state, State}; + +handle_event(cast, {handle_message, #{<<"method">> := <<"ping">>, <<"params">> := CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) -> + MetricsInfo = iot_cipher_aes:decrypt(AES, base64:decode(CipherMetric)), + Metrics = jiffy:decode(MetricsInfo, [return_maps]), + + lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), + {keep_state, State#state{metrics = Metrics}}; + +handle_event(cast, {handle_message, #{<<"method">> := <<"inform">>, <<"params">> := Info0}}, session, State = #state{host_id = HostId, aes = AES}) -> + Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)), + case catch jiffy:decode(Info, [return_maps]) of + #{<<"at">> := At, <<"services">> := ServiceInforms} -> + lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) -> + %% props 主机id:场景id:微服务id + [_, SceneId, MicroId] = binary:split(Props, <<":">>, [global]), + micro_inform_log:insert(#{ + <<"host_id">> => HostId, + <<"scene_id">> => SceneId, + <<"service_name">> => Name, + <<"version">> => Version, + <<"version_copy">> => VersionCopy, + <<"status">> => Status, + <<"created_at">> => At + }), + micro_set_bo:change_status(HostId, SceneId, MicroId, Status) + end, ServiceInforms); + Error -> + lager:warning("[iot_host] inform get error: ~p", [Error]) + end, + {keep_state, State}; + +handle_event(cast, {handle_message, #{<<"method">> := <<"feedback_result">>, <<"params">> := Info0}}, session, State = #state{aes = AES}) -> + Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)), + case catch jiffy:decode(Info, [return_maps]) of + #{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} -> + scene_feedback:insert(#{ + <<"task_id">> => TaskId, + <<"task_type">> => Type, + <<"code">> => Code, + <<"reason">> => Reason, + <<"error">> => Error, + <<"created_at">> => Time + }); + Other -> + lager:warning("[iot_host] feedback_result error: ~p", [Other]) + end, + {keep_state, State}; + +%% 处理客户端激活的响应, 完整格式为: {"code": 0|1, "message": "", "assoc": string} +handle_event(cast, {handle_message, Msg = #{<<"code">> := _Code, <<"assoc">> := Assoc}}, _, State = #state{assoc_map = AssocMap}) -> + case maps:take(Assoc, AssocMap) of + error -> + {keep_state, State}; + {ReceiverPid, NAssocMap} -> + ReceiverPid ! {host_reply, Assoc, Msg}, + {keep_state, State#state{assoc_map = NAssocMap}} + end; + +handle_event(info, {timeout, _, ping_ticker}, _StateName, State = #state{uuid = UUID, is_answered = IsAnswered, status = Status}) -> + erlang:start_timer(?TICKER_INTERVAL, self(), ping_ticker), + %% 需要考虑到主机未激活的情况,主机未激活,返回: keep_status + NextStatus = if + not IsAnswered andalso Status == ?HOST_STATUS_ONLINE -> + {change_status, ?HOST_STATUS_OFFLINE}; + IsAnswered andalso Status == ?HOST_STATUS_OFFLINE -> + {change_status, ?HOST_STATUS_ONLINE}; + true -> + keep_status + end, + + case NextStatus of + keep_status -> + {keep_state, State#state{is_answered = false}}; + {change_status, NStatus} -> + case host_bo:change_status(UUID, NStatus) of + {ok, _} -> + {keep_state, State#state{status = NStatus, is_answered = false}}; + {error, Reason} -> + lager:warning("[iot_host] change host status of uuid: ~p, error: ~p", [UUID, Reason]), + {keep_state, State#state{is_answered = false}} + end + end; + +handle_event(EventType, EventContent, StateName, State) -> + lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]), + + {keep_state, State}. + +%% @private +%% @doc This function is called by a gen_statem when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_statem terminates with +%% Reason. The return value is ignored. +terminate(Reason, _StateName, _State = #state{uuid = UUID}) -> + lager:debug("[iot_host] terminate with reason: ~p", [Reason]), + ChangeResult = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE), + lager:debug("[iot_host] change host: ~p, status result is: ~p", [UUID, ChangeResult]), + ok. + +%% @private +%% @doc Convert process state when code is changed +code_change(_OldVsn, StateName, State = #state{}, _Extra) -> + {ok, StateName, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== \ No newline at end of file