diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index e594496..fba356f 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -11,11 +11,12 @@ %% 主机是否在线 -define(HOST_OFFLINE, 0). -define(HOST_ONLINE, 1). +-define(HOST_NOT_JOINED, -1). %% 设备是否在线状态 -define(DEVICE_OFFLINE, 0). -define(DEVICE_ONLINE, 1). --define(DEVICE_UNKNOWN, 2). +-define(DEVICE_NOT_JOINED, -1). %% 下发的任务状态 -define(TASK_STATUS_INIT, -1). %% 未接入 diff --git a/apps/iot/src/database/device_bo.erl b/apps/iot/src/database/device_bo.erl index 1fa657b..40875ac 100644 --- a/apps/iot/src/database/device_bo.erl +++ b/apps/iot/src/database/device_bo.erl @@ -32,11 +32,23 @@ get_device_by_uuid(DeviceUUID) when is_binary(DeviceUUID) -> %% 修改主机的状态 -spec change_status(DeviceUUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. -change_status(DeviceUUID, Status) when is_binary(DeviceUUID), is_integer(Status) -> - case mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ? WHERE device_uuid = ? LIMIT 1">>, [Status, DeviceUUID]) of +change_status(DeviceUUID, NStatus) when is_binary(DeviceUUID), is_integer(NStatus) -> + case change_status0(DeviceUUID, NStatus) of Result = {ok, _} -> - event_logs_bo:insert(?EVENT_DEVICE, DeviceUUID, Status), + event_logs_bo:insert(?EVENT_DEVICE, DeviceUUID, NStatus), Result; Error -> Error - end. \ No newline at end of file + end. +change_status0(DeviceUUID, ?DEVICE_ONLINE) when is_binary(DeviceUUID) -> + Timestamp = calendar:local_time(), + case mysql_pool:get_row(mysql_iot, <<"SELECT status FROM device WHERE device_uuid = ? LIMIT 1">>, [DeviceUUID]) of + {ok, #{<<"status">> := -1}} -> + mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ?, access_at = ?, updated_at = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_ONLINE, Timestamp, Timestamp, DeviceUUID]); + {ok, _} -> + mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ?, updated_at = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_ONLINE, Timestamp, DeviceUUID]); + undefined -> + {error, <<"device not found">>} + end; +change_status0(DeviceUUID, ?DEVICE_OFFLINE) when is_binary(DeviceUUID) -> + mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_OFFLINE, DeviceUUID]). \ No newline at end of file diff --git a/apps/iot/src/database/host_bo.erl b/apps/iot/src/database/host_bo.erl index 2d1cce3..bb4b2b0 100644 --- a/apps/iot/src/database/host_bo.erl +++ b/apps/iot/src/database/host_bo.erl @@ -32,11 +32,24 @@ get_host_by_id(HostId) when is_integer(HostId) -> %% 修改主机的状态 -spec change_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. -change_status(UUID, Status) when is_binary(UUID), is_integer(Status) -> - case mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [Status, UUID]) of - Result = {ok, _} -> - event_logs_bo:insert(?EVENT_HOST, UUID, Status), - Result; - Error -> - Error - end. +change_status(UUID, NStatus) when is_binary(UUID), is_integer(NStatus) -> + case change_status0(UUID, NStatus) of + Result = {ok, _} -> + event_logs_bo:insert(?EVENT_HOST, UUID, NStatus), + Result; + Error -> + Error + end. +change_status0(UUID, ?HOST_ONLINE) when is_binary(UUID) -> + Timestamp = calendar:local_time(), + case mysql_pool:get_row(mysql_iot, <<"SELECT status FROM host WHERE uuid = ? LIMIT 1">>, [UUID]) of + %% 第一次更新激活 + {ok, #{<<"status">> := -1}} -> + mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ?, access_at = ?, updated_at = ? WHERE uuid = ? LIMIT 1">>, [?HOST_ONLINE, Timestamp, Timestamp, UUID]); + {ok, _} -> + mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ?, updated_at = ? WHERE uuid = ? LIMIT 1">>, [?HOST_ONLINE, Timestamp, UUID]); + undefined -> + {error, <<"host not found">>} + end; +change_status0(UUID, ?HOST_OFFLINE) when is_binary(UUID) -> + mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [?HOST_OFFLINE, UUID]). \ No newline at end of file diff --git a/apps/iot/src/iot_database_buffer.erl b/apps/iot/src/iot_database_buffer.erl new file mode 100644 index 0000000..fc1cdff --- /dev/null +++ b/apps/iot/src/iot_database_buffer.erl @@ -0,0 +1,109 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 25. 9月 2023 10:05 +%%%------------------------------------------------------------------- +-module(iot_database_buffer). +-author("aresei"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([find_device/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + devices_map = #{} :: map() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec find_device(DeviceUUID :: binary()) -> error | {ok, Device :: map()}. +find_device(DeviceUUID) when is_binary(DeviceUUID) -> + gen_server:call(?MODULE, {find_device, DeviceUUID}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, Devices} = device_bo:get_all_devices(), + DevicesMap = maps:from_list(lists:map(fun(Device = #{<<"device_uuid">> := DeviceUUID}) -> {DeviceUUID, Device} end, Devices)), + + erlang:start_timer(300 * 1000, self(), clean_up), + + {ok, #state{devices_map = DevicesMap}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({find_device, DeviceUUID}, _From, State = #state{devices_map = DevicesMap}) -> + {reply, maps:find(DeviceUUID, DevicesMap), State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, clean_up}, State = #state{}) -> + {noreply, State#state{devices_map = #{}}}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 1b04ae0..774ddb9 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -78,8 +78,25 @@ start_link(Name, DeviceUUID) when is_atom(Name), is_binary(DeviceUUID) -> %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. init([DeviceUUID]) when is_binary(DeviceUUID) -> - gen_server:cast(self(), reload), - {ok, ?STATE_DENIED, #state{device_uuid = DeviceUUID}}. + DeviceInfo1 = case iot_database_buffer:find_device(DeviceUUID) of + error -> + device_bo:get_device_by_uuid(DeviceUUID); + {ok, DeviceInfo} -> + {ok, DeviceInfo} + end, + + case DeviceInfo1 of + {ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} -> + case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of + true -> + {ok, ?STATE_ACTIVATED, #state{device_uuid = DeviceUUID, status = Status}}; + false -> + {ok, ?STATE_DENIED, #state{device_uuid = DeviceUUID, status = Status}} + end; + undefined -> + lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]), + ignore + end. %% @private %% @doc This function is called by a gen_statem when it needs to find out @@ -97,13 +114,28 @@ handle_event({call, From}, is_activated, StateName, State = #state{}) -> {keep_state, State, [{reply, From, StateName =:= ?STATE_ACTIVATED}]}; %% 改变为在线状态,但是数据库中的状态已经是在线状态,忽略 -handle_event(cast, {change_status, ?DEVICE_ONLINE}, ?STATE_ACTIVATED, State = #state{status = ?DEVICE_ONLINE}) -> +handle_event(cast, {change_status, ?DEVICE_ONLINE}, _, State = #state{status = ?DEVICE_ONLINE}) -> {keep_state, State}; %% 改变数据库的状态, 其他情况下执行次数都很少 -handle_event(cast, {change_status, NewStatus}, _, State = #state{device_uuid = DeviceUUID}) -> - {ok, _} = device_bo:change_status(DeviceUUID, NewStatus), - report_event(DeviceUUID, NewStatus), - {keep_state, State#state{status = NewStatus}}; +handle_event(cast, {change_status, ?DEVICE_ONLINE}, _, State = #state{device_uuid = DeviceUUID}) -> + {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_ONLINE), + report_event(DeviceUUID, ?DEVICE_ONLINE), + {keep_state, State#state{status = ?DEVICE_ONLINE}}; + +handle_event(cast, {change_status, ?DEVICE_OFFLINE}, _, State = #state{device_uuid = DeviceUUID}) -> + {ok, #{<<"status">> := Status}} = device_bo:get_device_by_uuid(DeviceUUID), + case Status of + ?DEVICE_NOT_JOINED -> + lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]), + {keep_state, State#state{status = ?DEVICE_NOT_JOINED}}; + ?DEVICE_OFFLINE -> + lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]), + {keep_state, State#state{status = ?DEVICE_OFFLINE}}; + ?DEVICE_ONLINE -> + {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE), + report_event(DeviceUUID, ?DEVICE_OFFLINE), + {keep_state, State#state{status = ?DEVICE_OFFLINE}} + end; %% 重新加载数据库数据 handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) -> @@ -114,8 +146,7 @@ handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) -> true -> {next_state, ?STATE_ACTIVATED, State#state{status = Status}}; false -> - {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE), - {next_state, ?STATE_DENIED, State#state{status = ?DEVICE_OFFLINE}} + {next_state, ?STATE_DENIED, State#state{status = Status}} end; undefined -> lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]), @@ -133,8 +164,7 @@ handle_event(cast, {auth, Auth}, StateName, State = #state{device_uuid = DeviceU {?STATE_ACTIVATED, false} -> lager:debug("[iot_device] device_uuid: ~p, auth: false, state_name from: ~p, to: ~p", [DeviceUUID, ?STATE_ACTIVATED, ?STATE_DENIED]), - {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE), - {next_state, ?STATE_DENIED, State#state{status = ?DEVICE_OFFLINE}}; + {next_state, ?STATE_DENIED, State}; {?STATE_ACTIVATED, true} -> lager:debug("[iot_device] device_uuid: ~p, auth: true, will keep state_name: ~p", [DeviceUUID, ?STATE_ACTIVATED]), {keep_state, State} @@ -159,19 +189,21 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%%=================================================================== -spec report_event(DeviceUUID :: binary(), NewStatus :: integer()) -> no_return(). -report_event(DeviceUUID, ?DEVICE_UNKNOWN) -> - lager:notice("[iot_device] device_uuid: ~p, status: unknown, not report", [DeviceUUID]), - ok; report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewStatus) -> + TextMap = #{ + 0 => <<"离线"/utf8>>, + 1 => <<"在线"/utf8>> + }, %% 设备的状态信息上报给中电 Timestamp = iot_util:timestamp_of_seconds(), FieldsList = [#{ <<"key">> => <<"device_status">>, <<"value">> => NewStatus, + <<"value_text">> => maps:get(NewStatus, TextMap), <<"unit">> => 0, <<"type">> => <<"DI">>, <<"name">> => <<"设备状态"/utf8>>, <<"timestamp">> => Timestamp }], iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp), - lager:debug("[iot_host] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]). \ No newline at end of file + lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]). \ No newline at end of file diff --git a/apps/iot/src/iot_device_sup.erl b/apps/iot/src/iot_device_sup.erl index 7f41d66..c7dc12d 100644 --- a/apps/iot/src/iot_device_sup.erl +++ b/apps/iot/src/iot_device_sup.erl @@ -35,7 +35,8 @@ ensured_device_started(DeviceUUID) when is_binary(DeviceUUID) -> delete_device(UUID) when is_binary(UUID) -> Id = iot_device:get_name(UUID), - supervisor:terminate_child(?MODULE, Id). + ok = supervisor:terminate_child(?MODULE, Id), + supervisor:delete_child(?MODULE, Id). child_spec(DeviceUUID) when is_binary(DeviceUUID) -> Name = iot_device:get_name(DeviceUUID), diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index b99e8e2..acad4e2 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -4,17 +4,21 @@ %%% @doc %%% %%% @end -%%% Created : 19. 6月 2023 10:32 +%%% Created : 22. 9月 2023 16:38 %%%------------------------------------------------------------------- -module(iot_host). -author("aresei"). -include("iot.hrl"). --behaviour(gen_server). +-behaviour(gen_statem). %% 心跳包检测时间间隔, 15分钟检测一次 -define(HEARTBEAT_INTERVAL, 900 * 1000). +%% 状态 +-define(STATE_DENIED, denied). +-define(STATE_ACTIVATED, activated). + %% API -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). -export([get_metric/1, publish_message/4, get_aes/1, get_status/1]). @@ -22,15 +26,13 @@ -export([reload_device/2, delete_device/2, activate_device/3]). -export([heartbeat/1]). -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_statem callbacks +-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). -record(state, { host_id :: integer(), %% 从数据库里面读取到的数据 uuid :: binary(), - %% 记录最后的状态信息 - last_status :: integer(), %% aes的key, 后续通讯需要基于这个加密 aes = <<>> :: binary(), has_session = false :: boolean(), @@ -45,7 +47,6 @@ %%%=================================================================== %%% API %%%=================================================================== - -spec get_pid(UUID :: binary()) -> undefined | pid(). get_pid(UUID) when is_binary(UUID) -> Name = get_name(UUID), @@ -63,38 +64,38 @@ get_alias_name(HostId0) when is_integer(HostId0) -> %% 处理消息 -spec handle(Pid :: pid(), Packet :: {atom(), binary()} | {atom(), {binary(), binary()}}) -> no_return(). handle(Pid, Packet) when is_pid(Pid) -> - gen_server:cast(Pid, {handle, Packet}). + gen_statem:cast(Pid, {handle, Packet}). -spec get_aes(Pid :: pid()) -> {ok, Aes :: binary()}. get_aes(Pid) when is_pid(Pid) -> - gen_server:call(Pid, get_aes). + gen_statem:call(Pid, get_aes). -spec get_status(Pid :: pid()) -> {ok, Status :: map()}. get_status(Pid) when is_pid(Pid) -> - gen_server:call(Pid, get_status). + gen_statem:call(Pid, get_status). %% 激活主机, true 表示激活; false表示关闭激活 -spec activate(Pid :: pid(), Auth :: boolean()) -> ok. activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> - gen_server:call(Pid, {activate, Auth}). + gen_statem:call(Pid, {activate, Auth}). -spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}. get_metric(Pid) when is_pid(Pid) -> - gen_server:call(Pid, get_metric). + gen_statem:call(Pid, get_metric). -spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}. attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> - gen_server:call(Pid, {attach_channel, ChannelPid}). + gen_statem:call(Pid, {attach_channel, ChannelPid}). -spec create_session(Pid :: pid(), PubKey :: binary()) -> {ok, Reply :: binary()}. create_session(Pid, PubKey) when is_pid(Pid), is_binary(PubKey) -> - gen_server:call(Pid, {create_session, PubKey}). + gen_statem:call(Pid, {create_session, PubKey}). %% 这里占用的的调用进程的时间 -spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) -> ok | {ok, Response :: binary()} | {error, Reason :: any()}. publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(CommandType), is_integer(Timeout) -> - case gen_server:call(Pid, {publish_message, self(), CommandType, Params}) of + case gen_statem:call(Pid, {publish_message, self(), CommandType, Params}) of {ok, Ref} -> receive {ws_response, Ref} -> @@ -112,247 +113,227 @@ publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer( -spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}. reload_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> - gen_server:call(Pid, {reload_device, DeviceUUID}). + gen_statem:call(Pid, {reload_device, DeviceUUID}). -spec delete_device(Pid :: pid(), DeviceUUID :: binary()) -> ok. delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> - gen_server:call(Pid, {delete_device, DeviceUUID}). + gen_statem:call(Pid, {delete_device, DeviceUUID}). -spec activate_device(Pid :: pid(), DeviceUUID :: binary(), Auth :: boolean()) -> ok | {error, Reason :: any()}. activate_device(Pid, DeviceUUID, Auth) when is_pid(Pid), is_binary(DeviceUUID), is_boolean(Auth) -> - gen_server:call(Pid, {activate_device, DeviceUUID, Auth}). + gen_statem:call(Pid, {activate_device, DeviceUUID, Auth}). -spec heartbeat(Pid :: pid()) -> no_return(). heartbeat(undefined) -> ok; heartbeat(Pid) when is_pid(Pid) -> - gen_server:cast(Pid, heartbeat). + gen_statem:cast(Pid, heartbeat). %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. start_link(Name, UUID) when is_atom(Name), is_binary(UUID) -> - gen_server:start_link({local, Name}, ?MODULE, [UUID], []). + gen_statem:start_link({local, Name}, ?MODULE, [UUID], []). %%%=================================================================== %%% gen_statem callbacks %%%=================================================================== %% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. init([UUID]) -> - host_bo:change_status(UUID, ?HOST_OFFLINE), - report_event(UUID, ?HOST_OFFLINE), - case host_bo:get_host_by_uuid(UUID) of - {ok, #{<<"id">> := HostId}} -> + {ok, #{<<"id">> := HostId, <<"authorize_status">> := AuthorizeStatus}} -> %% 通过host_id注册别名, 可以避免通过查询数据库获取HostPid AliasName = get_alias_name(HostId), global:register_name(AliasName, self()), Aes = list_to_binary(iot_util:rand_bytes(32)), + %% 心跳检测机制 + erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), + %% 启动主机相关的devices {ok, Devices} = device_bo:get_host_devices(HostId), lists:foreach(fun iot_device_sup:ensured_device_started/1, Devices), - %% 心跳检测机制 - erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), - - {ok, #state{host_id = HostId, uuid = UUID, last_status = ?HOST_OFFLINE, aes = Aes, has_session = false}}; + StateName = case AuthorizeStatus =:= 1 of + true -> ?STATE_ACTIVATED; + false -> ?STATE_DENIED + end, + {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, has_session = false}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), ignore end. %% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(get_metric, _From, State = #state{metrics = Metrics}) -> - {reply, {ok, Metrics}, State}; +%% @doc This function is called by a gen_statem when it needs to find out +%% the callback mode of the callback module. +callback_mode() -> + handle_event_function. -handle_call(get_aes, _From, State = #state{aes = Aes}) -> - {reply, {ok, Aes}, State}; +%% @private +%% @doc If callback_mode is handle_event_function, then whenever a +%% gen_statem receives an event from call/2, cast/2, or as a normal +%% process message, this function is called. + +handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) -> + {keep_state, State, [{reply, From, {ok, Metrics}}]}; + +handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) -> + {keep_state, State, [{reply, From, {ok, Aes}}]}; %% 获取主机的状态 -handle_call(get_status, _From, State = #state{host_id = HostId, last_status = LastStatus, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) -> +handle_event({call, From}, get_status, _, State = #state{host_id = HostId, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) -> %% 启动主机相关的devices {ok, Devices} = device_bo:get_host_devices(HostId), DeviceInfos = lists:map(fun(DeviceUUID) -> DevicePid = iot_device:get_pid(DeviceUUID), case iot_device:is_activated(DevicePid) of - true -> - {DeviceUUID, <<"online">>}; - false -> - {DeviceUUID, <<"offline">>} + true -> {DeviceUUID, <<"activated">>}; + false -> {DeviceUUID, <<"denied">>} end end, Devices), HasChannel = (ChannelPid /= undefined), Reply = #{ - <<"last_status">> => LastStatus, <<"has_channel">> => HasChannel, <<"has_session">> => HasSession, <<"heartbeat_counter">> => HeartbeatCounter, <<"metrics">> => Metrics, <<"device_infos">> => DeviceInfos }, - {reply, {ok, Reply}, State}; + {keep_state, State, [{reply, From, {ok, Reply}}]}; %% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的 -handle_call({publish_message, ReceiverPid, CommandType, {aes, Command0}}, _From, State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) -> +handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}}, ?STATE_ACTIVATED, + State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) -> + lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]), Command = iot_cipher_aes:encrypt(AES, Command0), %% 通过websocket发送请求 Ref = ws_channel:publish(ChannelPid, ReceiverPid, <>), - {reply, {ok, Ref}, State}; + {keep_state, State, [{reply, From, {ok, Ref}}]}; %% 只要channel存在,就负责将消息推送到边缘端主机 -handle_call({publish_message, ReceiverPid, CommandType, Command}, _From, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) -> +handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _, + State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) -> + %% 通过websocket发送请求 lager:debug("[iot_host] host: ~p, will publish message: ~p", [UUID, Command]), Ref = ws_channel:publish(ChannelPid, ReceiverPid, <>), - {reply, {ok, Ref}, State}; + {keep_state, State, [{reply, From, {ok, Ref}}]}; -handle_call({publish_message, _, _, _}, _From, State = #state{uuid = UUID}) -> +handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid = UUID}) -> lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]), - {reply, {error, <<"主机状态错误,发送命令失败"/utf8>>}, State}; + {keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]}; %% 激活主机 -handle_call({activate, true}, _From, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> +handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]), ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>), lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]), - {reply, ok, State}; -handle_call({activate, true}, _From, State = #state{uuid = UUID, channel_pid = undefined}) -> + {next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]}; +handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = undefined}) -> lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID]), - {reply, ok, State}; + {next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]}; %% 关闭授权 -handle_call({activate, false}, _From, State = #state{uuid = UUID, host_id = HostId, channel_pid = ChannelPid}) -> - case is_pid(ChannelPid) of - true -> - BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]), - ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>), - ws_channel:stop(ChannelPid, closed), - lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]); - false -> - lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID]) - end, +handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> + BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]), + ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>), + ws_channel:stop(ChannelPid, closed), + lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]), + {next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]}; - {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), - report_event(UUID, ?HOST_OFFLINE), - change_devices_status(HostId, ?DEVICE_UNKNOWN), - - {reply, ok, State#state{last_status = ?HOST_OFFLINE, channel_pid = undefined, has_session = false}}; +handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = undefined}) -> + lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID]), + {next_state, ?STATE_DENIED, State#state{has_session = false}, [{reply, From, ok}]}; %% 绑定channel -handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, channel_pid = OldChannelPid}) -> +handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid = UUID, channel_pid = undefined}) -> lager:debug("[iot_host] attach_channel host_id uuid: ~p, channel: ~p", [UUID, ChannelPid]), - case OldChannelPid =:= undefined of - true -> - erlang:monitor(process, ChannelPid), - {reply, ok, State#state{channel_pid = ChannelPid}}; - false -> - lager:debug("[iot_host] attach_channel host_id uuid: ~p, old channel exists: ~p", [UUID, OldChannelPid]), - {reply, {error, <<"channel existed">>}, State#state{channel_pid = ChannelPid}} - end; + erlang:monitor(process, ChannelPid), + {keep_state, State#state{channel_pid = ChannelPid}, [{reply, From, ok}]}; + +handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, channel_pid = OldChannelPid}) -> + lager:notice("[iot_host] attach_channel host_id uuid: ~p, old channel exists: ~p", [UUID, OldChannelPid]), + {keep_state, State, [{reply, From, {error, <<"channel existed">>}}]}; %% 授权通过后,才能将主机的状态设置为在线状态 -handle_call({create_session, PubKey}, _From, State = #state{uuid = UUID, aes = Aes}) -> - {ok, #{<<"authorize_status">> := AuthorizeStatus}} = host_bo:get_host_by_uuid(UUID), - case AuthorizeStatus =:= 1 of - true -> - Reply = #{<<"a">> => true, <<"aes">> => Aes}, - EncReply = iot_cipher_rsa:encode(Reply, PubKey), - {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), - report_event(UUID, ?HOST_ONLINE), +handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = Aes}) -> + Reply = #{<<"a">> => true, <<"aes">> => Aes}, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), + {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), + report_event(UUID, ?HOST_ONLINE), + lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]), + {keep_state, State#state{has_session = true}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; - lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]), - - {reply, {ok, <<10:8, EncReply/binary>>}, State#state{last_status = ?HOST_ONLINE, has_session = true}}; - false -> - lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]), - Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, - EncReply = iot_cipher_rsa:encode(Reply, PubKey), - - {reply, {ok, <<10:8, EncReply/binary>>}, State#state{has_session = false}} - end; +handle_event({call, From}, {create_session, PubKey}, ?STATE_DENIED, State = #state{uuid = UUID}) -> + lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]), + Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), + {keep_state, State#state{has_session = false}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; %% 重新加载设备信息 -handle_call({reload_device, DeviceUUID}, _From, State) -> +handle_event({call, From}, {reload_device, DeviceUUID}, _, State) -> case iot_device_sup:ensured_device_started(DeviceUUID) of {ok, DevicePid} -> iot_device:reload(DevicePid), - {reply, ok, State}; + {keep_state, State, [{reply, From, ok}]}; {error, Reason} -> - {reply, {error, Reason}, State} + {keep_state, State, [{reply, From, {error, Reason}}]} end; %% 删除设备 -handle_call({delete_device, DeviceUUID}, _From, State) -> +handle_event({call, From}, {delete_device, DeviceUUID}, _, State) -> case iot_device:get_pid(DeviceUUID) of undefined -> ok; DevicePid when is_pid(DevicePid) -> iot_device_sup:delete_device(DeviceUUID) end, - {reply, ok, State}; + {keep_state, State, [{reply, From, ok}]}; %% 激活设备 -handle_call({activate_device, DeviceUUID, Auth}, _From, State) -> +handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State) -> case iot_device_sup:ensured_device_started(DeviceUUID) of {ok, DevicePid} -> iot_device:auth(DevicePid, Auth), - {reply, ok, State}; + {keep_state, State, [{reply, From, ok}]}; {error, Reason} -> - {reply, {error, Reason}, State} - end. + {keep_state, State, [{reply, From, {error, Reason}}]} + end; --spec handle_cast(Request :: term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | - {stop, Reason :: term(), NewState :: term()}. %% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到 -handle_cast({handle, {data, Data}}, State = #state{aes = AES, has_session = true}) -> +handle_event(cast, {handle, {data, Data}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) -> PlainData = iot_cipher_aes:decrypt(AES, Data), case catch jiffy:decode(PlainData, [return_maps]) of Info when is_map(Info) -> handle_data(Info, State); Other -> - lager:debug("[iot_host] the data is invalid json: ~p", [Other]) + lager:notice("[iot_host] the data is invalid json: ~p", [Other]) end, - {noreply, State}; + {keep_state, State}; -%% 其他情况丢弃数据 -handle_cast({handle, {data, _}}, State = #state{has_session = false}) -> - {noreply, State}; - -%% 任意状态下都可以ping -handle_cast({handle, {ping, CipherMetric}}, State = #state{uuid = UUID, aes = AES}) -> +%% ping的数据是通过aes加密后的,因此需要在有会话的情况下才行 +handle_event(cast, {handle, {ping, CipherMetric}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) -> MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric), case catch jiffy:decode(MetricsInfo, [return_maps]) of Metrics when is_map(Metrics) -> lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), - {noreply, State#state{metrics = Metrics}}; + {keep_state, State#state{metrics = Metrics}}; Other -> lager:warning("[iot_host] host_id: ~p, ping is invalid json: ~p", [UUID, Other]), - {noreply, State} + {keep_state, State} end; -handle_cast({handle, {inform, Info0}}, State = #state{uuid = UUID, host_id = HostId, aes = AES, has_session = true}) -> +handle_event(cast, {handle, {inform, Info0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, host_id = HostId, aes = AES, has_session = true}) -> Info = iot_cipher_aes:decrypt(AES, Info0), case catch jiffy:decode(Info, [return_maps]) of #{<<"at">> := At, <<"services">> := ServiceInforms} -> @@ -377,9 +358,9 @@ handle_cast({handle, {inform, Info0}}, State = #state{uuid = UUID, host_id = Hos Error -> lager:warning("[iot_host] inform get error: ~p", [Error]) end, - {noreply, State}; + {keep_state, State}; -handle_cast({handle, {feedback_step, Info0}}, State = #state{aes = AES, has_session = true}) -> +handle_event(cast, {handle, {feedback_step, Info0}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) -> Info = iot_cipher_aes:decrypt(AES, Info0), case catch jiffy:decode(Info, [return_maps]) of Data = #{<<"task_id">> := TaskId, <<"code">> := Code} -> @@ -392,9 +373,9 @@ handle_cast({handle, {feedback_step, Info0}}, State = #state{aes = AES, has_sess Other -> lager:warning("[iot_host] feedback_step error: ~p", [Other]) end, - {noreply, State}; + {keep_state, State}; -handle_cast({handle, {feedback_result, Info0}}, State = #state{aes = AES, has_session = true}) -> +handle_event(cast, {handle, {feedback_result, Info0}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) -> Info = iot_cipher_aes:decrypt(AES, Info0), case catch jiffy:decode(Info, [return_maps]) of #{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} -> @@ -409,9 +390,9 @@ handle_cast({handle, {feedback_result, Info0}}, State = #state{aes = AES, has_se Other -> lager:warning("[iot_host] feedback_result error: ~p", [Other]) end, - {noreply, State}; + {keep_state, State}; -handle_cast({handle, {event, Event0}}, State = #state{uuid = UUID, aes = AES, has_session = true}) -> +handle_event(cast, {handle, {event, Event0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) -> EventText = iot_cipher_aes:decrypt(AES, Event0), lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, EventText]), case catch jiffy:decode(EventText, [return_maps]) of @@ -423,75 +404,68 @@ handle_cast({handle, {event, Event0}}, State = #state{uuid = UUID, aes = AES, ha Other -> lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other]) end, - {noreply, State}; + {keep_state, State}; %% 心跳机制 -handle_cast(heartbeat, State = #state{heartbeat_counter = HeartbeatCounter}) -> - {noreply, State#state{heartbeat_counter = HeartbeatCounter + 1}}. +handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) -> + {keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}}; --spec handle_info(Info :: timeout | term(), State :: term()) -> - {noreply, NewState :: term()} | - {noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} | - {stop, Reason :: term(), NewState :: term()}. -%% 没有收到心跳包,主机下线,设备状态变为: "未知" -handle_info({timeout, _, heartbeat_ticker}, State = #state{uuid = UUID, host_id = HostId, heartbeat_counter = 0, has_session = false}) -> +%% 没有收到心跳包,主机下线, 设备状态不变 +handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{uuid = UUID, heartbeat_counter = 0, channel_pid = ChannelPid}) -> lager:warning("[iot_host] uuid: ~p, heartbeat lost, devices will unknown", [UUID]), - change_devices_status(HostId, ?DEVICE_UNKNOWN), - {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), - report_event(UUID, ?HOST_OFFLINE), + {ok, #{<<"status">> := Status}} = host_bo:get_host_by_uuid(UUID), + case Status of + ?HOST_NOT_JOINED -> + lager:debug("[iot_host] host: ~p, host_maybe_offline, host not joined, can not change to offline", [UUID]); + ?HOST_OFFLINE -> + lager:debug("[iot_host] host: ~p, host_maybe_offline, host now is offline, do nothing", [UUID]); + ?HOST_ONLINE -> + {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), + report_event(UUID, ?HOST_OFFLINE) + end, + %% 关闭channel,主机需要重新连接,才能保存状态的一致 + is_pid(ChannelPid) andalso ws_channel:stop(ChannelPid, closed), erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), - {noreply, State#state{last_status = ?HOST_OFFLINE, heartbeat_counter = 0}}; + + {keep_state, State#state{channel_pid = undefined, has_session = false, heartbeat_counter = 0}}; + %% 其他情况下需要重置系统计数器 -handle_info({timeout, _, heartbeat_ticker}, State = #state{}) -> +handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{}) -> erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), - {noreply, State#state{heartbeat_counter = 0}}; + {keep_state, State#state{heartbeat_counter = 0}}; %% 当websocket断开的时候,主机的状态不一定改变;主机的状态改变通过心跳机制,会话状态需要改变 -handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> +handle_event(info, {'DOWN', _Ref, process, ChannelPid, Reason}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> lager:warning("[iot_host] uuid: ~p, channel: ~p, down with reason: ~p, has_session: ~p, state: ~p", [UUID, ChannelPid, Reason, HasSession, State]), - {noreply, State#state{channel_pid = undefined, has_session = false}}; + {keep_state, State#state{channel_pid = undefined, has_session = false}}; -handle_info({'DOWN', _Ref, process, Pid, Reason}, State = #state{uuid = UUID}) -> +handle_event(info, {'DOWN', _Ref, process, Pid, Reason}, _, State = #state{uuid = UUID}) -> lager:debug("[iot_host] uuid: ~p, process_pid: ~p, down with reason: ~p, state: ~p", [UUID, Pid, Reason, State]), - {noreply, State}; + {keep_state, State}; -handle_info(Info, State = #state{has_session = HasSession}) -> - lager:warning("[iot_host] unknown info: ~p, state: ~p", [Info, HasSession]), - - {noreply, State}. +handle_event(Event, Info, StateName, State = #state{uuid = UUID}) -> + lager:warning("[iot_host] host: ~p, event: ~p, unknown message: ~p, state_name: ~p, state: ~p", [UUID, Event, Info, StateName, state_map(State)]), + {keep_state, State}. %% @private %% @doc This function is called by a gen_statem when it is about to %% terminate. It should be the opposite of Module:init/1 and do any %% necessary cleaning up. When it returns, the gen_statem terminates with %% Reason. The return value is ignored. -terminate(Reason, #state{host_id = HostId, uuid = UUID, has_session = HasSession}) -> - lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p, status: offline", [UUID, Reason, HasSession]), - host_bo:change_status(UUID, ?HOST_OFFLINE), - report_event(UUID, ?HOST_OFFLINE), - - change_devices_status(HostId, ?DEVICE_UNKNOWN), +terminate(Reason, _StateName, _State = #state{uuid = UUID, has_session = HasSession}) -> + lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p", [UUID, Reason, HasSession]), ok. %% @private %% @doc Convert process state when code is changed -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. +code_change(_OldVsn, StateName, State = #state{}, _Extra) -> + {ok, StateName, State}. %%%=================================================================== %%% Internal functions %%%=================================================================== -%% 修改设备的状态 -change_devices_status(HostId, NewStatus) when is_integer(HostId), is_integer(NewStatus) -> - {ok, Devices} = device_bo:get_host_devices(HostId), - lager:debug("[iot_host] host_id: ~p, devices new status: ~p, devices: ~p", [HostId, NewStatus, Devices]), - lists:foreach(fun(DeviceUUID) -> - Pid = iot_device:get_pid(DeviceUUID), - iot_device:change_status(Pid, NewStatus) - end, Devices). - %% 处理相关数据 handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID}) when is_binary(DeviceUUID), DeviceUUID /= <<>> -> @@ -527,11 +501,17 @@ handle_data(#{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"field -spec report_event(UUID :: binary(), NewStatus :: integer()) -> no_return(). report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) -> + TextMap = #{ + 0 => <<"离线"/utf8>>, + 1 => <<"在线"/utf8>> + }, + %% 设备的状态信息上报给中电 Timestamp = iot_util:timestamp_of_seconds(), FieldsList = [#{ <<"key">> => <<"host_status">>, <<"value">> => NewStatus, + <<"value_text">> => maps:get(NewStatus, TextMap), <<"unit">> => 0, <<"type">> => <<"DI">>, <<"name">> => <<"主机状态"/utf8>>, diff --git a/apps/iot/src/iot_host_sup.erl b/apps/iot/src/iot_host_sup.erl index c3e5b4a..76c28bc 100644 --- a/apps/iot/src/iot_host_sup.erl +++ b/apps/iot/src/iot_host_sup.erl @@ -37,7 +37,8 @@ ensured_host_started(UUID) when is_binary(UUID) -> delete_host(UUID) when is_binary(UUID) -> Id = iot_host:get_name(UUID), - supervisor:terminate_child(?MODULE, Id). + ok = supervisor:terminate_child(?MODULE, Id), + supervisor:delete_child(?MODULE, Id). child_spec(UUID) -> Id = iot_host:get_name(UUID), diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index a61c3b9..2e16bf4 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -39,6 +39,15 @@ init([]) -> modules => ['iot_logger'] }, + #{ + id => 'iot_database_buffer', + start => {'iot_database_buffer', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_database_buffer'] + }, + #{ id => 'iot_device_sup', start => {'iot_device_sup', start_link, []}, diff --git a/config/sys-dev.config b/config/sys-dev.config index c38304f..6e49305 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -65,7 +65,8 @@ [{size, 10}, {max_overflow, 20}, {worker_module, eredis}], [ {host, "127.0.0.1"}, - {port, 26379} + {port, 26379}, + {database, 1} ] }, diff --git a/config/sys-prod.config b/config/sys-prod.config index 03124d9..8d015b2 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -54,7 +54,8 @@ [{size, 10}, {max_overflow, 20}, {worker_module, eredis}], [ {host, "172.19.0.7"}, - {port, 6379} + {port, 6379}, + {database, 1} ] },