%%%-------------------------------------------------------------------
%%% @author
%%% @copyright (C) 2023,
%%% @doc
%%% gen_statem process representing a single host identified by its UUID.
%%% The process keeps the attached channel pid, a session flag, a heartbeat
%%% counter and the latest metrics, and forwards pub / command / JSON-RPC
%%% requests to the attached channel.
%%% @end
%%% Created : 22. Sep 2023 16:38
%%%-------------------------------------------------------------------
-module(iot_host).
-author("aresei").
-include("iot.hrl").
-include("message.hrl").

-behaviour(gen_statem).

%% Heartbeat check interval in milliseconds: checked once every 15 minutes.
-define(HEARTBEAT_INTERVAL, 900 * 1000).

%% State names
-define(STATE_DENIED, denied).
-define(STATE_ACTIVATED, activated).

%% API
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
-export([get_metric/1, get_status/1, kill/1]).
%% Communication
-export([pub/3, attach_channel/2, command/3]).
-export([deploy_container/3, start_container/2, stop_container/2, remove_container/2,
         kill_container/2, config_container/3, get_containers/1, await_reply/2]).
-export([heartbeat/1]).

%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).

-record(state, {
    host_id :: integer(),
    %% Data read from the database
    uuid :: binary(),
    has_session = false :: boolean(),
    %% Heartbeat counter
    heartbeat_counter = 0 :: integer(),
    %% Websocket (channel) related
    channel_pid :: undefined | pid(),
    %% Information reported by the host
    metrics = #{} :: map()
}).

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Looks up the locally registered process for the given UUID.
%% Returns `undefined' when no process is registered under that name.
-spec get_pid(UUID :: binary()) -> undefined | pid().
get_pid(UUID) when is_binary(UUID) ->
    whereis(get_name(UUID)).

%% @doc Builds the registered name `iot_host:<UUID>' for a host.
%% NOTE(review): this creates one atom per distinct UUID and atoms are never
%% garbage collected, so UUID should come from a trusted, bounded set.
-spec get_name(UUID :: binary()) -> atom().
get_name(UUID) when is_binary(UUID) ->
    binary_to_atom(<<"iot_host:", UUID/binary>>).

%% @doc Builds the alias name `iot_host_id:<HostId>' for a host id.
-spec get_alias_name(HostId :: integer()) -> atom().
get_alias_name(HostId0) when is_integer(HostId0) ->
    HostId = integer_to_binary(HostId0),
    binary_to_atom(<<"iot_host_id:", HostId/binary>>).

%% @doc Sends an untrappable `kill' exit signal to the host process of UUID.
%% Returns `ok' when no such process is registered, otherwise the result of
%% `exit/2' (`true').
-spec kill(UUID :: binary()) -> ok | true.
kill(UUID) when is_binary(UUID) ->
    case get_pid(UUID) of
        undefined -> ok;
        Pid -> exit(Pid, kill)
    end.
%% @doc Hands a decoded packet to the host process asynchronously.
-spec handle(Pid :: pid(), Packet :: {atom(), any()}) -> ok.
handle(Pid, Packet) when is_pid(Pid) ->
    gen_statem:cast(Pid, {handle, Packet}).

%% @doc Returns a status map (channel / session flags, heartbeat counter, metrics).
-spec get_status(Pid :: pid()) -> {ok, Status :: map()}.
get_status(Pid) when is_pid(Pid) ->
    gen_statem:call(Pid, get_status).

%% @doc Activates the host: `true' activates it, `false' revokes the activation.
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
    gen_statem:call(Pid, {activate, Auth}).

%% @doc Returns the metrics currently stored in the host process.
-spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}.
get_metric(Pid) when is_pid(Pid) ->
    gen_statem:call(Pid, get_metric).

%% @doc Asks the host process to bind ChannelPid as its channel.
-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()} | {denied, Reason :: binary()}.
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
    gen_statem:call(Pid, {attach_channel, ChannelPid}).

