From feebc5079345f2938f9fccec6a272eb22e382d70 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Wed, 16 Aug 2023 12:24:03 +0800 Subject: [PATCH] fix device --- apps/iot/src/database/device_bo.erl | 6 +- apps/iot/src/iot_device.erl | 29 +++++++--- apps/iot/src/iot_device_sup.erl | 53 +++++++++++++++++ apps/iot/src/iot_host.erl | 90 ++++++++++------------------- apps/iot/src/iot_sup.erl | 9 +++ 5 files changed, 119 insertions(+), 68 deletions(-) create mode 100644 apps/iot/src/iot_device_sup.erl diff --git a/apps/iot/src/database/device_bo.erl b/apps/iot/src/database/device_bo.erl index e7600cd..b7266b5 100644 --- a/apps/iot/src/database/device_bo.erl +++ b/apps/iot/src/database/device_bo.erl @@ -11,7 +11,11 @@ -include("iot.hrl"). %% API --export([get_host_devices/1, get_device_by_uuid/1, change_status/2, get_host_by_uuid/1]). +-export([get_all_devices/0, get_host_devices/1, get_device_by_uuid/1, change_status/2, get_host_by_uuid/1]). + +-spec get_all_devices() -> {ok, Devices :: [map()]} | {error, Reason :: any()}. +get_all_devices() -> + mysql_pool:get_all(mysql_iot, <<"SELECT * FROM device WHERE device_uuid != ''">>). -spec get_host_devices(HostId :: integer()) -> {ok, Devices::list()} | {error, Reason::any()}. get_host_devices(HostId) when is_integer(HostId) -> diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index d31cda1..ea66a66 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -12,6 +12,7 @@ -behaviour(gen_statem). %% API +-export([get_name/1, get_pid/1]). -export([start_link/2, is_activated/1, change_status/2, reload/1, auth/2, stop/1]). %% gen_statem callbacks @@ -33,7 +34,6 @@ -define(STATE_ACTIVATED, activated). -record(state, { - parent_pid :: pid(), device_id :: integer(), device_uuid :: binary(), status = ?DEVICE_STATUS_OFFLINE @@ -43,6 +43,14 @@ %%% API %%%=================================================================== +-spec get_pid(DeviceUUID :: binary()) -> Pid :: pid() | undefined. +get_pid(DeviceUUID) when is_binary(DeviceUUID) -> + whereis(get_name(DeviceUUID)). + +-spec get_name(DeviceUUID :: binary()) -> atom(). +get_name(DeviceUUID) when is_binary(DeviceUUID) -> + binary_to_atom(<<"iot_device:", DeviceUUID/binary>>). + -spec is_activated(Pid :: pid()) -> boolean(). is_activated(Pid) when is_pid(Pid) -> gen_statem:call(Pid, is_activated). @@ -66,8 +74,8 @@ stop(Pid) when is_pid(Pid) -> %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. -start_link(ParentPid, DeviceUUID) when is_pid(ParentPid), is_binary(DeviceUUID) -> - gen_statem:start_link(?MODULE, [ParentPid, DeviceUUID], []). +start_link(Name, DeviceUUID) when is_atom(Name), is_binary(DeviceUUID) -> + gen_statem:start_link({local, Name}, ?MODULE, [DeviceUUID], []). %%%=================================================================== %%% gen_statem callbacks @@ -77,11 +85,18 @@ start_link(ParentPid, DeviceUUID) when is_pid(ParentPid), is_binary(DeviceUUID) %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([ParentPid, DeviceUUID]) -> +init([DeviceUUID]) when is_binary(DeviceUUID) -> + case device_bo:get_device_by_uuid(DeviceUUID) of + {ok, DeviceInfo} -> + init0(DeviceInfo); + undefined -> + ignore + end; +init([DeviceInfo]) when is_map(DeviceInfo) -> + init0(DeviceInfo). +init0(#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"id">> := DeviceId}) -> %% 定期同步数据库的状态信息 erlang:start_timer(?RELOAD_TICKER, self(), reload_ticker), - - {ok, #{<<"authorize_status">> := AuthorizeStatus, <<"id">> := DeviceId}} = device_bo:get_device_by_uuid(DeviceUUID), StateName = case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of false -> ?STATE_DENIED; true -> ?STATE_ACTIVATED @@ -90,7 +105,7 @@ init([ParentPid, DeviceUUID]) -> {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE), lager:debug("[iot_device] started device: ~p, state_name: ~p", [DeviceUUID, StateName]), - {ok, StateName, #state{parent_pid = ParentPid, device_id = DeviceId, device_uuid = DeviceUUID, status = ?DEVICE_STATUS_OFFLINE}}. + {ok, StateName, #state{device_id = DeviceId, device_uuid = DeviceUUID, status = ?DEVICE_STATUS_OFFLINE}}. %% @private %% @doc This function is called by a gen_statem when it needs to find out %% the callback mode of the callback module. diff --git a/apps/iot/src/iot_device_sup.erl b/apps/iot/src/iot_device_sup.erl new file mode 100644 index 0000000..6bfc133 --- /dev/null +++ b/apps/iot/src/iot_device_sup.erl @@ -0,0 +1,53 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% @end +%%%------------------------------------------------------------------- +-module(iot_device_sup). +-include("iot.hrl"). + +-behaviour(supervisor). + +-export([start_link/0, init/1, delete_device/1, start_device/1]). + +start_link() -> + supervisor:start_link({local, ?MODULE}, ?MODULE, []). + +init([]) -> + {ok, Devices} = device_bo:get_all_devices(), + Specs = lists:map(fun child_spec/1, Devices), + + {ok, {#{strategy => one_for_one, intensity => 1000, period => 3600}, Specs}}. + +-spec start_device(UUID :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +start_device(DeviceUUID) when is_binary(DeviceUUID) -> + case supervisor:start_child(?MODULE, child_spec(DeviceUUID)) of + {ok, Pid} when is_pid(Pid) -> + {ok, Pid}; + {error, {'already_started', Pid}} when is_pid(Pid) -> + {ok, Pid}; + {error, Error} -> + {error, Error} + end. + +delete_device(UUID) when is_binary(UUID) -> + Id = iot_device:get_name(UUID), + supervisor:terminate_child(?MODULE, Id). + +child_spec(Device = #{<<"device_uuid">> := DeviceUUID}) when is_binary(DeviceUUID) -> + Name = iot_device:get_name(DeviceUUID), + #{id => Name, + start => {iot_device, start_link, [Name, Device]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_device']}; +child_spec(DeviceUUID) when is_binary(DeviceUUID) -> + Name = iot_device:get_name(DeviceUUID), + #{id => Name, + start => {iot_device, start_link, [Name, DeviceUUID]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_device']}. \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index c644dc6..0e3b485 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -32,8 +32,6 @@ host_id :: integer(), %% 从数据库里面读取到的数据 uuid :: binary(), - %% 建立到和device之间的映射关系 - device_map = #{}, %% aes的key, 后续通讯需要基于这个加密 aes = <<>> :: binary(), @@ -147,16 +145,7 @@ init([UUID]) -> false -> denied; true -> activated end, - - %% 启动所有的device - {ok, Devices} = device_bo:get_host_devices(HostId), - lager:debug("[iot_host] uuid: ~p, loaded devices: ~p", [UUID, Devices]), - DeviceMap = lists:foldl(fun(#{<<"device_uuid">> := DeviceUUID}, M) -> - {ok, Pid} = iot_device:start_link(self(), DeviceUUID), - maps:put(DeviceUUID, Pid, M) - end, #{}, Devices), - - {ok, StateName, #state{host_id = HostId, device_map = DeviceMap, uuid = UUID, aes = Aes}}; + {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), ignore @@ -258,38 +247,44 @@ handle_event({call, From}, {create_session, PubKey}, StateName, State = #state{u {next_state, session, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; %% 处理设备相关 -handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) -> - case maps:get(DeviceUUID, DeviceMap, undefined) of +handle_event({call, From}, {reload_device, DeviceUUID}, _, State) -> + case iot_device:get_pid(DeviceUUID) of undefined -> - {ok, Pid} = iot_device:start_link(self(), DeviceUUID), - NDeviceMap = maps:put(DeviceUUID, Pid, DeviceMap), - {keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]}; - DevicePid -> + case iot_device_sup:start_device(DeviceUUID) of + {ok, DevicePid} when is_pid(DevicePid) -> + {keep_state, State, [{reply, From, ok}]}; + {error, Reason} -> + {keep_state, State, [{reply, From, {error, Reason}}]} + end; + DevicePid when is_pid(DevicePid) -> iot_device:reload(DevicePid), {keep_state, State, [{reply, From, ok}]} end; -handle_event({call, From}, {auth_device, DeviceUUID, Auth}, _, State = #state{device_map = DeviceMap}) -> - case maps:get(DeviceUUID, DeviceMap, undefined) of +handle_event({call, From}, {auth_device, DeviceUUID, Auth}, _, State = #state{uuid = UUID}) -> + case iot_device:get_pid(DeviceUUID) of undefined -> - {ok, Pid} = iot_device:start_link(self(), DeviceUUID), - iot_device:auth(Pid, Auth), - - NDeviceMap = maps:put(DeviceUUID, Pid, DeviceMap), - {keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]}; + case iot_device_sup:start_device(DeviceUUID) of + {ok, Pid} -> + iot_device:auth(Pid, Auth), + {keep_state, State, [{reply, From, ok}]}; + {error, Reason} -> + lager:notice("[iot_host] host: ~p, auth_device, try start_device get error: ~p", [UUID, Reason]), + {keep_state, State, [{reply, From, {error, Reason}}]} + end; DevicePid -> iot_device:auth(DevicePid, Auth), {keep_state, State, [{reply, From, ok}]} end; -handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{uuid = UUID, device_map = DeviceMap}) -> - case maps:take(DeviceUUID, DeviceMap) of - error -> +handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{uuid = UUID}) -> + case iot_device:get_pid(DeviceUUID) of + undefined -> lager:notice("[iot_host] uuid: ~p, delete device: ~p, not found", [UUID, DeviceUUID]), {keep_state, State, [{reply, From, ok}]}; - {DevicePid, NDeviceMap} -> - iot_device:stop(DevicePid), - {keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]} + DevicePid when is_pid(DevicePid) -> + iot_device_sup:delete_device(DeviceUUID), + {keep_state, State, [{reply, From, ok}]} end; %% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到 @@ -378,11 +373,11 @@ handle_event(cast, {handle, {feedback_result, Info0}}, session, State = #state{a end, {keep_state, State}; -handle_event(cast, {handle, {event, Event0}}, session, State = #state{aes = AES, device_map = DeviceMap}) -> +handle_event(cast, {handle, {event, Event0}}, session, State = #state{aes = AES, devices = Devices}) -> EventText = iot_cipher_aes:decrypt(AES, Event0), case catch jiffy:decode(EventText, [return_maps]) of #{<<"event_type">> := ?EVENT_DEVICE, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} -> - case maps:get(DeviceUUID, DeviceMap, undefined) of + case lists:member(DeviceUUID, Devices) of undefined -> ok; Pid -> @@ -408,31 +403,6 @@ handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State {keep_state, State#state{channel_pid = undefined}} end; -%% 处理device进程的退出 -handle_event(info, {'EXIT', Pid, Reason}, StateName, State = #state{uuid = UUID, device_map = DeviceMap}) -> - lager:warning("[iot_host] uuid: ~p, device pid: ~p, exit with reason: ~p, state name: ~p, state: ~p", [UUID, Pid, Reason, StateName, State]), - case lists:search(fun({_, Pid0}) -> Pid =:= Pid0 end, maps:to_list(DeviceMap)) of - false -> - {keep_state, State}; - {value, {DeviceUUID, _}} -> - %% 避免数据异常,导致一直重启 - erlang:start_timer(5000, self(), {restart_device, DeviceUUID}), - {keep_state, State#state{device_map = maps:remove(DeviceUUID, DeviceMap)}} - end; - -handle_event(info, {timeout, _, {restart_device, DeviceUUID}}, _, State = #state{uuid = UUID, device_map = DeviceMap}) -> - %% 重启device进程不一定能成功,可能重启时,数据库已经删除了 - case catch iot_device:start_link(self(), DeviceUUID) of - {ok, DevicePid} -> - lager:debug("[iot_host] uuid: ~p, restart device: ~p, success", [UUID, DeviceUUID]), - %% 采用的是同样的DeviceUUID,因此老的值会被覆盖 - NDeviceMap = maps:put(DeviceUUID, DevicePid, DeviceMap), - {keep_state, State#state{device_map = NDeviceMap}}; - Error -> - lager:warning("[iot_host] uuid: ~p, restart device: ~p, get error: ~p", [UUID, DeviceUUID, Error]), - {keep_state, State} - end; - handle_event(EventType, EventContent, StateName, State) -> lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]), @@ -458,10 +428,10 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%%=================================================================== %% 处理相关数据 -handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID, device_map = DeviceMap}) +handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID}) when is_binary(DeviceUUID), DeviceUUID /= <<>> -> - case maps:get(DeviceUUID, DeviceMap, undefined) of + case iot_device:get_pid(DeviceUUID) of undefined -> ok; Pid -> diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 02f967d..96707ee 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -46,6 +46,15 @@ init([]) -> modules => ['iot_endpoint_sup'] }, + #{ + id => 'iot_device_sup', + start => {'iot_device_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_device_sup'] + }, + #{ id => 'iot_host_sup', start => {'iot_host_sup', start_link, []},