diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 270e404..12dd17c 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -10,14 +10,10 @@ -author("aresei"). -include("iot.hrl"). --behaviour(gen_statem). +-behaviour(gen_server). -%% 主机状态 --define(STATE_DENIED, denied). --define(STATE_SESSION, session). - -%% 心跳包检测时间间隔, 2分钟检测一次 --define(HEARTBEAT_INTERVAL, 120 * 1000). +%% 心跳包检测时间间隔, 15分钟检测一次 +-define(HEARTBEAT_INTERVAL, 900 * 1000). %% API -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). @@ -26,8 +22,8 @@ -export([reload_device/2, delete_device/2, activate_device/3]). -export([heartbeat/1]). -%% gen_statem callbacks --export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { host_id :: integer(), @@ -35,6 +31,7 @@ uuid :: binary(), %% aes的key, 后续通讯需要基于这个加密 aes = <<>> :: binary(), + has_session = false :: boolean(), %% 心跳计数器 heartbeat_counter = 0 :: integer(), @@ -68,38 +65,38 @@ get_alias_name(HostId0) when is_integer(HostId0) -> %% 处理消息 -spec handle(Pid :: pid(), Packet :: {atom(), binary()} | {atom(), {binary(), binary()}}) -> no_return(). handle(Pid, Packet) when is_pid(Pid) -> - gen_statem:cast(Pid, {handle, Packet}). + gen_server:cast(Pid, {handle, Packet}). -spec get_aes(Pid :: pid()) -> {ok, Aes :: binary()}. get_aes(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, get_aes). + gen_server:call(Pid, get_aes). -spec get_status(Pid :: pid()) -> {ok, Status :: map()}. get_status(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, get_status). + gen_server:call(Pid, get_status). %% 激活主机, true 表示激活; false表示关闭激活 -spec activate(Pid :: pid(), Auth :: boolean()) -> ok. activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> - gen_statem:call(Pid, {activate, Auth}). + gen_server:call(Pid, {activate, Auth}). -spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}. get_metric(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, get_metric). + gen_server:call(Pid, get_metric). -spec attach_channel(pid(), pid()) -> ok. attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> - gen_statem:call(Pid, {attach_channel, ChannelPid}). + gen_server:call(Pid, {attach_channel, ChannelPid}). -spec create_session(Pid :: pid(), PubKey :: binary()) -> {ok, Reply :: binary()}. create_session(Pid, PubKey) when is_pid(Pid), is_binary(PubKey) -> - gen_statem:call(Pid, {create_session, PubKey}). + gen_server:call(Pid, {create_session, PubKey}). %% 这里占用的的调用进程的时间 -spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) -> ok | {ok, Response :: binary()} | {error, Reason :: any()}. publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(CommandType), is_integer(Timeout) -> - case gen_statem:call(Pid, {publish_message, self(), CommandType, Params}) of + case gen_server:call(Pid, {publish_message, self(), CommandType, Params}) of {ok, Ref} -> receive {ws_response, Ref} -> @@ -117,36 +114,37 @@ publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer( -spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}. reload_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> - gen_statem:call(Pid, {reload_device, DeviceUUID}). + gen_server:call(Pid, {reload_device, DeviceUUID}). -spec delete_device(Pid :: pid(), DeviceUUID :: binary()) -> ok. delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> - gen_statem:call(Pid, {delete_device, DeviceUUID}). + gen_server:call(Pid, {delete_device, DeviceUUID}). -spec activate_device(Pid :: pid(), DeviceUUID :: binary(), Auth :: boolean()) -> ok | {error, Reason :: any()}. activate_device(Pid, DeviceUUID, Auth) when is_pid(Pid), is_binary(DeviceUUID), is_boolean(Auth) -> - gen_statem:call(Pid, {activate_device, DeviceUUID, Auth}). + gen_server:call(Pid, {activate_device, DeviceUUID, Auth}). -spec heartbeat(Pid :: pid()) -> no_return(). heartbeat(undefined) -> ok; heartbeat(Pid) when is_pid(Pid) -> - gen_statem:cast(Pid, heartbeat). + gen_server:cast(Pid, heartbeat). %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. start_link(Name, UUID) when is_atom(Name), is_binary(UUID) -> - gen_statem:start_link({local, Name}, ?MODULE, [UUID], []). + gen_server:start_link({local, Name}, ?MODULE, [UUID], []). %%%=================================================================== %%% gen_statem callbacks %%%=================================================================== %% @private -%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or -%% gen_statem:start_link/[3,4], this function is called by the new -%% process to initialize. +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). init([UUID]) -> {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), case host_bo:get_host_by_uuid(UUID) of @@ -163,30 +161,30 @@ init([UUID]) -> %% 心跳检测机制 erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), - {ok, ?STATE_DENIED, #state{host_id = HostId, uuid = UUID, aes = Aes}}; + {ok, #state{host_id = HostId, uuid = UUID, aes = Aes, has_session = false}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), ignore end. %% @private -%% @doc This function is called by a gen_statem when it needs to find out -%% the callback mode of the callback module. -callback_mode() -> - handle_event_function. +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(get_metric, _From, State = #state{metrics = Metrics}) -> + {reply, {ok, Metrics}, State}; -%% @private -%% @doc If callback_mode is handle_event_function, then whenever a -%% gen_statem receives an event from call/2, cast/2, or as a normal -%% process message, this function is called. -handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) -> - {keep_state, State, [{reply, From, {ok, Metrics}}]}; - -handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) -> - {keep_state, State, [{reply, From, {ok, Aes}}]}; +handle_call(get_aes, _From, State = #state{aes = Aes}) -> + {reply, {ok, Aes}, State}; %% 获取主机的状态 -handle_event({call, From}, get_status, StateName, State = #state{host_id = HostId, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics}) -> +handle_call(get_status, _From, State = #state{host_id = HostId, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) -> %% 启动主机相关的devices {ok, Devices} = device_bo:get_host_devices(HostId), DeviceInfos = lists:map(fun(DeviceUUID) -> @@ -199,44 +197,43 @@ handle_event({call, From}, get_status, StateName, State = #state{host_id = HostI end end, Devices), - Status = if StateName == ?STATE_SESSION -> <<"session">>; true -> <<"denied">> end, HasChannel = (ChannelPid /= undefined), Reply = #{ - <<"status">> => Status, <<"has_channel">> => HasChannel, + <<"has_session">> => HasSession, <<"heartbeat_counter">> => HeartbeatCounter, <<"metrics">> => Metrics, <<"device_infos">> => DeviceInfos }, - {keep_state, State, [{reply, From, {ok, Reply}}]}; + {reply, {ok, Reply}, State}; %% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的 -handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}}, ?STATE_SESSION, State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid}) -> +handle_call({publish_message, ReceiverPid, CommandType, {aes, Command0}}, _From, State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) -> lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]), Command = iot_cipher_aes:encrypt(AES, Command0), %% 通过websocket发送请求 Ref = ws_channel:publish(ChannelPid, ReceiverPid, <>), - {keep_state, State, [{reply, From, {ok, Ref}}]}; + {reply, {ok, Ref}, State}; %% 只要channel存在,就负责将消息推送到边缘端主机 -handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) -> +handle_call({publish_message, ReceiverPid, CommandType, Command}, _From, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) -> %% 通过websocket发送请求 lager:debug("[iot_host] host: ~p, will publish message: ~p", [UUID, Command]), Ref = ws_channel:publish(ChannelPid, ReceiverPid, <>), - {keep_state, State, [{reply, From, {ok, Ref}}]}; + {reply, {ok, Ref}, State}; -handle_event({call, From}, {publish_message, _, _, _}, _, State) -> - {keep_state, State, [{reply, From, {error, <<"主机状态错误,发送命令失败"/utf8>>}}]}; +handle_call({publish_message, _, _, _}, _From, State) -> + {reply, {error, <<"主机状态错误,发送命令失败"/utf8>>}, State}; %% 关闭授权 -handle_event({call, From}, {activate, _}, ?STATE_DENIED, State) -> - {keep_state, State, [{reply, From, ok}]}; +handle_call({activate, _}, _From, State = #state{has_session = false}) -> + {reply, ok, State}; -handle_event({call, From}, {activate, true}, ?STATE_SESSION, State) -> - {keep_state, State, [{reply, From, ok}]}; -handle_event({call, From}, {activate, false}, ?STATE_SESSION, State = #state{host_id = HostId, uuid = UUID, monitor_ref = MRef, channel_pid = ChannelPid}) -> +handle_call({activate, true}, _From, State = #state{has_session = true}) -> + {reply, ok, State}; +handle_call({activate, false}, _From, State = #state{host_id = HostId, uuid = UUID, monitor_ref = MRef, channel_pid = ChannelPid, has_session = true}) -> %% 取消之前的monitor erlang:demonitor(MRef), ws_channel:stop(ChannelPid, closed), @@ -244,75 +241,79 @@ handle_event({call, From}, {activate, false}, ?STATE_SESSION, State = #state{hos {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), change_devices_status(HostId, ?DEVICE_UNKNOWN), - {next_state, ?STATE_DENIED, State#state{monitor_ref = undefined, channel_pid = undefined}, [{reply, From, ok}]}; + {reply, ok, State#state{monitor_ref = undefined, channel_pid = undefined, has_session = false}}; %% 绑定channel -handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid = UUID, channel_pid = undefined}) -> +handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, channel_pid = undefined}) -> lager:debug("[iot_host] attach_channel host_id uuid: ~p, channel: ~p", [UUID, ChannelPid]), MRef = erlang:monitor(process, ChannelPid), - {keep_state, State#state{channel_pid = ChannelPid, monitor_ref = MRef}, [{reply, From, ok}]}; + {reply, ok, State#state{channel_pid = ChannelPid, monitor_ref = MRef}}; -handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid = UUID, monitor_ref = MRef0, channel_pid = ChannelPid0}) when is_pid(ChannelPid0) -> - lager:debug("[iot_host] attach_channel host_id uuid: ~p, old channel: ~p replace with: ~p", [UUID, ChannelPid0, ChannelPid]), +handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, monitor_ref = OldMRef, channel_pid = OldChannelPid}) when is_pid(OldChannelPid) -> + lager:debug("[iot_host] attach_channel host_id uuid: ~p, old channel: ~p replace with: ~p", [UUID, OldChannelPid, ChannelPid]), %% 取消之前的monitor - erlang:demonitor(MRef0), - ws_channel:stop(ChannelPid0, closed), + erlang:demonitor(OldMRef), + ws_channel:stop(OldChannelPid, closed), %% 建立到新的channel的monitor MRef = erlang:monitor(process, ChannelPid), - {keep_state, State#state{channel_pid = ChannelPid, monitor_ref = MRef}, [{reply, From, ok}]}; + {reply, ok, State#state{channel_pid = ChannelPid, monitor_ref = MRef}}; %% 授权通过后,才能将主机的状态设置为在线状态 -handle_event({call, From}, {create_session, PubKey}, _, State = #state{uuid = UUID, aes = Aes}) -> +handle_call({create_session, PubKey}, _From, State = #state{uuid = UUID, aes = Aes}) -> {ok, #{<<"authorize_status">> := AuthorizeStatus}} = host_bo:get_host_by_uuid(UUID), case AuthorizeStatus =:= 1 of true -> Reply = #{<<"a">> => true, <<"aes">> => Aes}, EncReply = iot_cipher_rsa:encode(Reply, PubKey), {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), - lager:debug("[iot_host] host_id(~p) uuid: ~p, create_session, will change status, affected_row: ~p", [?STATE_SESSION, UUID, AffectedRow]), + lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]), - {next_state, ?STATE_SESSION, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; + {reply, {ok, <<10:8, EncReply/binary>>}, State#state{has_session = true}}; false -> lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]), Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, EncReply = iot_cipher_rsa:encode(Reply, PubKey), - {next_state, ?STATE_DENIED, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]} + {reply, {ok, <<10:8, EncReply/binary>>}, State#state{has_session = false}} end; %% 重新加载设备信息 -handle_event({call, From}, {reload_device, DeviceUUID}, _, State) -> +handle_call({reload_device, DeviceUUID}, _From, State) -> case iot_device_sup:ensured_device_started(DeviceUUID) of {ok, DevicePid} -> iot_device:reload(DevicePid), - {keep_state, State, [{reply, From, ok}]}; + {reply, ok, State}; {error, Reason} -> - {keep_state, State, [{reply, From, {error, Reason}}]} + {reply, {error, Reason}, State} end; %% 删除设备 -handle_event({call, From}, {delete_device, DeviceUUID}, _, State) -> +handle_call({delete_device, DeviceUUID}, _From, State) -> case iot_device:get_pid(DeviceUUID) of undefined -> ok; DevicePid when is_pid(DevicePid) -> iot_device_sup:delete_device(DeviceUUID) end, - {keep_state, State, [{reply, From, ok}]}; + {reply, ok, State}; %% 激活设备 -handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State) -> +handle_call({activate_device, DeviceUUID, Auth}, _From, State) -> case iot_device_sup:ensured_device_started(DeviceUUID) of {ok, DevicePid} -> iot_device:auth(DevicePid, Auth), - {keep_state, State, [{reply, From, ok}]}; + {reply, ok, State}; {error, Reason} -> - {keep_state, State, [{reply, From, {error, Reason}}]} - end; + {reply, {error, Reason}, State} + end. +-spec handle_cast(Request :: term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term(), NewState :: term()}. %% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到 -handle_event(cast, {handle, {data, Data}}, ?STATE_SESSION, State = #state{aes = AES}) -> +handle_cast({handle, {data, Data}}, State = #state{aes = AES, has_session = true}) -> PlainData = iot_cipher_aes:decrypt(AES, Data), case catch jiffy:decode(PlainData, [return_maps]) of Info when is_map(Info) -> @@ -320,25 +321,25 @@ handle_event(cast, {handle, {data, Data}}, ?STATE_SESSION, State = #state{aes = Other -> lager:debug("[iot_host] the data is invalid json: ~p", [Other]) end, - {keep_state, State}; + {noreply, State}; %% 其他情况丢弃数据 -handle_event(cast, {handle, {data, _}}, _, State) -> - {keep_state, State}; +handle_cast({handle, {data, _}}, State = #state{has_session = false}) -> + {noreply, State}; %% 任意状态下都可以ping -handle_event(cast, {handle, {ping, CipherMetric}}, _, State = #state{uuid = UUID, aes = AES}) -> +handle_cast({handle, {ping, CipherMetric}}, State = #state{uuid = UUID, aes = AES}) -> MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric), case catch jiffy:decode(MetricsInfo, [return_maps]) of Metrics when is_map(Metrics) -> lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), - {keep_state, State#state{metrics = Metrics}}; + {noreply, State#state{metrics = Metrics}}; Other -> lager:warning("[iot_host] host_id: ~p, ping is invalid json: ~p", [UUID, Other]), - {keep_state, State} + {noreply, State} end; -handle_event(cast, {handle, {inform, Info0}}, ?STATE_SESSION, State = #state{uuid = UUID, host_id = HostId, aes = AES}) -> +handle_cast({handle, {inform, Info0}}, State = #state{uuid = UUID, host_id = HostId, aes = AES, has_session = true}) -> Info = iot_cipher_aes:decrypt(AES, Info0), case catch jiffy:decode(Info, [return_maps]) of #{<<"at">> := At, <<"services">> := ServiceInforms} -> @@ -363,9 +364,9 @@ handle_event(cast, {handle, {inform, Info0}}, ?STATE_SESSION, State = #state{uui Error -> lager:warning("[iot_host] inform get error: ~p", [Error]) end, - {keep_state, State}; + {noreply, State}; -handle_event(cast, {handle, {feedback_step, Info0}}, ?STATE_SESSION, State = #state{aes = AES}) -> +handle_cast({handle, {feedback_step, Info0}}, State = #state{aes = AES, has_session = true}) -> Info = iot_cipher_aes:decrypt(AES, Info0), case catch jiffy:decode(Info, [return_maps]) of Data = #{<<"task_id">> := TaskId, <<"code">> := Code} -> @@ -378,9 +379,9 @@ handle_event(cast, {handle, {feedback_step, Info0}}, ?STATE_SESSION, State = #st Other -> lager:warning("[iot_host] feedback_step error: ~p", [Other]) end, - {keep_state, State}; + {noreply, State}; -handle_event(cast, {handle, {feedback_result, Info0}}, ?STATE_SESSION, State = #state{aes = AES}) -> +handle_cast({handle, {feedback_result, Info0}}, State = #state{aes = AES, has_session = true}) -> Info = iot_cipher_aes:decrypt(AES, Info0), case catch jiffy:decode(Info, [return_maps]) of #{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} -> @@ -395,9 +396,9 @@ handle_event(cast, {handle, {feedback_result, Info0}}, ?STATE_SESSION, State = # Other -> lager:warning("[iot_host] feedback_result error: ~p", [Other]) end, - {keep_state, State}; + {noreply, State}; -handle_event(cast, {handle, {event, Event0}}, ?STATE_SESSION, State = #state{uuid = UUID, aes = AES}) -> +handle_cast({handle, {event, Event0}}, State = #state{uuid = UUID, aes = AES, has_session = true}) -> EventText = iot_cipher_aes:decrypt(AES, Event0), lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, EventText]), case catch jiffy:decode(EventText, [return_maps]) of @@ -405,55 +406,58 @@ handle_event(cast, {handle, {event, Event0}}, ?STATE_SESSION, State = #state{uui DevicePid = iot_device:get_pid(DeviceUUID), iot_device:change_status(DevicePid, Status); Event when is_map(Event) -> - lager:warning("[iot_host] event: ~p, not supported", [Event]); + lager:warning("[iot_host] host: ~p, event: ~p, not supported", [UUID, Event]); Other -> - lager:warning("[iot_host] event error: ~p", [Other]) + lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other]) end, - {keep_state, State}; + {noreply, State}; %% 心跳机制 -handle_event(cast, heartbeat, _, State = #state{uuid = UUID, heartbeat_counter = HeartbeatCounter}) -> +handle_cast(heartbeat, State = #state{uuid = UUID, heartbeat_counter = HeartbeatCounter}) -> lager:debug("[iot_host] uuip: ~p, get heartbeat, counter is: ~p", [UUID, HeartbeatCounter]), - {keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}}; + {noreply, State#state{heartbeat_counter = HeartbeatCounter + 1}}. -handle_event(info, {timeout, _, heartbeat_ticker}, StateName, State = #state{uuid = UUID, host_id = HostId, heartbeat_counter = HeartbeatCounter}) -> - case StateName =:= ?STATE_DENIED andalso HeartbeatCounter =:= 0 of - true -> - lager:warning("[iot_host] uuid: ~p, heartbeat lost, devices will unknown", [UUID]), - {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), - change_devices_status(HostId, ?DEVICE_UNKNOWN); - false -> - ok - end, +-spec handle_info(Info :: timeout | term(), State :: term()) -> + {noreply, NewState :: term()} | + {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | + {stop, Reason :: term(), NewState :: term()}. +%% 没有收到心跳包,主机下线,设备状态变为: "未知" +handle_info({timeout, _, heartbeat_ticker}, State = #state{uuid = UUID, host_id = HostId, heartbeat_counter = 0, has_session = false}) -> + lager:warning("[iot_host] uuid: ~p, heartbeat lost, devices will unknown", [UUID]), + {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), + change_devices_status(HostId, ?DEVICE_UNKNOWN), erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), + {noreply, State#state{heartbeat_counter = 0}}; +%% 其他情况下需要重置系统计数器 +handle_info({timeout, _, heartbeat_ticker}, State = #state{}) -> + erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), + {noreply, State#state{heartbeat_counter = 0}}; - {keep_state, State#state{heartbeat_counter = 0}}; +%% 当websocket断开的时候,主机的状态不一定改变;主机的状态改变通过心跳机制,会话状态需要改变 +handle_info({'DOWN', Ref, process, ChannelPid, Reason}, State = #state{uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid, has_session = HasSession}) -> + lager:warning("[iot_host] uuid: ~p, channel: ~p, down with reason: ~p, has_session: ~p, state: ~p", [UUID, ChannelPid, Reason, HasSession, State]), + {noreply, State#state{channel_pid = undefined, has_session = false}}; -%% 当websocket断开的时候,则设置主机状态为下线状态; 主机的状态需要转换 -handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) -> - lager:warning("[iot_host] uuid: ~p, channel: ~p, down with reason: ~p, state name: ~p, state: ~p", [UUID, ChannelPid, Reason, StateName, State]), - {next_state, ?STATE_DENIED, State#state{channel_pid = undefined}}; +handle_info(Info, State = #state{has_session = HasSession}) -> + lager:warning("[iot_host] unknown info: ~p, state: ~p", [Info, HasSession]), -handle_event(EventType, EventContent, StateName, State) -> - lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]), - - {keep_state, State}. + {noreply, State}. %% @private %% @doc This function is called by a gen_statem when it is about to %% terminate. It should be the opposite of Module:init/1 and do any %% necessary cleaning up. When it returns, the gen_statem terminates with %% Reason. The return value is ignored. -terminate(Reason, StateName, _State = #state{host_id = HostId, uuid = UUID}) -> - lager:debug("[iot_host] host: ~p, terminate with reason: ~p, state_name: ~p", [UUID, Reason, StateName]), +terminate(Reason, _State = #state{host_id = HostId, uuid = UUID, has_session = HasSession}) -> + lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p", [UUID, Reason, HasSession]), host_bo:change_status(UUID, ?HOST_OFFLINE), change_devices_status(HostId, ?DEVICE_UNKNOWN), ok. %% @private %% @doc Convert process state when code is changed -code_change(_OldVsn, StateName, State = #state{}, _Extra) -> - {ok, StateName, State}. +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. %%%=================================================================== %%% Internal functions