482 lines
23 KiB
Erlang
482 lines
23 KiB
Erlang
%%%-------------------------------------------------------------------
|
||
%%% @author
|
||
%%% @copyright (C) 2023, <COMPANY>
|
||
%%% @doc
|
||
%%%
|
||
%%% @end
|
||
%%% Created : 22. 9月 2023 16:38
|
||
%%%-------------------------------------------------------------------
|
||
-module(iot_host).
|
||
-author("aresei").
|
||
-include("iot.hrl").
|
||
-include("message_pb.hrl").
|
||
|
||
-behaviour(gen_statem).
|
||
|
||
%% 心跳包检测时间间隔, 15分钟检测一次
|
||
-define(HEARTBEAT_INTERVAL, 900 * 1000).
|
||
|
||
%% 状态
|
||
-define(STATE_DENIED, denied).
|
||
-define(STATE_ACTIVATED, activated).
|
||
|
||
%% API
|
||
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
|
||
-export([get_metric/1, get_status/1]).
|
||
%% 通讯相关
|
||
-export([pub/3, attach_channel/2, command/3]).
|
||
-export([deploy_service/4, start_service/2, stop_service/2, invoke_service/4, async_service_config/4, task_log/2, await_reply/2]).
|
||
%% 设备管理
|
||
-export([reload_device/2, delete_device/2, activate_device/3]).
|
||
-export([heartbeat/1]).
|
||
|
||
%% gen_statem callbacks
|
||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||
|
||
-record(state, {
|
||
host_id :: integer(),
|
||
%% 从数据库里面读取到的数据
|
||
uuid :: binary(),
|
||
has_session = false :: boolean(),
|
||
%% 心跳计数器
|
||
heartbeat_counter = 0 :: integer(),
|
||
%% websocket相关
|
||
channel_pid :: undefined | pid(),
|
||
|
||
%% 设备的关系, #{device_uuid => Device}
|
||
device_map = #{},
|
||
|
||
%% 主机的相关信息
|
||
metrics = #{} :: map()
|
||
}).
|
||
|
||
%%%===================================================================
|
||
%%% API
|
||
%%%===================================================================
|
||
-spec get_pid(UUID :: binary()) -> undefined | pid().
|
||
get_pid(UUID) when is_binary(UUID) ->
|
||
Name = get_name(UUID),
|
||
whereis(Name).
|
||
|
||
-spec get_name(UUID :: binary()) -> atom().
|
||
get_name(UUID) when is_binary(UUID) ->
|
||
binary_to_atom(<<"iot_host:", UUID/binary>>).
|
||
|
||
-spec get_alias_name(HostId :: integer()) -> atom().
|
||
get_alias_name(HostId0) when is_integer(HostId0) ->
|
||
HostId = integer_to_binary(HostId0),
|
||
binary_to_atom(<<"iot_host_id:", HostId/binary>>).
|
||
|
||
%% 处理消息
|
||
-spec handle(Pid :: pid(), Packet :: {atom(), binary()} | {atom(), {binary(), binary()}}) -> no_return().
|
||
handle(Pid, Packet) when is_pid(Pid) ->
|
||
gen_statem:cast(Pid, {handle, Packet}).
|
||
|
||
-spec get_status(Pid :: pid()) -> {ok, Status :: map()}.
|
||
get_status(Pid) when is_pid(Pid) ->
|
||
gen_statem:call(Pid, get_status).
|
||
|
||
%% 激活主机, true 表示激活; false表示关闭激活
|
||
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
|
||
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
||
gen_statem:call(Pid, {activate, Auth}).
|
||
|
||
-spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}.
|
||
get_metric(Pid) when is_pid(Pid) ->
|
||
gen_statem:call(Pid, get_metric).
|
||
|
||
-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()} | {denied, Reason :: binary()}.
|
||
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
|
||
gen_statem:call(Pid, {attach_channel, ChannelPid}).
|
||
|
||
-spec async_service_config(Pid :: pid(), ServiceId :: binary(), ConfigJson :: binary(), Timeout :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
||
async_service_config(Pid, ServiceId, ConfigJson, Timeout) when is_pid(Pid), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout) ->
|
||
ConfigBin = message_pb:encode_msg(#push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout}),
|
||
gen_statem:call(Pid, {async_call, self(), ?PUSH_SERVICE_CONFIG, ConfigBin}).
|
||
|
||
-spec deploy_service(Pid :: pid(), TaskId :: integer(), ServiceId :: binary(), TarUrl :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
||
deploy_service(Pid, TaskId, ServiceId, TarUrl) when is_pid(Pid), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) ->
|
||
PushBin = message_pb:encode_msg(#deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl}),
|
||
gen_statem:call(Pid, {async_call, self(), ?PUSH_DEPLOY, PushBin}).
|
||
|
||
-spec start_service(Pid :: pid(), ServiceId :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
||
start_service(Pid, ServiceId) when is_pid(Pid), is_binary(ServiceId) ->
|
||
gen_statem:call(Pid, {async_call, self(), ?PUSH_START_SERVICE, ServiceId}).
|
||
|
||
-spec stop_service(Pid :: pid(), ServiceId :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
||
stop_service(Pid, ServiceId) when is_pid(Pid), is_binary(ServiceId) ->
|
||
gen_statem:call(Pid, {async_call, self(), ?PUSH_STOP_SERVICE, ServiceId}).
|
||
|
||
-spec invoke_service(Pid :: pid(), ServiceId :: binary(), Payload :: binary(), Timeout :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
||
invoke_service(Pid, ServiceId, Payload, Timeout) when is_pid(Pid), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout) ->
|
||
InvokeBin = message_pb:encode_msg(#invoke{service_id = ServiceId, payload = Payload, timeout = Timeout}),
|
||
gen_statem:call(Pid, {async_call, self(), ?PUSH_INVOKE, InvokeBin}).
|
||
|
||
-spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
||
task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) ->
|
||
TaskLogBin = message_pb:encode_msg(#fetch_task_log{task_id = TaskId}),
|
||
gen_statem:call(Pid, {async_call, self(), ?PUSH_TASK_LOG, TaskLogBin}).
|
||
|
||
-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}.
|
||
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
|
||
receive
|
||
{async_call_reply, Ref, #async_call_reply{code = 1, result = Result}} ->
|
||
{ok, Result};
|
||
{async_call_reply, Ref, #async_call_reply{code = 0, message = Message}} ->
|
||
{error, Message}
|
||
after Timeout ->
|
||
{error, <<"timeout">>}
|
||
end.
|
||
|
||
-spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> ok | {error, Reason :: any()}.
|
||
pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) ->
|
||
gen_statem:call(Pid, {pub, Topic, Content}).
|
||
|
||
-spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> ok | {error, Reason :: any()}.
|
||
command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is_binary(Command) ->
|
||
gen_statem:call(Pid, {command, CommandType, Command}).
|
||
|
||
%% 设备管理相关
|
||
-spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}.
|
||
reload_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
||
gen_statem:call(Pid, {reload_device, DeviceUUID}).
|
||
|
||
-spec delete_device(Pid :: pid(), DeviceUUID :: binary()) -> ok.
|
||
delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
||
gen_statem:call(Pid, {delete_device, DeviceUUID}).
|
||
|
||
-spec activate_device(Pid :: pid(), DeviceUUID :: binary(), Auth :: boolean()) -> ok | {error, Reason :: any()}.
|
||
activate_device(Pid, DeviceUUID, Auth) when is_pid(Pid), is_binary(DeviceUUID), is_boolean(Auth) ->
|
||
gen_statem:call(Pid, {activate_device, DeviceUUID, Auth}).
|
||
|
||
-spec heartbeat(Pid :: pid()) -> no_return().
|
||
heartbeat(undefined) ->
|
||
ok;
|
||
heartbeat(Pid) when is_pid(Pid) ->
|
||
gen_statem:cast(Pid, heartbeat).
|
||
|
||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||
%% initialize. To ensure a synchronized start-up procedure, this
|
||
%% function does not return until Module:init/1 has returned.
|
||
start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
|
||
gen_statem:start_link({local, Name}, ?MODULE, [UUID], []).
|
||
|
||
%%%===================================================================
|
||
%%% gen_statem callbacks
|
||
%%%===================================================================
|
||
|
||
%% @private
|
||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||
%% process to initialize.
|
||
init([UUID]) ->
|
||
case host_bo:get_host_by_uuid(UUID) of
|
||
{ok, #{<<"id">> := HostId, <<"authorize_status">> := AuthorizeStatus}} ->
|
||
%% 通过host_id注册别名, 可以避免通过查询数据库获取HostPid
|
||
AliasName = get_alias_name(HostId),
|
||
global:register_name(AliasName, self()),
|
||
|
||
%% 心跳检测机制
|
||
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
||
|
||
StateName = case AuthorizeStatus =:= 1 of
|
||
true -> ?STATE_ACTIVATED;
|
||
false -> ?STATE_DENIED
|
||
end,
|
||
|
||
%% 加载设备信息
|
||
{ok, DeviceInfos} = device_bo:get_host_devices(HostId),
|
||
Devices = lists:filtermap(fun(DeviceInfo = #{<<"device_uuid">> := DeviceUUID}) ->
|
||
case iot_device:new(DeviceInfo) of
|
||
error ->
|
||
false;
|
||
{ok, Device} ->
|
||
{true, {DeviceUUID, Device}}
|
||
end
|
||
end, DeviceInfos),
|
||
|
||
{ok, StateName, #state{host_id = HostId, uuid = UUID, device_map = maps:from_list(Devices), has_session = false}};
|
||
undefined ->
|
||
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
||
ignore
|
||
end.
|
||
|
||
%% @private
|
||
%% @doc This function is called by a gen_statem when it needs to find out
|
||
%% the callback mode of the callback module.
|
||
callback_mode() ->
|
||
handle_event_function.
|
||
|
||
%% @private
|
||
%% @doc If callback_mode is handle_event_function, then whenever a
|
||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||
%% process message, this function is called.
|
||
|
||
handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) ->
|
||
{keep_state, State, [{reply, From, {ok, Metrics}}]};
|
||
|
||
%% 获取主机的状态
|
||
handle_event({call, From}, get_status, _, State = #state{channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) ->
|
||
HasChannel = (ChannelPid /= undefined),
|
||
Reply = #{
|
||
<<"has_channel">> => HasChannel,
|
||
<<"has_session">> => HasSession,
|
||
<<"heartbeat_counter">> => HeartbeatCounter,
|
||
<<"metrics">> => Metrics
|
||
},
|
||
{keep_state, State, [{reply, From, {ok, Reply}}]};
|
||
|
||
%% 只要channel存在,就负责将消息推送到边缘端主机
|
||
handle_event({call, From}, {async_call, ReceiverPid, PushType, PushBin}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
|
||
case HasSession andalso is_pid(ChannelPid) of
|
||
true ->
|
||
%% 通过websocket发送请求
|
||
Ref = tcp_channel:async_call(ChannelPid, ReceiverPid, PushType, PushBin),
|
||
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||
false ->
|
||
lager:debug("[iot_host] uuid: ~p, publish_type: ~p, invalid state: ~p", [UUID, PushType, state_map(State)]),
|
||
{keep_state, State, [{reply, From, {error, <<"主机离线,发送请求失败"/utf8>>}}]}
|
||
end;
|
||
|
||
%% 发送指令时, pub/sub
|
||
handle_event({call, From}, {pub, Topic, Content}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
|
||
case HasSession andalso is_pid(ChannelPid) of
|
||
true ->
|
||
lager:debug("[iot_host] host: ~p, publish to topic: ~p, content: ~p", [UUID, Topic, Content]),
|
||
%% 通过websocket发送请求
|
||
tcp_channel:pub(ChannelPid, Topic, Content),
|
||
|
||
{keep_state, State, [{reply, From, ok}]};
|
||
false ->
|
||
lager:debug("[iot_host] uuid: ~p, publish to topic: ~p, content: ~p, invalid state: ~p", [UUID, Topic, Content, state_map(State)]),
|
||
{keep_state, State, [{reply, From, {error, <<"主机离线,发送失败"/utf8>>}}]}
|
||
end;
|
||
|
||
%% 发送指令时
|
||
handle_event({call, From}, {command, CommandType, Command}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
|
||
case HasSession andalso is_pid(ChannelPid) of
|
||
true ->
|
||
lager:debug("[iot_host] host: ~p, command_type: ~p, command: ~p", [UUID, CommandType, Command]),
|
||
%% 通过websocket发送请求
|
||
tcp_channel:command(ChannelPid, CommandType, Command),
|
||
{keep_state, State, [{reply, From, ok}]};
|
||
false ->
|
||
lager:debug("[iot_host] host: ~p, command_type: ~p, command: ~p, invalid state: ~p", [UUID, CommandType, Command, state_map(State)]),
|
||
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}
|
||
end;
|
||
|
||
%% 激活主机
|
||
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) ->
|
||
case is_pid(ChannelPid) of
|
||
true ->
|
||
lager:debug("[iot_host] uuid: ~p, activate: true", [UUID]),
|
||
tcp_channel:command(ChannelPid, ?COMMAND_AUTH, <<1:8>>);
|
||
false ->
|
||
lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID])
|
||
end,
|
||
{next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};
|
||
|
||
%% 关闭授权
|
||
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) ->
|
||
case is_pid(ChannelPid) of
|
||
true ->
|
||
tcp_channel:command(ChannelPid, ?COMMAND_AUTH, <<0:8>>),
|
||
lager:debug("[iot_host] uuid: ~p, activate: false", [UUID]),
|
||
tcp_channel:stop(ChannelPid, closed);
|
||
false ->
|
||
lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID])
|
||
end,
|
||
{next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]};
|
||
|
||
%% 绑定channel
|
||
handle_event({call, From}, {attach_channel, ChannelPid}, StateName, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
||
case StateName of
|
||
?STATE_ACTIVATED ->
|
||
erlang:monitor(process, ChannelPid),
|
||
%% 更新主机为在线状态
|
||
{ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE),
|
||
report_event(UUID, ?HOST_ONLINE),
|
||
lager:debug("[iot_host] host_id(attach_channel) uuid: ~p, will change status, affected_row: ~p", [UUID, AffectedRow]),
|
||
{keep_state, State#state{channel_pid = ChannelPid, has_session = true}, [{reply, From, ok}]};
|
||
%% 主机未激活
|
||
?STATE_DENIED ->
|
||
lager:notice("[iot_host] attach_channel host_id uuid: ~p, channel: ~p, host inactivated", [UUID, ChannelPid]),
|
||
erlang:monitor(process, ChannelPid),
|
||
{keep_state, State#state{channel_pid = ChannelPid}, [{reply, From, {denied, <<"host inactivated">>}}]}
|
||
end;
|
||
%% 已经绑定了channel
|
||
handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, channel_pid = OldChannelPid}) ->
|
||
lager:notice("[iot_host] attach_channel host_id uuid: ~p, old channel exists: ~p", [UUID, OldChannelPid]),
|
||
{keep_state, State, [{reply, From, {error, <<"channel existed">>}}]};
|
||
|
||
%% 重新加载设备信息
|
||
handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) ->
|
||
case maps:find(DeviceUUID, DeviceMap) of
|
||
error ->
|
||
{keep_state, State, [{reply, From, {error, <<"device not found">>}}]};
|
||
{ok, Device} ->
|
||
case iot_device:reload(Device) of
|
||
error ->
|
||
{keep_state, State#state{device_map = maps:remove(Device, DeviceMap)}, [{reply, From, {error, <<"reload device error">>}}]};
|
||
{ok, NDevice} ->
|
||
{keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}, [{reply, From, ok}]}
|
||
end
|
||
end;
|
||
|
||
%% 删除设备
|
||
handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) ->
|
||
{keep_state, State#state{device_map = maps:remove(DeviceUUID, DeviceMap)}, [{reply, From, ok}]};
|
||
|
||
%% 激活设备
|
||
handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #state{device_map = DeviceMap}) ->
|
||
case maps:find(DeviceUUID, DeviceMap) of
|
||
error ->
|
||
{keep_state, State, [{reply, From, {error, <<"device not found">>}}]};
|
||
{ok, Device} ->
|
||
NDevice = iot_device:auth(Device, Auth),
|
||
{keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}, [{reply, From, ok}]}
|
||
end;
|
||
|
||
%% todo
|
||
handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true, device_map = DeviceMap}) ->
|
||
lager:debug("[iot_host] metric_data host: ~p, service_id: ~p, device_uuid: ~p, route_key: ~p, metric: ~p", [UUID, ServiceId, DeviceUUID, RouteKey0, Metric]),
|
||
case DeviceUUID =/= <<"">> of
|
||
true ->
|
||
case maps:find(DeviceUUID, DeviceMap) of
|
||
error ->
|
||
lager:warning("[iot_host] host uuid: ~p, device uuid: ~p not found, metric: ~p", [UUID, DeviceUUID, Metric]),
|
||
{keep_state, State};
|
||
{ok, Device} ->
|
||
case iot_device:is_activated(Device) of
|
||
true ->
|
||
RouteKey = get_route_key(RouteKey0),
|
||
case endpoint:get_alias_pid(RouteKey) of
|
||
undefined ->
|
||
ok;
|
||
EndpointPid ->
|
||
endpoint:forward(EndpointPid, ServiceId, Metric)
|
||
end,
|
||
NDevice = iot_device:change_status(Device, ?DEVICE_ONLINE),
|
||
{keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}};
|
||
false ->
|
||
lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, metric: ~p", [UUID, DeviceUUID, Metric]),
|
||
{keep_state, State}
|
||
end
|
||
end;
|
||
false ->
|
||
{keep_state, State}
|
||
end;
|
||
|
||
%% ping的数据是通过aes加密后的,因此需要在有会话的情况下才行
|
||
handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
|
||
lager:debug("[iot_host] ping host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
|
||
{keep_state, State#state{metrics = Metrics}};
|
||
|
||
handle_event(cast, {handle, {inform, #service_inform{service_id = ServiceId, status = Status, timestamp = Timestamp}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
|
||
lager:debug("[iot_host] inform host: ~p, service_id: ~p, status: ~p, timestamp: ~p", [UUID, ServiceId, Status, Timestamp]),
|
||
{keep_state, State};
|
||
|
||
handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
|
||
lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]),
|
||
%DevicePid = iot_device:get_pid(DeviceUUID),
|
||
%iot_device:change_status(DevicePid, Status),
|
||
|
||
{keep_state, State};
|
||
|
||
%% 心跳机制
|
||
handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) ->
|
||
{keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}};
|
||
|
||
%% 没有收到心跳包,主机下线, 设备状态不变
|
||
handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{uuid = UUID, heartbeat_counter = 0, channel_pid = ChannelPid}) ->
|
||
lager:warning("[iot_host] uuid: ~p, heartbeat lost, devices will unknown", [UUID]),
|
||
{ok, #{<<"status">> := Status}} = host_bo:get_host_by_uuid(UUID),
|
||
case Status of
|
||
?HOST_NOT_JOINED ->
|
||
lager:debug("[iot_host] host: ~p, host_maybe_offline, host not joined, can not change to offline", [UUID]);
|
||
?HOST_OFFLINE ->
|
||
lager:debug("[iot_host] host: ~p, host_maybe_offline, host now is offline, do nothing", [UUID]);
|
||
?HOST_ONLINE ->
|
||
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
||
report_event(UUID, ?HOST_OFFLINE)
|
||
end,
|
||
|
||
%% 关闭channel,主机需要重新连接,才能保存状态的一致
|
||
is_pid(ChannelPid) andalso tcp_channel:stop(ChannelPid, closed),
|
||
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
||
|
||
{keep_state, State#state{channel_pid = undefined, has_session = false, heartbeat_counter = 0}};
|
||
|
||
%% 其他情况下需要重置系统计数器
|
||
handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{}) ->
|
||
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
||
{keep_state, State#state{heartbeat_counter = 0}};
|
||
|
||
%% 当websocket断开的时候,主机的状态不一定改变;主机的状态改变通过心跳机制,会话状态需要改变
|
||
handle_event(info, {'DOWN', _Ref, process, ChannelPid, Reason}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
|
||
lager:warning("[iot_host] uuid: ~p, channel: ~p, down with reason: ~p, has_session: ~p, state: ~p", [UUID, ChannelPid, Reason, HasSession, State]),
|
||
{keep_state, State#state{channel_pid = undefined, has_session = false}};
|
||
|
||
handle_event(info, {'DOWN', _Ref, process, Pid, Reason}, _, State = #state{uuid = UUID}) ->
|
||
lager:debug("[iot_host] uuid: ~p, process_pid: ~p, down with reason: ~p, state: ~p", [UUID, Pid, Reason, State]),
|
||
{keep_state, State};
|
||
|
||
handle_event(Event, Info, StateName, State = #state{uuid = UUID}) ->
|
||
lager:warning("[iot_host] host: ~p, event: ~p, unknown message: ~p, state_name: ~p, state: ~p", [UUID, Event, Info, StateName, state_map(State)]),
|
||
{keep_state, State}.
|
||
|
||
%% @private
|
||
%% @doc This function is called by a gen_statem when it is about to
|
||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||
%% Reason. The return value is ignored.
|
||
terminate(Reason, _StateName, _State = #state{uuid = UUID, has_session = HasSession}) ->
|
||
lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p", [UUID, Reason, HasSession]),
|
||
ok.
|
||
|
||
%% @private
|
||
%% @doc Convert process state when code is changed
|
||
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||
{ok, StateName, State}.
|
||
|
||
%%%===================================================================
|
||
%%% Internal functions
|
||
%%%===================================================================
|
||
|
||
get_route_key(<<"">>) ->
|
||
<<"default">>;
|
||
get_route_key(RouteKey) when is_binary(RouteKey) ->
|
||
RouteKey.
|
||
|
||
-spec report_event(UUID :: binary(), NewStatus :: integer()) -> no_return().
|
||
report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) ->
|
||
TextMap = #{
|
||
0 => <<"离线"/utf8>>,
|
||
1 => <<"在线"/utf8>>
|
||
},
|
||
|
||
%% 设备的状态信息上报给中电
|
||
Timestamp = iot_util:timestamp_of_seconds(),
|
||
FieldsList = [#{
|
||
<<"key">> => <<"host_status">>,
|
||
<<"value">> => NewStatus,
|
||
<<"value_text">> => maps:get(NewStatus, TextMap),
|
||
<<"unit">> => 0,
|
||
<<"type">> => <<"DI">>,
|
||
<<"name">> => <<"主机状态"/utf8>>,
|
||
<<"timestamp">> => Timestamp
|
||
}],
|
||
%% todo 这里需要实现新的机制
|
||
% iot_router:route_uuid(UUID, FieldsList, Timestamp),
|
||
lager:debug("[iot_host] host_uuid: ~p, route fields: ~p", [UUID, FieldsList]).
|
||
|
||
%% 将当前的state转换成map
|
||
state_map(#state{host_id = HostId, uuid = UUID, has_session = HasSession, heartbeat_counter = HeartbeatCounter, channel_pid = ChannelPid, metrics = Metrics}) ->
|
||
#{
|
||
host_id => HostId,
|
||
uuid => UUID,
|
||
has_session => HasSession,
|
||
heartbeat_counter => HeartbeatCounter,
|
||
channel_pid => ChannelPid,
|
||
metrics => Metrics
|
||
}. |