From 97be797566f27de0bd7a74ad3bd6edf1bb414b52 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Thu, 17 Aug 2023 11:34:27 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4=E4=B8=80=E8=87=B4=E6=80=A7?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/include/iot.hrl | 8 ++ apps/iot/src/iot_device.erl | 34 ++++----- apps/iot/src/iot_device_sup.erl | 5 +- apps/iot/src/iot_host.erl | 38 +++++++--- apps/iot/src/iot_host_monitor.erl | 121 ------------------------------ apps/iot/src/iot_sup.erl | 9 --- 6 files changed, 51 insertions(+), 164 deletions(-) delete mode 100644 apps/iot/src/iot_host_monitor.erl diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 6f3ba99..aa426ca 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -8,6 +8,14 @@ %%%------------------------------------------------------------------- -author("licheng5"). +%% 主机是否在线 +-define(HOST_OFFLINE, 0). +-define(HOST_ONLINE, 1). + +%% 设备是否在线状态 +-define(DEVICE_OFFLINE, 0). +-define(DEVICE_ONLINE, 1). + %% 下发的任务状态 -define(TASK_STATUS_INIT, -1). %% 未接入 -define(TASK_STATUS_FAILED, 0). %% 离线 diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 1f83735..597b0d9 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -8,6 +8,7 @@ %%%------------------------------------------------------------------- -module(iot_device). -author("aresei"). +-include("iot.hrl"). -behaviour(gen_statem). @@ -21,10 +22,6 @@ %% 周期性的同步设备状态 -define(RELOAD_TICKER, 100 * 1000). -%% 终端是否在线 --define(DEVICE_STATUS_OFFLINE, 0). --define(DEVICE_STATUS_ONLINE, 1). - %% 终端是否授权 -define(DEVICE_AUTH_DENIED, 0). -define(DEVICE_AUTH_AUTHED, 1). @@ -36,7 +33,7 @@ -record(state, { device_id :: integer(), device_uuid :: binary(), - status = ?DEVICE_STATUS_ONLINE + status = ?DEVICE_ONLINE }). %%%=================================================================== @@ -105,11 +102,10 @@ init0(#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStat false -> ?STATE_DENIED; true -> ?STATE_ACTIVATED end, - %% 重启时,离线状态 - {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_ONLINE), + {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_OFFLINE), lager:debug("[iot_device] started device: ~p, state_name: ~p", [DeviceUUID, StateName]), - {ok, StateName, #state{device_id = DeviceId, device_uuid = DeviceUUID, status = ?DEVICE_STATUS_ONLINE}}. + {ok, StateName, #state{device_id = DeviceId, device_uuid = DeviceUUID, status = ?DEVICE_OFFLINE}}. %% @private %% @doc This function is called by a gen_statem when it needs to find out %% the callback mode of the callback module. @@ -134,16 +130,16 @@ handle_event({call, From}, is_activated, StateName, State = #state{}) -> {keep_state, State, [{reply, From, StateName =:= ?STATE_ACTIVATED}]}; %% 改变数据库的状态, 离线状态事件必须执行(主动触发离线的情况很少,不会造成数据库眼里) -handle_event(cast, {change_status, ?DEVICE_STATUS_OFFLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) -> - {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE), - {keep_state, State#state{status = ?DEVICE_STATUS_OFFLINE}}; +handle_event(cast, {change_status, ?DEVICE_OFFLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) -> + {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_OFFLINE), + {keep_state, State#state{status = ?DEVICE_OFFLINE}}; %% 改变为在线状态,但是数据库中的状态已经是在线状态,忽略 -handle_event(cast, {change_status, ?DEVICE_STATUS_ONLINE}, ?STATE_ACTIVATED, State = #state{status = ?DEVICE_STATUS_ONLINE}) -> +handle_event(cast, {change_status, ?DEVICE_ONLINE}, ?STATE_ACTIVATED, State = #state{status = ?DEVICE_ONLINE}) -> {keep_state, State}; %% 其他情况下需要修改数据状态为在线状态 -handle_event(cast, {change_status, ?DEVICE_STATUS_ONLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) -> - {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_ONLINE), - {keep_state, State#state{status = ?DEVICE_STATUS_ONLINE}}; +handle_event(cast, {change_status, ?DEVICE_ONLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) -> + {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_ONLINE), + {keep_state, State#state{status = ?DEVICE_ONLINE}}; %% 其他状态下不存在在线状态的变化 handle_event(cast, {change_status, _}, _, State = #state{}) -> {keep_state, State}; @@ -158,9 +154,9 @@ handle_event(cast, {auth, Auth}, ?STATE_DENIED, State = #state{device_id = Devic case Auth of true -> %% 需要矫正status字段的值 - {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE), + {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_ONLINE), lager:debug("[iot_device] device_uuid: ~p, auth: true, state_name from ~p, to: ~p", [DeviceUUID, ?STATE_DENIED, ?STATE_ACTIVATED]), - {next_state, ?STATE_ACTIVATED, State#state{status = ?DEVICE_STATUS_OFFLINE}}; + {next_state, ?STATE_ACTIVATED, State#state{status = ?DEVICE_ONLINE}}; false -> lager:debug("[iot_device] device_uuid: ~p, auth: false, will keep state_name: ~p", [DeviceUUID, ?STATE_DENIED]), {keep_state, State} @@ -173,10 +169,10 @@ handle_event(cast, {auth, Auth}, ?STATE_ACTIVATED, State = #state{device_id = De {keep_state, State}; false -> %% 需要矫正status字段的值 - {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_ONLINE), + {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_OFFLINE), lager:debug("[iot_device] device_uuid: ~p, auth: false, state_name from: ~p, to: ~p", [DeviceUUID, ?STATE_ACTIVATED, ?STATE_DENIED]), - {next_state, ?STATE_DENIED, State#state{status = ?DEVICE_STATUS_ONLINE}} + {next_state, ?STATE_DENIED, State#state{status = ?DEVICE_OFFLINE}} end; handle_event(info, {timeout, _, reload_ticker}, _, State) -> diff --git a/apps/iot/src/iot_device_sup.erl b/apps/iot/src/iot_device_sup.erl index 6bfc133..b863da3 100644 --- a/apps/iot/src/iot_device_sup.erl +++ b/apps/iot/src/iot_device_sup.erl @@ -15,10 +15,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - {ok, Devices} = device_bo:get_all_devices(), - Specs = lists:map(fun child_spec/1, Devices), - - {ok, {#{strategy => one_for_one, intensity => 1000, period => 3600}, Specs}}. + {ok, {#{strategy => one_for_one, intensity => 1000, period => 3600}, []}}. -spec start_device(UUID :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}. start_device(DeviceUUID) when is_binary(DeviceUUID) -> diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 1a24843..4841c7c 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -12,10 +12,6 @@ -behaviour(gen_statem). -%% 主机是否在线 --define(HOST_OFFLINE, 0). --define(HOST_ONLINE, 1). - %% 主机是否授权 -define(HOST_DENIED, 0). -define(HOST_AUTHED, 1). @@ -124,14 +120,22 @@ init([UUID]) -> case host_bo:get_host_by_uuid(UUID) of {ok, #{<<"authorize_status">> := AuthorizeStatus, <<"id">> := HostId}} -> - %% 注册监听事件 - iot_host_monitor:register(self(), HostId), - Aes = list_to_binary(iot_util:rand_bytes(32)), StateName = case AuthorizeStatus =:= ?HOST_AUTHED of false -> denied; true -> activated end, + %% 启动主机相关的device,此时device的状态为离线状态 + {ok, Devices} = device_bo:get_host_devices(HostId), + lists:foreach(fun(Device = #{<<"device_uuid">> := DeviceUUID}) -> + case iot_device_sup:start_device(Device) of + {ok, DevicePid} -> + iot_device:change_status(DevicePid, ?DEVICE_OFFLINE); + {error, Reason} -> + lager:notice("[iot_host] host: ~p, start_device: ~p, get error: ~p", [UUID, DeviceUUID, Reason]) + end + end, Devices), + {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), @@ -194,8 +198,10 @@ handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) -> end; %% 关闭授权 -handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID}) -> +handle_event({call, From}, {activate, false}, _, State = #state{host_id = HostId, uuid = UUID}) -> {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), + change_devices_status(HostId, ?DEVICE_OFFLINE), + {next_state, denied, State, [{reply, From, ok}]}; %% 开启授权 handle_event({call, From}, {activate, true}, denied, State) -> @@ -225,11 +231,12 @@ handle_event({call, From}, {create_session, PubKey}, denied, State = #state{uuid EncReply = iot_cipher_rsa:encode(Reply, PubKey), {keep_state, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; -handle_event({call, From}, {create_session, PubKey}, StateName, State = #state{uuid = UUID, aes = Aes}) -> +handle_event({call, From}, {create_session, PubKey}, StateName, State = #state{host_id = HostId, uuid = UUID, aes = Aes}) -> Reply = #{<<"a">> => true, <<"aes">> => Aes}, EncReply = iot_cipher_rsa:encode(Reply, PubKey), {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), lager:debug("[iot_host] host_id(~p) uuid: ~p, create_session, will change status, affected_row: ~p", [StateName, UUID, AffectedRow]), + change_devices_status(HostId, ?DEVICE_ONLINE), {next_state, session, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; @@ -333,9 +340,10 @@ handle_event(cast, {handle, {event, Event0}}, session, State = #state{aes = AES} {keep_state, State}; %% 当websocket断开的时候,则设置主机状态为下线状态; 主机的状态需要转换 -handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) -> +handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{host_id = HostId, uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) -> lager:warning("[iot_host] channel: ~p, down with reason: ~p, state name: ~p, state: ~p", [ChannelPid, Reason, StateName, State]), {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), + change_devices_status(HostId, ?DEVICE_OFFLINE), %% 会话状态如果链接丢失,需要切换到activated状态,其他情况保持不变 case StateName =:= session of @@ -355,9 +363,10 @@ handle_event(EventType, EventContent, StateName, State) -> %% terminate. It should be the opposite of Module:init/1 and do any %% necessary cleaning up. When it returns, the gen_statem terminates with %% Reason. The return value is ignored. -terminate(Reason, StateName, _State = #state{uuid = UUID}) -> +terminate(Reason, StateName, _State = #state{host_id = HostId, uuid = UUID}) -> lager:debug("[iot_host] host: ~p, terminate with reason: ~p, state_name: ~p", [UUID, Reason, StateName]), host_bo:change_status(UUID, ?HOST_OFFLINE), + change_devices_status(HostId, ?DEVICE_OFFLINE), ok. %% @private @@ -369,6 +378,13 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +change_devices_status(HostId, NewStatus) when is_integer(HostId), is_integer(NewStatus) -> + {ok, Devices} = device_bo:get_host_devices(HostId), + lists:foreach(fun(#{<<"device_uuid">> := DeviceUUID}) -> + Pid = iot_device:get_pid(DeviceUUID), + iot_device:change_status(Pid, NewStatus) + end, Devices). + %% 处理相关数据 handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID}) when is_binary(DeviceUUID), DeviceUUID /= <<>> -> diff --git a/apps/iot/src/iot_host_monitor.erl b/apps/iot/src/iot_host_monitor.erl deleted file mode 100644 index 858c5e7..0000000 --- a/apps/iot/src/iot_host_monitor.erl +++ /dev/null @@ -1,121 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% 观察主机,当主机下线时;主机下面的所有设备均需要处于离线状态 -%%% @end -%%% Created : 16. 8月 2023 14:47 -%%%------------------------------------------------------------------- --module(iot_host_monitor). --author("aresei"). - --behaviour(gen_server). - -%% API --export([start_link/0]). --export([register/2]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - --record(state, { - register_map = #{} -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -register(HostPid, HostId) when is_pid(HostPid), is_integer(HostId) -> - gen_server:call(?MODULE, {register, HostPid, HostId}). - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - {ok, #state{}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call({register, HostPid, HostId}, _From, State = #state{register_map = RegisterMap}) -> - erlang:monitor(process, HostPid), - {reply, ok, State#state{register_map = maps:put(HostPid, HostId, RegisterMap)}}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -%% 当websocket断开的时候,则设置主机状态为下线状态; 主机的状态需要转换 -handle_info({'DOWN', _Ref, process, HostPid, Reason}, State = #state{register_map = RegisterMap}) -> - lager:warning("[iot_host_monitor] host: ~p, down with reason: ~p", [HostPid, Reason]), - case maps:take(HostPid, RegisterMap) of - error -> - {noreply, State}; - {HostId, NRegisterMap} -> - case device_bo:get_host_devices(HostId) of - {ok, Devices} -> - DevicesUUIDs = lists:map(fun(#{<<"device_uuid">> := DeviceUUID}) -> DeviceUUID end, Devices), - lists:foreach(fun(DeviceUUID) -> - DevicePid = iot_device:get_pid(DeviceUUID), - iot_device:change_status(DevicePid, 0) - end, DevicesUUIDs); - {error, _} -> - ok - end, - {noreply, State#state{register_map = NRegisterMap}} - end. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index e11c993..96707ee 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -37,15 +37,6 @@ init([]) -> modules => ['iot_endpoint_monitor'] }, - #{ - id => iot_host_monitor, - start => {'iot_host_monitor', start_link, []}, - restart => permanent, - shutdown => 2000, - type => supervisor, - modules => ['iot_host_monitor'] - }, - #{ id => 'iot_endpoint_sup', start => {'iot_endpoint_sup', start_link, []},