%% @doc Issues the `get_containers' JSON-RPC request. On success the returned
%% reference can be passed to await_reply/2 by the calling process.
-spec get_containers(Pid :: pid()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
get_containers(Pid) when is_pid(Pid) ->
    call_jsonrpc(Pid, <<"get_containers">>, #{}).

%% @doc Issues the `config_container' JSON-RPC request.
-spec config_container(Pid :: pid(), ContainerName :: binary(), ConfigJson :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
config_container(Pid, ContainerName, ConfigJson) when is_pid(Pid), is_binary(ContainerName), is_binary(ConfigJson) ->
    call_jsonrpc(Pid, <<"config_container">>, #{<<"container_name">> => ContainerName, <<"config">> => ConfigJson}).

%% @doc Issues the `deploy' JSON-RPC request for the given task and config.
-spec deploy_container(Pid :: pid(), TaskId :: integer(), Config :: map()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
deploy_container(Pid, TaskId, Config) when is_pid(Pid), is_integer(TaskId), is_map(Config) ->
    call_jsonrpc(Pid, <<"deploy">>, #{<<"task_id">> => TaskId, <<"config">> => Config}).

%% @doc Issues the `start_container' JSON-RPC request.
-spec start_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
start_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
    call_jsonrpc(Pid, <<"start_container">>, #{<<"container_name">> => ContainerName}).

%% @doc Issues the `stop_container' JSON-RPC request.
-spec stop_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
stop_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
    call_jsonrpc(Pid, <<"stop_container">>, #{<<"container_name">> => ContainerName}).

%% @doc Issues the `kill_container' JSON-RPC request.
-spec kill_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
kill_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
    call_jsonrpc(Pid, <<"kill_container">>, #{<<"container_name">> => ContainerName}).

%% @doc Issues the `remove_container' JSON-RPC request.
-spec remove_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
remove_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
    call_jsonrpc(Pid, <<"remove_container">>, #{<<"container_name">> => ContainerName}).

%% Builds and encodes a JSON-RPC request and passes it to the host process.
%% self() is evaluated in the calling process, so that process is the one
%% registered as receiver of the `jsonrpc_reply' message.
-spec call_jsonrpc(Pid :: pid(), Method :: binary(), Params :: map()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
call_jsonrpc(Pid, Method, Params) ->
    Request = #jsonrpc_request{method = Method, params = Params},
    EncodedRequest = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
    gen_statem:call(Pid, {jsonrpc_call, self(), EncodedRequest}).

%% @doc Waits up to Timeout milliseconds for the `jsonrpc_reply' message tagged
%% with Ref in the mailbox of the calling process.
%% Only two reply shapes are consumed: a result with no error, and an error map
%% carrying a `<<"message">>' key with no result. Any other reply for Ref is
%% left in the mailbox and the call ends with `{error, <<"timeout">>}'.
-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}.
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
    receive
        {jsonrpc_reply, Ref, #jsonrpc_reply{result = Result, error = undefined}} ->
            {ok, Result};
        {jsonrpc_reply, Ref, #jsonrpc_reply{result = undefined, error = #{<<"message">> := Message}}} ->
            {error, Message}
    after Timeout ->
        {error, <<"timeout">>}
    end.

%% @doc Publishes Content to Topic through the host process.
-spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> ok | {error, Reason :: any()}.
pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) ->
    gen_statem:call(Pid, {pub, Topic, Content}).

%% @doc Sends a command of the given type through the host process.
-spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> ok | {error, Reason :: any()}.
command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is_binary(Command) ->
    gen_statem:call(Pid, {command, CommandType, Command}).

%% @doc Notifies the host process of a heartbeat. `undefined' is accepted and
%% ignored so that the result of get_pid/1 can be passed in directly.
-spec heartbeat(Pid :: undefined | pid()) -> ok.
heartbeat(undefined) ->
    ok;
heartbeat(Pid) when is_pid(Pid) ->
    gen_statem:cast(Pid, heartbeat).

%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
-spec start_link(Name :: atom(), UUID :: binary()) -> {ok, pid()} | ignore | {error, term()}.
start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
    gen_statem:start_link({local, Name}, ?MODULE, [UUID], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================

%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
%%
%% Loads the host by UUID. When it is found, the process registers a global
%% alias derived from the host id, starts the heartbeat timer and enters
%% `activated' when `authorize_status' is exactly 1, otherwise `denied'.
%% When the lookup returns `undefined' the start is ignored.
init([UUID]) ->
    case iot_api:get_host_by_uuid(UUID) of
        {ok, #{<<"id">> := HostId, <<"authorize_status">> := AuthorizeStatus}} ->
            %% Register an alias by host_id so the host pid can be found
            %% without querying the database.
            AliasName = get_alias_name(HostId),
            %% global:register_name/2 answers `no' when the name is already
            %% taken; surface that instead of dropping it silently. Start-up
            %% still proceeds exactly as before.
            case global:register_name(AliasName, self()) of
                yes ->
                    ok;
                no ->
                    lager:warning("[iot_host] host uuid: ~p, global alias: ~p register failed", [UUID, AliasName])
            end,
            %% Heartbeat detection
            erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
            StateName = case AuthorizeStatus =:= 1 of
                            true -> ?STATE_ACTIVATED;
                            false -> ?STATE_DENIED
                        end,
            {ok, StateName, #state{host_id = HostId, uuid = UUID, has_session = false}};
        undefined ->
            lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
            ignore
    end.

%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
    handle_event_function.

%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
%% Return the stored metrics.
handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) ->
    {keep_state, State, [{reply, From, {ok, Metrics}}]};

%% Return the host status.
handle_event({call, From}, get_status, _, State = #state{channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) ->
    HasChannel = (ChannelPid /= undefined),
    Reply = #{
        <<"has_channel">> => HasChannel,
        <<"has_session">> => HasSession,
        <<"heartbeat_counter">> => HeartbeatCounter,
        <<"metrics">> => Metrics
    },
    {keep_state, State, [{reply, From, {ok, Reply}}]};

%% As long as a channel with a session exists, push the request to the edge host.
handle_event({call, From}, {jsonrpc_call, ReceiverPid, RpcCall}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
    case HasSession andalso is_pid(ChannelPid) of
        true ->
            %% Send the request through the channel
            Ref = tcp_channel:jsonrpc_call(ChannelPid, ReceiverPid, RpcCall),
            {keep_state, State, [{reply, From, {ok, Ref}}]};
        false ->
            lager:debug("[iot_host] uuid: ~p, invalid state: ~p", [UUID, state_map(State)]),
            {keep_state, State, [{reply, From, {error, <<"主机离线,发送请求失败"/utf8>>}}]}
    end;

%% Publish (pub/sub); only served while the host is activated.
handle_event({call, From}, {pub, Topic, Content}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
    case HasSession andalso is_pid(ChannelPid) of
        true ->
            lager:debug("[iot_host] host: ~p, publish to topic: ~p, content: ~p", [UUID, Topic, Content]),
            %% Send the request through the channel
            tcp_channel:pub(ChannelPid, Topic, Content),
            {keep_state, State, [{reply, From, ok}]};
        false ->
            lager:debug("[iot_host] uuid: ~p, publish to topic: ~p, content: ~p, invalid state: ~p", [UUID, Topic, Content, state_map(State)]),
            {keep_state, State, [{reply, From, {error, <<"主机离线,发送失败"/utf8>>}}]}
    end;

%% Publish while the host is not activated: reject with an explicit reply so
%% the caller does not block until gen_statem:call times out.
handle_event({call, From}, {pub, Topic, _Content}, StateName, State = #state{uuid = UUID}) ->
    lager:notice("[iot_host] uuid: ~p, publish to topic: ~p rejected, state_name: ~p", [UUID, Topic, StateName]),
    {keep_state, State, [{reply, From, {error, <<"host inactivated">>}}]};

%% Send a command; only served while the host is activated.
handle_event({call, From}, {command, CommandType, Command}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
    case HasSession andalso is_pid(ChannelPid) of
        true ->
            lager:debug("[iot_host] host: ~p, command_type: ~p, command: ~p", [UUID, CommandType, Command]),
            %% Send the request through the channel
            tcp_channel:command(ChannelPid, CommandType, Command),
            {keep_state, State, [{reply, From, ok}]};
        false ->
            lager:debug("[iot_host] host: ~p, command_type: ~p, command: ~p, invalid state: ~p", [UUID, CommandType, Command, state_map(State)]),
            {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}
    end;

%% Command while the host is not activated: reject with an explicit reply so
%% the caller does not block until gen_statem:call times out.
handle_event({call, From}, {command, CommandType, _Command}, StateName, State = #state{uuid = UUID}) ->
    lager:notice("[iot_host] uuid: ~p, command_type: ~p rejected, state_name: ~p", [UUID, CommandType, StateName]),
    {keep_state, State, [{reply, From, {error, <<"host inactivated">>}}]};

%% Activate the host.
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) ->
    case is_pid(ChannelPid) of
        true ->
            lager:debug("[iot_host] uuid: ~p, activate: true", [UUID]),
            tcp_channel:command(ChannelPid, ?COMMAND_AUTH, <<1:8>>);
        false ->
            lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID])
    end,
    {next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};

%% Revoke the authorization.
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) ->
    case is_pid(ChannelPid) of
        true ->
            tcp_channel:command(ChannelPid, ?COMMAND_AUTH, <<0:8>>),
            lager:debug("[iot_host] uuid: ~p, activate: false", [UUID]),
            tcp_channel:stop(ChannelPid, closed);
        false ->
            lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID])
    end,
    {next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]};

%% Attach a channel (no channel is currently attached).
handle_event({call, From}, {attach_channel, ChannelPid}, StateName, State = #state{uuid = UUID, channel_pid = undefined}) ->
    case StateName of
        ?STATE_ACTIVATED ->
            erlang:monitor(process, ChannelPid),
            %% Mark the host as online
            ChangeResult = iot_api:change_host_status(UUID, ?HOST_ONLINE),
            lager:debug("[iot_host] host_id(attach_channel) uuid: ~p, will change status, result: ~p", [UUID, ChangeResult]),
            {keep_state, State#state{channel_pid = ChannelPid, has_session = true}, [{reply, From, ok}]};
        %% The host is not activated: the channel is remembered and monitored,
        %% but no session is established.
        ?STATE_DENIED ->
            lager:notice("[iot_host] attach_channel host_id uuid: ~p, channel: ~p, host inactivated", [UUID, ChannelPid]),
            erlang:monitor(process, ChannelPid),
            {keep_state, State#state{channel_pid = ChannelPid}, [{reply, From, {denied, <<"host inactivated">>}}]}
    end;

%% A channel is already attached.
handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, channel_pid = OldChannelPid}) ->
    lager:notice("[iot_host] attach_channel host_id uuid: ~p, old channel exists: ~p", [UUID, OldChannelPid]),
    {keep_state, State, [{reply, From, {error, <<"channel existed">>}}]};

%% Data dispatch.
handle_event(cast, {handle, {data, #data{route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
    lager:debug("[iot_host] metric_data host: ~p, route_key: ~p, metric: ~p", [UUID, RouteKey0, Metric]),
    RouteKey = get_route_key(RouteKey0),
    endpoint_subscription:publish(RouteKey, Metric),
    {keep_state, State};

%% Ping data is AES-encrypted, so it is only accepted while a session exists.
handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
    lager:debug("[iot_host] ping host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
    {keep_state, State#state{metrics = Metrics}};

%% Heartbeat: count every heartbeat received during the current interval.
handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) ->
    {keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}};

%% No heartbeat was received during the interval: the host goes offline,
%% device status is left unchanged.
%% NOTE(review): a failed lookup or a status outside the three handled values
%% crashes the process (badmatch / case_clause).
handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{uuid = UUID, heartbeat_counter = 0, channel_pid = ChannelPid}) ->
    lager:warning("[iot_host] uuid: ~p, heartbeat lost, devices will unknown", [UUID]),
    {ok, #{<<"status">> := Status}} = iot_api:get_host_by_uuid(UUID),
    case Status of
        ?HOST_NOT_JOINED ->
            lager:debug("[iot_host] host: ~p, host_maybe_offline, host not joined, can not change to offline", [UUID]);
        ?HOST_OFFLINE ->
            lager:debug("[iot_host] host: ~p, host_maybe_offline, host now is offline, do nothing", [UUID]);
        ?HOST_ONLINE ->
            iot_api:change_host_status(UUID, ?HOST_OFFLINE)
    end,
    %% Close the channel; the host has to reconnect so that state stays consistent.
    is_pid(ChannelPid) andalso tcp_channel:stop(ChannelPid, closed),
    erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
    {keep_state, State#state{channel_pid = undefined, has_session = false, heartbeat_counter = 0}};

%% Otherwise (heartbeats were seen) just reset the counter and re-arm the timer.
handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{}) ->
    erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
    {keep_state, State#state{heartbeat_counter = 0}};

%% When the websocket goes down the host status does not necessarily change
%% (that is driven by the heartbeat mechanism), but the session state must.
handle_event(info, {'DOWN', _Ref, process, ChannelPid, Reason}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
    lager:warning("[iot_host] uuid: ~p, channel: ~p, down with reason: ~p, has_session: ~p, state: ~p", [UUID, ChannelPid, Reason, HasSession, State]),
    {keep_state, State#state{channel_pid = undefined, has_session = false}};

%% DOWN for a process that is not the currently attached channel.
handle_event(info, {'DOWN', _Ref, process, Pid, Reason}, _, State = #state{uuid = UUID}) ->
    lager:debug("[iot_host] uuid: ~p, process_pid: ~p, down with reason: ~p, state: ~p", [UUID, Pid, Reason, State]),
    {keep_state, State};

%% Any other call: always reply, otherwise the caller stays blocked until
%% gen_statem:call times out and exits.
handle_event({call, From}, Request, StateName, State = #state{uuid = UUID}) ->
    lager:warning("[iot_host] host: ~p, unknown call: ~p, state_name: ~p, state: ~p", [UUID, Request, StateName, state_map(State)]),
    {keep_state, State, [{reply, From, {error, <<"unknown request">>}}]};

%% Any other cast / info message: log and drop.
handle_event(Event, Info, StateName, State = #state{uuid = UUID}) ->
    lager:warning("[iot_host] host: ~p, event: ~p, unknown message: ~p, state_name: ~p, state: ~p", [UUID, Event, Info, StateName, state_map(State)]),
    {keep_state, State}.

%% @private
%% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, _State = #state{uuid = UUID, has_session = HasSession}) ->
    lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p", [UUID, Reason, HasSession]),
    ok.

%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
    {ok, StateName, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

%% Normalises a route key: the empty binary maps to <<"/">>, every other
%% binary is returned unchanged.
-spec get_route_key(binary()) -> binary().
get_route_key(RouteKey) when is_binary(RouteKey) ->
    case RouteKey of
        <<>> -> <<"/">>;
        _ -> RouteKey
    end.

%% Converts the current #state{} record into a map (used for logging).
state_map(State = #state{}) ->
    #{
        host_id => State#state.host_id,
        uuid => State#state.uuid,
        has_session => State#state.has_session,
        heartbeat_counter => State#state.heartbeat_counter,
        channel_pid => State#state.channel_pid,
        metrics => State#state.metrics
    }.