fix device
This commit is contained in:
parent
5cc6665a3d
commit
feebc50793
@ -11,7 +11,11 @@
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([get_host_devices/1, get_device_by_uuid/1, change_status/2, get_host_by_uuid/1]).
|
||||
-export([get_all_devices/0, get_host_devices/1, get_device_by_uuid/1, change_status/2, get_host_by_uuid/1]).
|
||||
|
||||
-spec get_all_devices() -> {ok, Devices :: [map()]} | {error, Reason :: any()}.
|
||||
get_all_devices() ->
|
||||
mysql_pool:get_all(mysql_iot, <<"SELECT * FROM device WHERE device_uuid != ''">>).
|
||||
|
||||
-spec get_host_devices(HostId :: integer()) -> {ok, Devices::list()} | {error, Reason::any()}.
|
||||
get_host_devices(HostId) when is_integer(HostId) ->
|
||||
|
||||
@ -12,6 +12,7 @@
|
||||
-behaviour(gen_statem).
|
||||
|
||||
%% API
|
||||
-export([get_name/1, get_pid/1]).
|
||||
-export([start_link/2, is_activated/1, change_status/2, reload/1, auth/2, stop/1]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
@ -33,7 +34,6 @@
|
||||
-define(STATE_ACTIVATED, activated).
|
||||
|
||||
-record(state, {
|
||||
parent_pid :: pid(),
|
||||
device_id :: integer(),
|
||||
device_uuid :: binary(),
|
||||
status = ?DEVICE_STATUS_OFFLINE
|
||||
@ -43,6 +43,14 @@
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec get_pid(DeviceUUID :: binary()) -> Pid :: pid() | undefined.
|
||||
get_pid(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
whereis(get_name(DeviceUUID)).
|
||||
|
||||
-spec get_name(DeviceUUID :: binary()) -> atom().
|
||||
get_name(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
binary_to_atom(<<"iot_device:", DeviceUUID/binary>>).
|
||||
|
||||
-spec is_activated(Pid :: pid()) -> boolean().
|
||||
is_activated(Pid) when is_pid(Pid) ->
|
||||
gen_statem:call(Pid, is_activated).
|
||||
@ -66,8 +74,8 @@ stop(Pid) when is_pid(Pid) ->
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
start_link(ParentPid, DeviceUUID) when is_pid(ParentPid), is_binary(DeviceUUID) ->
|
||||
gen_statem:start_link(?MODULE, [ParentPid, DeviceUUID], []).
|
||||
start_link(Name, DeviceUUID) when is_atom(Name), is_binary(DeviceUUID) ->
|
||||
gen_statem:start_link({local, Name}, ?MODULE, [DeviceUUID], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
@ -77,11 +85,18 @@ start_link(ParentPid, DeviceUUID) when is_pid(ParentPid), is_binary(DeviceUUID)
|
||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
init([ParentPid, DeviceUUID]) ->
|
||||
init([DeviceUUID]) when is_binary(DeviceUUID) ->
|
||||
case device_bo:get_device_by_uuid(DeviceUUID) of
|
||||
{ok, DeviceInfo} ->
|
||||
init0(DeviceInfo);
|
||||
undefined ->
|
||||
ignore
|
||||
end;
|
||||
init([DeviceInfo]) when is_map(DeviceInfo) ->
|
||||
init0(DeviceInfo).
|
||||
init0(#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"id">> := DeviceId}) ->
|
||||
%% 定期同步数据库的状态信息
|
||||
erlang:start_timer(?RELOAD_TICKER, self(), reload_ticker),
|
||||
|
||||
{ok, #{<<"authorize_status">> := AuthorizeStatus, <<"id">> := DeviceId}} = device_bo:get_device_by_uuid(DeviceUUID),
|
||||
StateName = case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of
|
||||
false -> ?STATE_DENIED;
|
||||
true -> ?STATE_ACTIVATED
|
||||
@ -90,7 +105,7 @@ init([ParentPid, DeviceUUID]) ->
|
||||
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE),
|
||||
lager:debug("[iot_device] started device: ~p, state_name: ~p", [DeviceUUID, StateName]),
|
||||
|
||||
{ok, StateName, #state{parent_pid = ParentPid, device_id = DeviceId, device_uuid = DeviceUUID, status = ?DEVICE_STATUS_OFFLINE}}.
|
||||
{ok, StateName, #state{device_id = DeviceId, device_uuid = DeviceUUID, status = ?DEVICE_STATUS_OFFLINE}}.
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it needs to find out
|
||||
%% the callback mode of the callback module.
|
||||
|
||||
53
apps/iot/src/iot_device_sup.erl
Normal file
53
apps/iot/src/iot_device_sup.erl
Normal file
@ -0,0 +1,53 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%% @end
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_device_sup).
|
||||
-include("iot.hrl").
|
||||
|
||||
-behaviour(supervisor).
|
||||
|
||||
-export([start_link/0, init/1, delete_device/1, start_device/1]).
|
||||
|
||||
start_link() ->
|
||||
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
|
||||
|
||||
init([]) ->
|
||||
{ok, Devices} = device_bo:get_all_devices(),
|
||||
Specs = lists:map(fun child_spec/1, Devices),
|
||||
|
||||
{ok, {#{strategy => one_for_one, intensity => 1000, period => 3600}, Specs}}.
|
||||
|
||||
-spec start_device(UUID :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
|
||||
start_device(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
case supervisor:start_child(?MODULE, child_spec(DeviceUUID)) of
|
||||
{ok, Pid} when is_pid(Pid) ->
|
||||
{ok, Pid};
|
||||
{error, {'already_started', Pid}} when is_pid(Pid) ->
|
||||
{ok, Pid};
|
||||
{error, Error} ->
|
||||
{error, Error}
|
||||
end.
|
||||
|
||||
delete_device(UUID) when is_binary(UUID) ->
|
||||
Id = iot_device:get_name(UUID),
|
||||
supervisor:terminate_child(?MODULE, Id).
|
||||
|
||||
child_spec(Device = #{<<"device_uuid">> := DeviceUUID}) when is_binary(DeviceUUID) ->
|
||||
Name = iot_device:get_name(DeviceUUID),
|
||||
#{id => Name,
|
||||
start => {iot_device, start_link, [Name, Device]},
|
||||
restart => permanent,
|
||||
shutdown => 2000,
|
||||
type => worker,
|
||||
modules => ['iot_device']};
|
||||
child_spec(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
Name = iot_device:get_name(DeviceUUID),
|
||||
#{id => Name,
|
||||
start => {iot_device, start_link, [Name, DeviceUUID]},
|
||||
restart => permanent,
|
||||
shutdown => 2000,
|
||||
type => worker,
|
||||
modules => ['iot_device']}.
|
||||
@ -32,8 +32,6 @@
|
||||
host_id :: integer(),
|
||||
%% 从数据库里面读取到的数据
|
||||
uuid :: binary(),
|
||||
%% 建立到和device之间的映射关系
|
||||
device_map = #{},
|
||||
%% aes的key, 后续通讯需要基于这个加密
|
||||
aes = <<>> :: binary(),
|
||||
|
||||
@ -147,16 +145,7 @@ init([UUID]) ->
|
||||
false -> denied;
|
||||
true -> activated
|
||||
end,
|
||||
|
||||
%% 启动所有的device
|
||||
{ok, Devices} = device_bo:get_host_devices(HostId),
|
||||
lager:debug("[iot_host] uuid: ~p, loaded devices: ~p", [UUID, Devices]),
|
||||
DeviceMap = lists:foldl(fun(#{<<"device_uuid">> := DeviceUUID}, M) ->
|
||||
{ok, Pid} = iot_device:start_link(self(), DeviceUUID),
|
||||
maps:put(DeviceUUID, Pid, M)
|
||||
end, #{}, Devices),
|
||||
|
||||
{ok, StateName, #state{host_id = HostId, device_map = DeviceMap, uuid = UUID, aes = Aes}};
|
||||
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes}};
|
||||
undefined ->
|
||||
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
||||
ignore
|
||||
@ -258,38 +247,44 @@ handle_event({call, From}, {create_session, PubKey}, StateName, State = #state{u
|
||||
{next_state, session, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||
|
||||
%% 处理设备相关
|
||||
handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) ->
|
||||
case maps:get(DeviceUUID, DeviceMap, undefined) of
|
||||
handle_event({call, From}, {reload_device, DeviceUUID}, _, State) ->
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
{ok, Pid} = iot_device:start_link(self(), DeviceUUID),
|
||||
NDeviceMap = maps:put(DeviceUUID, Pid, DeviceMap),
|
||||
{keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]};
|
||||
DevicePid ->
|
||||
case iot_device_sup:start_device(DeviceUUID) of
|
||||
{ok, DevicePid} when is_pid(DevicePid) ->
|
||||
{keep_state, State, [{reply, From, ok}]};
|
||||
{error, Reason} ->
|
||||
{keep_state, State, [{reply, From, {error, Reason}}]}
|
||||
end;
|
||||
DevicePid when is_pid(DevicePid) ->
|
||||
iot_device:reload(DevicePid),
|
||||
{keep_state, State, [{reply, From, ok}]}
|
||||
end;
|
||||
|
||||
handle_event({call, From}, {auth_device, DeviceUUID, Auth}, _, State = #state{device_map = DeviceMap}) ->
|
||||
case maps:get(DeviceUUID, DeviceMap, undefined) of
|
||||
handle_event({call, From}, {auth_device, DeviceUUID, Auth}, _, State = #state{uuid = UUID}) ->
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
{ok, Pid} = iot_device:start_link(self(), DeviceUUID),
|
||||
case iot_device_sup:start_device(DeviceUUID) of
|
||||
{ok, Pid} ->
|
||||
iot_device:auth(Pid, Auth),
|
||||
|
||||
NDeviceMap = maps:put(DeviceUUID, Pid, DeviceMap),
|
||||
{keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]};
|
||||
{keep_state, State, [{reply, From, ok}]};
|
||||
{error, Reason} ->
|
||||
lager:notice("[iot_host] host: ~p, auth_device, try start_device get error: ~p", [UUID, Reason]),
|
||||
{keep_state, State, [{reply, From, {error, Reason}}]}
|
||||
end;
|
||||
DevicePid ->
|
||||
iot_device:auth(DevicePid, Auth),
|
||||
{keep_state, State, [{reply, From, ok}]}
|
||||
end;
|
||||
|
||||
handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{uuid = UUID, device_map = DeviceMap}) ->
|
||||
case maps:take(DeviceUUID, DeviceMap) of
|
||||
error ->
|
||||
handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{uuid = UUID}) ->
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
lager:notice("[iot_host] uuid: ~p, delete device: ~p, not found", [UUID, DeviceUUID]),
|
||||
{keep_state, State, [{reply, From, ok}]};
|
||||
{DevicePid, NDeviceMap} ->
|
||||
iot_device:stop(DevicePid),
|
||||
{keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]}
|
||||
DevicePid when is_pid(DevicePid) ->
|
||||
iot_device_sup:delete_device(DeviceUUID),
|
||||
{keep_state, State, [{reply, From, ok}]}
|
||||
end;
|
||||
|
||||
%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到
|
||||
@ -378,11 +373,11 @@ handle_event(cast, {handle, {feedback_result, Info0}}, session, State = #state{a
|
||||
end,
|
||||
{keep_state, State};
|
||||
|
||||
handle_event(cast, {handle, {event, Event0}}, session, State = #state{aes = AES, device_map = DeviceMap}) ->
|
||||
handle_event(cast, {handle, {event, Event0}}, session, State = #state{aes = AES, devices = Devices}) ->
|
||||
EventText = iot_cipher_aes:decrypt(AES, Event0),
|
||||
case catch jiffy:decode(EventText, [return_maps]) of
|
||||
#{<<"event_type">> := ?EVENT_DEVICE, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} ->
|
||||
case maps:get(DeviceUUID, DeviceMap, undefined) of
|
||||
case lists:member(DeviceUUID, Devices) of
|
||||
undefined ->
|
||||
ok;
|
||||
Pid ->
|
||||
@ -408,31 +403,6 @@ handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State
|
||||
{keep_state, State#state{channel_pid = undefined}}
|
||||
end;
|
||||
|
||||
%% 处理device进程的退出
|
||||
handle_event(info, {'EXIT', Pid, Reason}, StateName, State = #state{uuid = UUID, device_map = DeviceMap}) ->
|
||||
lager:warning("[iot_host] uuid: ~p, device pid: ~p, exit with reason: ~p, state name: ~p, state: ~p", [UUID, Pid, Reason, StateName, State]),
|
||||
case lists:search(fun({_, Pid0}) -> Pid =:= Pid0 end, maps:to_list(DeviceMap)) of
|
||||
false ->
|
||||
{keep_state, State};
|
||||
{value, {DeviceUUID, _}} ->
|
||||
%% 避免数据异常,导致一直重启
|
||||
erlang:start_timer(5000, self(), {restart_device, DeviceUUID}),
|
||||
{keep_state, State#state{device_map = maps:remove(DeviceUUID, DeviceMap)}}
|
||||
end;
|
||||
|
||||
handle_event(info, {timeout, _, {restart_device, DeviceUUID}}, _, State = #state{uuid = UUID, device_map = DeviceMap}) ->
|
||||
%% 重启device进程不一定能成功,可能重启时,数据库已经删除了
|
||||
case catch iot_device:start_link(self(), DeviceUUID) of
|
||||
{ok, DevicePid} ->
|
||||
lager:debug("[iot_host] uuid: ~p, restart device: ~p, success", [UUID, DeviceUUID]),
|
||||
%% 采用的是同样的DeviceUUID,因此老的值会被覆盖
|
||||
NDeviceMap = maps:put(DeviceUUID, DevicePid, DeviceMap),
|
||||
{keep_state, State#state{device_map = NDeviceMap}};
|
||||
Error ->
|
||||
lager:warning("[iot_host] uuid: ~p, restart device: ~p, get error: ~p", [UUID, DeviceUUID, Error]),
|
||||
{keep_state, State}
|
||||
end;
|
||||
|
||||
handle_event(EventType, EventContent, StateName, State) ->
|
||||
lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]),
|
||||
|
||||
@ -458,10 +428,10 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||
%%%===================================================================
|
||||
|
||||
%% 处理相关数据
|
||||
handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID, device_map = DeviceMap})
|
||||
handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID})
|
||||
when is_binary(DeviceUUID), DeviceUUID /= <<>> ->
|
||||
|
||||
case maps:get(DeviceUUID, DeviceMap, undefined) of
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
ok;
|
||||
Pid ->
|
||||
|
||||
@ -46,6 +46,15 @@ init([]) ->
|
||||
modules => ['iot_endpoint_sup']
|
||||
},
|
||||
|
||||
#{
|
||||
id => 'iot_device_sup',
|
||||
start => {'iot_device_sup', start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 2000,
|
||||
type => supervisor,
|
||||
modules => ['iot_device_sup']
|
||||
},
|
||||
|
||||
#{
|
||||
id => 'iot_host_sup',
|
||||
start => {'iot_host_sup', start_link, []},
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user