增加基于udp的心跳检查机制
This commit is contained in:
parent
a54a42e236
commit
cfedc75e1e
@ -15,6 +15,7 @@
|
|||||||
%% 设备是否在线状态
|
%% 设备是否在线状态
|
||||||
-define(DEVICE_OFFLINE, 0).
|
-define(DEVICE_OFFLINE, 0).
|
||||||
-define(DEVICE_ONLINE, 1).
|
-define(DEVICE_ONLINE, 1).
|
||||||
|
-define(DEVICE_UNKNOWN, 2).
|
||||||
|
|
||||||
%% 下发的任务状态
|
%% 下发的任务状态
|
||||||
-define(TASK_STATUS_INIT, -1). %% 未接入
|
-define(TASK_STATUS_INIT, -1). %% 未接入
|
||||||
|
|||||||
@ -32,6 +32,16 @@ handle_request("GET", "/host/metric", #{<<"uuid">> := UUID}, _) ->
|
|||||||
end
|
end
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
%% 处理主机的授权的 取消激活
|
||||||
|
handle_request("GET", "/host/status", _, #{<<"uuid">> := UUID}) when is_binary(UUID) ->
|
||||||
|
case iot_host:get_pid(UUID) of
|
||||||
|
undefined ->
|
||||||
|
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
|
||||||
|
Pid when is_pid(Pid) ->
|
||||||
|
{ok, StatusInfo} = iot_host:get_status(Pid),
|
||||||
|
{ok, 200, iot_util:json_data(StatusInfo)}
|
||||||
|
end;
|
||||||
|
|
||||||
%% 重新加载对应的主机信息
|
%% 重新加载对应的主机信息
|
||||||
handle_request("POST", "/host/reload", _, #{<<"uuid">> := UUID}) when is_binary(UUID) ->
|
handle_request("POST", "/host/reload", _, #{<<"uuid">> := UUID}) when is_binary(UUID) ->
|
||||||
lager:debug("[host_handler] will reload host uuid: ~p", [UUID]),
|
lager:debug("[host_handler] will reload host uuid: ~p", [UUID]),
|
||||||
|
|||||||
@ -19,6 +19,8 @@ start(_StartType, _StartArgs) ->
|
|||||||
|
|
||||||
%% 加速内存的回收
|
%% 加速内存的回收
|
||||||
erlang:system_flag(fullsweep_after, 16),
|
erlang:system_flag(fullsweep_after, 16),
|
||||||
|
%% 启动udp服务
|
||||||
|
start_udp_server(),
|
||||||
%% 启动http服务
|
%% 启动http服务
|
||||||
start_http_server(),
|
start_http_server(),
|
||||||
%% 启动redis服务器
|
%% 启动redis服务器
|
||||||
@ -84,7 +86,14 @@ start_redis_server() ->
|
|||||||
],
|
],
|
||||||
{ok, _} = esockd:open(redis_server, Port, TransOpts, {redis_protocol, start_link, []}),
|
{ok, _} = esockd:open(redis_server, Port, TransOpts, {redis_protocol, start_link, []}),
|
||||||
|
|
||||||
lager:debug("[iot_app] the rpc server start at: ~p", [Port]).
|
lager:debug("[iot_app] the redis server start at: ~p", [Port]).
|
||||||
|
|
||||||
|
start_udp_server() ->
|
||||||
|
{ok, Props} = application:get_env(iot, udp_server),
|
||||||
|
Port = proplists:get_value(port, Props),
|
||||||
|
UdpOpts = [{udp_options, [binary, {reuseaddr, true}]}],
|
||||||
|
{ok, _} = esockd:open_udp(iot_udp_server, Port, UdpOpts, {iot_udp_server, start_link, []}),
|
||||||
|
lager:debug("[iot_app] the udp server start at: ~p", [Port]).
|
||||||
|
|
||||||
%% 启动内存数据库
|
%% 启动内存数据库
|
||||||
start_mnesia() ->
|
start_mnesia() ->
|
||||||
|
|||||||
@ -16,7 +16,7 @@
|
|||||||
-export([start_link/2, is_activated/1, change_status/2, reload/1, auth/2]).
|
-export([start_link/2, is_activated/1, change_status/2, reload/1, auth/2]).
|
||||||
|
|
||||||
%% gen_statem callbacks
|
%% gen_statem callbacks
|
||||||
-export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||||
|
|
||||||
%% 终端是否授权
|
%% 终端是否授权
|
||||||
-define(DEVICE_AUTH_DENIED, 0).
|
-define(DEVICE_AUTH_DENIED, 0).
|
||||||
@ -43,7 +43,9 @@ get_pid(DeviceUUID) when is_binary(DeviceUUID) ->
|
|||||||
get_name(DeviceUUID) when is_binary(DeviceUUID) ->
|
get_name(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||||
binary_to_atom(<<"iot_device:", DeviceUUID/binary>>).
|
binary_to_atom(<<"iot_device:", DeviceUUID/binary>>).
|
||||||
|
|
||||||
-spec is_activated(Pid :: pid()) -> boolean().
|
-spec is_activated(Pid :: pid() | undefined) -> boolean().
|
||||||
|
is_activated(undefined) ->
|
||||||
|
false;
|
||||||
is_activated(Pid) when is_pid(Pid) ->
|
is_activated(Pid) when is_pid(Pid) ->
|
||||||
gen_statem:call(Pid, is_activated).
|
gen_statem:call(Pid, is_activated).
|
||||||
|
|
||||||
@ -103,14 +105,6 @@ init0(#{<<"device_uuid">> := DeviceUUID, <<"status">> := Status, <<"authorize_st
|
|||||||
callback_mode() ->
|
callback_mode() ->
|
||||||
handle_event_function.
|
handle_event_function.
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or
|
|
||||||
%% (2) when gen_statem terminates abnormally.
|
|
||||||
%% This callback is optional.
|
|
||||||
format_status(_Opt, [_PDict, _StateName, _State]) ->
|
|
||||||
Status = some_term,
|
|
||||||
Status.
|
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||||
|
|||||||
@ -16,14 +16,18 @@
|
|||||||
-define(STATE_DENIED, denied).
|
-define(STATE_DENIED, denied).
|
||||||
-define(STATE_SESSION, session).
|
-define(STATE_SESSION, session).
|
||||||
|
|
||||||
|
%% 心跳包检测时间间隔, 2分钟检测一次
|
||||||
|
-define(HEARTBEAT_INTERVAL, 120 * 1000).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
|
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
|
||||||
-export([get_metric/1, publish_message/4, get_aes/1]).
|
-export([get_metric/1, publish_message/4, get_aes/1, get_status/1]).
|
||||||
-export([create_session/2, attach_channel/2]).
|
-export([create_session/2, attach_channel/2]).
|
||||||
-export([reload_device/2, delete_device/2, activate_device/3]).
|
-export([reload_device/2, delete_device/2, activate_device/3]).
|
||||||
|
-export([heartbeat/1]).
|
||||||
|
|
||||||
%% gen_statem callbacks
|
%% gen_statem callbacks
|
||||||
-export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
host_id :: integer(),
|
host_id :: integer(),
|
||||||
@ -32,6 +36,9 @@
|
|||||||
%% aes的key, 后续通讯需要基于这个加密
|
%% aes的key, 后续通讯需要基于这个加密
|
||||||
aes = <<>> :: binary(),
|
aes = <<>> :: binary(),
|
||||||
|
|
||||||
|
%% 心跳计数器
|
||||||
|
heartbeat_counter = 0 :: integer(),
|
||||||
|
|
||||||
%% websocket相关
|
%% websocket相关
|
||||||
channel_pid :: undefined | pid(),
|
channel_pid :: undefined | pid(),
|
||||||
monitor_ref :: undefined | reference(),
|
monitor_ref :: undefined | reference(),
|
||||||
@ -67,6 +74,10 @@ handle(Pid, Packet) when is_pid(Pid) ->
|
|||||||
get_aes(Pid) when is_pid(Pid) ->
|
get_aes(Pid) when is_pid(Pid) ->
|
||||||
gen_statem:call(Pid, get_aes).
|
gen_statem:call(Pid, get_aes).
|
||||||
|
|
||||||
|
-spec get_status(Pid :: pid()) -> {ok, Status :: map()}.
|
||||||
|
get_status(Pid) when is_pid(Pid) ->
|
||||||
|
gen_statem:call(Pid, get_status).
|
||||||
|
|
||||||
%% 激活主机, true 表示激活; false表示关闭激活
|
%% 激活主机, true 表示激活; false表示关闭激活
|
||||||
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
|
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
|
||||||
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
||||||
@ -116,6 +127,12 @@ delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
|||||||
activate_device(Pid, DeviceUUID, Auth) when is_pid(Pid), is_binary(DeviceUUID), is_boolean(Auth) ->
|
activate_device(Pid, DeviceUUID, Auth) when is_pid(Pid), is_binary(DeviceUUID), is_boolean(Auth) ->
|
||||||
gen_statem:call(Pid, {activate_device, DeviceUUID, Auth}).
|
gen_statem:call(Pid, {activate_device, DeviceUUID, Auth}).
|
||||||
|
|
||||||
|
-spec heartbeat(Pid :: pid()) -> no_return().
|
||||||
|
heartbeat(undefined) ->
|
||||||
|
ok;
|
||||||
|
heartbeat(Pid) when is_pid(Pid) ->
|
||||||
|
gen_statem:cast(Pid, heartbeat).
|
||||||
|
|
||||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||||
%% initialize. To ensure a synchronized start-up procedure, this
|
%% initialize. To ensure a synchronized start-up procedure, this
|
||||||
%% function does not return until Module:init/1 has returned.
|
%% function does not return until Module:init/1 has returned.
|
||||||
@ -131,17 +148,21 @@ start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
|
|||||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||||
%% process to initialize.
|
%% process to initialize.
|
||||||
init([UUID]) ->
|
init([UUID]) ->
|
||||||
%% 重启时,认为主机是离线状态; 等待主机主动建立连接
|
|
||||||
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
||||||
|
|
||||||
case host_bo:get_host_by_uuid(UUID) of
|
case host_bo:get_host_by_uuid(UUID) of
|
||||||
{ok, #{<<"id">> := HostId}} ->
|
{ok, #{<<"id">> := HostId}} ->
|
||||||
%% 通过host_id注册别名, 可以避免通过查询数据库获取HostPid
|
%% 通过host_id注册别名, 可以避免通过查询数据库获取HostPid
|
||||||
AliasName = get_alias_name(HostId),
|
AliasName = get_alias_name(HostId),
|
||||||
global:register_name(AliasName, self()),
|
global:register_name(AliasName, self()),
|
||||||
|
|
||||||
Aes = list_to_binary(iot_util:rand_bytes(32)),
|
Aes = list_to_binary(iot_util:rand_bytes(32)),
|
||||||
|
|
||||||
|
%% 启动主机相关的devices
|
||||||
|
{ok, Devices} = device_bo:get_host_devices(HostId),
|
||||||
|
lists:foreach(fun iot_device_sup:ensured_device_started/1, Devices),
|
||||||
|
|
||||||
|
%% 心跳检测机制
|
||||||
|
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
||||||
|
|
||||||
{ok, ?STATE_DENIED, #state{host_id = HostId, uuid = UUID, aes = Aes}};
|
{ok, ?STATE_DENIED, #state{host_id = HostId, uuid = UUID, aes = Aes}};
|
||||||
undefined ->
|
undefined ->
|
||||||
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
||||||
@ -154,14 +175,6 @@ init([UUID]) ->
|
|||||||
callback_mode() ->
|
callback_mode() ->
|
||||||
handle_event_function.
|
handle_event_function.
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or
|
|
||||||
%% (2) when gen_statem terminates abnormally.
|
|
||||||
%% This callback is optional.
|
|
||||||
format_status(_Opt, [_PDict, _StateName, _State]) ->
|
|
||||||
Status = some_term,
|
|
||||||
Status.
|
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||||
@ -172,6 +185,31 @@ handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) ->
|
|||||||
handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) ->
|
handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) ->
|
||||||
{keep_state, State, [{reply, From, {ok, Aes}}]};
|
{keep_state, State, [{reply, From, {ok, Aes}}]};
|
||||||
|
|
||||||
|
%% 获取主机的状态
|
||||||
|
handle_event({call, From}, get_status, StateName, State = #state{host_id = HostId, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics}) ->
|
||||||
|
%% 启动主机相关的devices
|
||||||
|
{ok, Devices} = device_bo:get_host_devices(HostId),
|
||||||
|
DeviceInfos = lists:map(fun(DeviceUUID) ->
|
||||||
|
DevicePid = iot_device:get_pid(DeviceUUID),
|
||||||
|
case iot_device:is_activated(DevicePid) of
|
||||||
|
true ->
|
||||||
|
{DeviceUUID, <<"online">>};
|
||||||
|
false ->
|
||||||
|
{DeviceUUID, <<"offline">>}
|
||||||
|
end
|
||||||
|
end, Devices),
|
||||||
|
|
||||||
|
Status = if StateName == ?STATE_SESSION -> <<"session">>; true -> <<"denied">> end,
|
||||||
|
HasChannel = (ChannelPid /= undefined),
|
||||||
|
Reply = #{
|
||||||
|
<<"status">> => Status,
|
||||||
|
<<"has_channel">> => HasChannel,
|
||||||
|
<<"heartbeat_counter">> => HeartbeatCounter,
|
||||||
|
<<"metrics">> => Metrics,
|
||||||
|
<<"device_infos">> => DeviceInfos
|
||||||
|
},
|
||||||
|
{keep_state, State, [{reply, From, {ok, Reply}}]};
|
||||||
|
|
||||||
%% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的
|
%% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的
|
||||||
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}}, ?STATE_SESSION, State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid}) ->
|
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}}, ?STATE_SESSION, State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid}) ->
|
||||||
lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]),
|
lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]),
|
||||||
@ -193,22 +231,20 @@ handle_event({call, From}, {publish_message, _, _, _}, _, State) ->
|
|||||||
{keep_state, State, [{reply, From, {error, <<"主机状态错误,发送命令失败"/utf8>>}}]};
|
{keep_state, State, [{reply, From, {error, <<"主机状态错误,发送命令失败"/utf8>>}}]};
|
||||||
|
|
||||||
%% 关闭授权
|
%% 关闭授权
|
||||||
handle_event({call, From}, {activate, Auth}, StateName, State = #state{host_id = HostId, uuid = UUID, monitor_ref = MRef, channel_pid = ChannelPid}) ->
|
handle_event({call, From}, {activate, _}, ?STATE_DENIED, State) ->
|
||||||
case {StateName, Auth} of
|
|
||||||
{?STATE_DENIED, _} ->
|
|
||||||
{keep_state, State, [{reply, From, ok}]};
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
{?STATE_SESSION, false} ->
|
|
||||||
|
handle_event({call, From}, {activate, true}, ?STATE_SESSION, State) ->
|
||||||
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
|
handle_event({call, From}, {activate, false}, ?STATE_SESSION, State = #state{host_id = HostId, uuid = UUID, monitor_ref = MRef, channel_pid = ChannelPid}) ->
|
||||||
%% 取消之前的monitor
|
%% 取消之前的monitor
|
||||||
erlang:demonitor(MRef),
|
erlang:demonitor(MRef),
|
||||||
ws_channel:stop(ChannelPid, closed),
|
ws_channel:stop(ChannelPid, closed),
|
||||||
|
|
||||||
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
||||||
change_devices_status(HostId, ?DEVICE_OFFLINE),
|
change_devices_status(HostId, ?DEVICE_UNKNOWN),
|
||||||
|
|
||||||
{next_state, ?STATE_DENIED, State#state{monitor_ref = undefined, channel_pid = undefined}, [{reply, From, ok}]};
|
{next_state, ?STATE_DENIED, State#state{monitor_ref = undefined, channel_pid = undefined}, [{reply, From, ok}]};
|
||||||
{?STATE_SESSION, true} ->
|
|
||||||
{keep_state, State, [{reply, From, ok}]}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% 绑定channel
|
%% 绑定channel
|
||||||
handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
||||||
@ -226,7 +262,7 @@ handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid
|
|||||||
{keep_state, State#state{channel_pid = ChannelPid, monitor_ref = MRef}, [{reply, From, ok}]};
|
{keep_state, State#state{channel_pid = ChannelPid, monitor_ref = MRef}, [{reply, From, ok}]};
|
||||||
|
|
||||||
%% 授权通过后,才能将主机的状态设置为在线状态
|
%% 授权通过后,才能将主机的状态设置为在线状态
|
||||||
handle_event({call, From}, {create_session, PubKey}, _, State = #state{uuid = UUID, aes = Aes, host_id = HostId}) ->
|
handle_event({call, From}, {create_session, PubKey}, _, State = #state{uuid = UUID, aes = Aes}) ->
|
||||||
{ok, #{<<"authorize_status">> := AuthorizeStatus}} = host_bo:get_host_by_uuid(UUID),
|
{ok, #{<<"authorize_status">> := AuthorizeStatus}} = host_bo:get_host_by_uuid(UUID),
|
||||||
case AuthorizeStatus =:= 1 of
|
case AuthorizeStatus =:= 1 of
|
||||||
true ->
|
true ->
|
||||||
@ -235,9 +271,6 @@ handle_event({call, From}, {create_session, PubKey}, _, State = #state{uuid = UU
|
|||||||
{ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE),
|
{ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE),
|
||||||
lager:debug("[iot_host] host_id(~p) uuid: ~p, create_session, will change status, affected_row: ~p", [?STATE_SESSION, UUID, AffectedRow]),
|
lager:debug("[iot_host] host_id(~p) uuid: ~p, create_session, will change status, affected_row: ~p", [?STATE_SESSION, UUID, AffectedRow]),
|
||||||
|
|
||||||
%% 启动主机相关的device
|
|
||||||
start_devices(HostId),
|
|
||||||
|
|
||||||
{next_state, ?STATE_SESSION, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
{next_state, ?STATE_SESSION, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||||
false ->
|
false ->
|
||||||
lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]),
|
lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]),
|
||||||
@ -378,19 +411,27 @@ handle_event(cast, {handle, {event, Event0}}, ?STATE_SESSION, State = #state{uui
|
|||||||
end,
|
end,
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%% 当websocket断开的时候,则设置主机状态为下线状态; 主机的状态需要转换
|
%% 心跳机制
|
||||||
handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{host_id = HostId, uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) ->
|
handle_event(cast, heartbeat, _, State = #state{uuid = UUID, heartbeat_counter = HeartbeatCounter}) ->
|
||||||
lager:warning("[iot_host] channel: ~p, down with reason: ~p, state name: ~p, state: ~p", [ChannelPid, Reason, StateName, State]),
|
lager:debug("[iot_host] uuip: ~p, get heartbeat, counter is: ~p", [UUID, HeartbeatCounter]),
|
||||||
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
{keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}};
|
||||||
change_devices_status(HostId, ?DEVICE_OFFLINE),
|
|
||||||
|
|
||||||
%% 会话状态如果链接丢失,需要切换到activated状态,其他情况保持不变
|
handle_event(info, {timeout, _, heartbeat_ticker}, StateName, State = #state{uuid = UUID, host_id = HostId, heartbeat_counter = HeartbeatCounter}) ->
|
||||||
case StateName =:= session of
|
case StateName =:= ?STATE_DENIED andalso HeartbeatCounter =:= 0 of
|
||||||
true ->
|
true ->
|
||||||
{next_state, activated, State#state{channel_pid = undefined}};
|
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
||||||
|
change_devices_status(HostId, ?DEVICE_UNKNOWN);
|
||||||
false ->
|
false ->
|
||||||
{keep_state, State#state{channel_pid = undefined}}
|
ok
|
||||||
end;
|
end,
|
||||||
|
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
||||||
|
|
||||||
|
{keep_state, State#state{heartbeat_counter = 0}};
|
||||||
|
|
||||||
|
%% 当websocket断开的时候,则设置主机状态为下线状态; 主机的状态需要转换
|
||||||
|
handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) ->
|
||||||
|
lager:warning("[iot_host] uuid: ~p, channel: ~p, down with reason: ~p, state name: ~p, state: ~p", [UUID, ChannelPid, Reason, StateName, State]),
|
||||||
|
{next_state, ?STATE_DENIED, State#state{channel_pid = undefined}};
|
||||||
|
|
||||||
handle_event(EventType, EventContent, StateName, State) ->
|
handle_event(EventType, EventContent, StateName, State) ->
|
||||||
lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]),
|
lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]),
|
||||||
@ -405,7 +446,7 @@ handle_event(EventType, EventContent, StateName, State) ->
|
|||||||
terminate(Reason, StateName, _State = #state{host_id = HostId, uuid = UUID}) ->
|
terminate(Reason, StateName, _State = #state{host_id = HostId, uuid = UUID}) ->
|
||||||
lager:debug("[iot_host] host: ~p, terminate with reason: ~p, state_name: ~p", [UUID, Reason, StateName]),
|
lager:debug("[iot_host] host: ~p, terminate with reason: ~p, state_name: ~p", [UUID, Reason, StateName]),
|
||||||
host_bo:change_status(UUID, ?HOST_OFFLINE),
|
host_bo:change_status(UUID, ?HOST_OFFLINE),
|
||||||
change_devices_status(HostId, ?DEVICE_OFFLINE),
|
change_devices_status(HostId, ?DEVICE_UNKNOWN),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -417,18 +458,6 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
|||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
start_devices(HostId) when is_integer(HostId) ->
|
|
||||||
%% 启动主机相关的device,此时device的状态为离线状态
|
|
||||||
{ok, Devices} = device_bo:get_host_devices(HostId),
|
|
||||||
lists:foreach(fun(DeviceUUID) ->
|
|
||||||
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
|
||||||
{ok, DevicePid} ->
|
|
||||||
iot_device:change_status(DevicePid, ?DEVICE_OFFLINE);
|
|
||||||
{error, Reason} ->
|
|
||||||
lager:notice("[iot_host] start_device: ~p, get error: ~p", [DeviceUUID, Reason])
|
|
||||||
end
|
|
||||||
end, Devices).
|
|
||||||
|
|
||||||
%% 修改设备的状态
|
%% 修改设备的状态
|
||||||
change_devices_status(HostId, NewStatus) when is_integer(HostId), is_integer(NewStatus) ->
|
change_devices_status(HostId, NewStatus) when is_integer(HostId), is_integer(NewStatus) ->
|
||||||
{ok, Devices} = device_bo:get_host_devices(HostId),
|
{ok, Devices} = device_bo:get_host_devices(HostId),
|
||||||
|
|||||||
@ -14,6 +14,10 @@
|
|||||||
{backlog, 10240}
|
{backlog, 10240}
|
||||||
]},
|
]},
|
||||||
|
|
||||||
|
{udp_server, [
|
||||||
|
{port, 18080}
|
||||||
|
]},
|
||||||
|
|
||||||
%% 目标服务器地址
|
%% 目标服务器地址
|
||||||
{emqx_server, [
|
{emqx_server, [
|
||||||
{host, {39, 98, 184, 67}},
|
{host, {39, 98, 184, 67}},
|
||||||
|
|||||||
@ -14,6 +14,10 @@
|
|||||||
{backlog, 10240}
|
{backlog, 10240}
|
||||||
]},
|
]},
|
||||||
|
|
||||||
|
{udp_server, [
|
||||||
|
{port, 18080}
|
||||||
|
]},
|
||||||
|
|
||||||
%% 目标服务器地址
|
%% 目标服务器地址
|
||||||
%{emqx_server, [
|
%{emqx_server, [
|
||||||
% {host, {39, 98, 184, 67}},
|
% {host, {39, 98, 184, 67}},
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user