调整一致性

This commit is contained in:
anlicheng 2023-08-17 11:34:27 +08:00
parent 7df30eaf75
commit 97be797566
6 changed files with 51 additions and 164 deletions

View File

@ -8,6 +8,14 @@
%%%-------------------------------------------------------------------
-author("licheng5").
%% 线
-define(HOST_OFFLINE, 0).
-define(HOST_ONLINE, 1).
%% 线
-define(DEVICE_OFFLINE, 0).
-define(DEVICE_ONLINE, 1).
%%
-define(TASK_STATUS_INIT, -1). %%
-define(TASK_STATUS_FAILED, 0). %% 线

View File

@ -8,6 +8,7 @@
%%%-------------------------------------------------------------------
-module(iot_device).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_statem).
@ -21,10 +22,6 @@
%%
-define(RELOAD_TICKER, 100 * 1000).
%% 线
-define(DEVICE_STATUS_OFFLINE, 0).
-define(DEVICE_STATUS_ONLINE, 1).
%%
-define(DEVICE_AUTH_DENIED, 0).
-define(DEVICE_AUTH_AUTHED, 1).
@ -36,7 +33,7 @@
-record(state, {
device_id :: integer(),
device_uuid :: binary(),
status = ?DEVICE_STATUS_ONLINE
status = ?DEVICE_ONLINE
}).
%%%===================================================================
@ -105,11 +102,10 @@ init0(#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStat
false -> ?STATE_DENIED;
true -> ?STATE_ACTIVATED
end,
%% 线
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_ONLINE),
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_OFFLINE),
lager:debug("[iot_device] started device: ~p, state_name: ~p", [DeviceUUID, StateName]),
{ok, StateName, #state{device_id = DeviceId, device_uuid = DeviceUUID, status = ?DEVICE_STATUS_ONLINE}}.
{ok, StateName, #state{device_id = DeviceId, device_uuid = DeviceUUID, status = ?DEVICE_OFFLINE}}.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
@ -134,16 +130,16 @@ handle_event({call, From}, is_activated, StateName, State = #state{}) ->
{keep_state, State, [{reply, From, StateName =:= ?STATE_ACTIVATED}]};
%% , 线(线)
handle_event(cast, {change_status, ?DEVICE_STATUS_OFFLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) ->
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE),
{keep_state, State#state{status = ?DEVICE_STATUS_OFFLINE}};
handle_event(cast, {change_status, ?DEVICE_OFFLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) ->
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_OFFLINE),
{keep_state, State#state{status = ?DEVICE_OFFLINE}};
%% 线线
handle_event(cast, {change_status, ?DEVICE_STATUS_ONLINE}, ?STATE_ACTIVATED, State = #state{status = ?DEVICE_STATUS_ONLINE}) ->
handle_event(cast, {change_status, ?DEVICE_ONLINE}, ?STATE_ACTIVATED, State = #state{status = ?DEVICE_ONLINE}) ->
{keep_state, State};
%% 线
handle_event(cast, {change_status, ?DEVICE_STATUS_ONLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) ->
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_ONLINE),
{keep_state, State#state{status = ?DEVICE_STATUS_ONLINE}};
handle_event(cast, {change_status, ?DEVICE_ONLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) ->
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_ONLINE),
{keep_state, State#state{status = ?DEVICE_ONLINE}};
%% 线
handle_event(cast, {change_status, _}, _, State = #state{}) ->
{keep_state, State};
@ -158,9 +154,9 @@ handle_event(cast, {auth, Auth}, ?STATE_DENIED, State = #state{device_id = Devic
case Auth of
true ->
%% status字段的值
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE),
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_ONLINE),
lager:debug("[iot_device] device_uuid: ~p, auth: true, state_name from ~p, to: ~p", [DeviceUUID, ?STATE_DENIED, ?STATE_ACTIVATED]),
{next_state, ?STATE_ACTIVATED, State#state{status = ?DEVICE_STATUS_OFFLINE}};
{next_state, ?STATE_ACTIVATED, State#state{status = ?DEVICE_ONLINE}};
false ->
lager:debug("[iot_device] device_uuid: ~p, auth: false, will keep state_name: ~p", [DeviceUUID, ?STATE_DENIED]),
{keep_state, State}
@ -173,10 +169,10 @@ handle_event(cast, {auth, Auth}, ?STATE_ACTIVATED, State = #state{device_id = De
{keep_state, State};
false ->
%% status字段的值
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_ONLINE),
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_OFFLINE),
lager:debug("[iot_device] device_uuid: ~p, auth: false, state_name from: ~p, to: ~p", [DeviceUUID, ?STATE_ACTIVATED, ?STATE_DENIED]),
{next_state, ?STATE_DENIED, State#state{status = ?DEVICE_STATUS_ONLINE}}
{next_state, ?STATE_DENIED, State#state{status = ?DEVICE_OFFLINE}}
end;
handle_event(info, {timeout, _, reload_ticker}, _, State) ->

View File

@ -15,10 +15,7 @@ start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
{ok, Devices} = device_bo:get_all_devices(),
Specs = lists:map(fun child_spec/1, Devices),
{ok, {#{strategy => one_for_one, intensity => 1000, period => 3600}, Specs}}.
{ok, {#{strategy => one_for_one, intensity => 1000, period => 3600}, []}}.
-spec start_device(UUID :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
start_device(DeviceUUID) when is_binary(DeviceUUID) ->

View File

@ -12,10 +12,6 @@
-behaviour(gen_statem).
%% 线
-define(HOST_OFFLINE, 0).
-define(HOST_ONLINE, 1).
%%
-define(HOST_DENIED, 0).
-define(HOST_AUTHED, 1).
@ -124,14 +120,22 @@ init([UUID]) ->
case host_bo:get_host_by_uuid(UUID) of
{ok, #{<<"authorize_status">> := AuthorizeStatus, <<"id">> := HostId}} ->
%%
iot_host_monitor:register(self(), HostId),
Aes = list_to_binary(iot_util:rand_bytes(32)),
StateName = case AuthorizeStatus =:= ?HOST_AUTHED of
false -> denied;
true -> activated
end,
%% devicedevice的状态为离线状态
{ok, Devices} = device_bo:get_host_devices(HostId),
lists:foreach(fun(Device = #{<<"device_uuid">> := DeviceUUID}) ->
case iot_device_sup:start_device(Device) of
{ok, DevicePid} ->
iot_device:change_status(DevicePid, ?DEVICE_OFFLINE);
{error, Reason} ->
lager:notice("[iot_host] host: ~p, start_device: ~p, get error: ~p", [UUID, DeviceUUID, Reason])
end
end, Devices),
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes}};
undefined ->
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
@ -194,8 +198,10 @@ handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) ->
end;
%%
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID}) ->
handle_event({call, From}, {activate, false}, _, State = #state{host_id = HostId, uuid = UUID}) ->
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
change_devices_status(HostId, ?DEVICE_OFFLINE),
{next_state, denied, State, [{reply, From, ok}]};
%%
handle_event({call, From}, {activate, true}, denied, State) ->
@ -225,11 +231,12 @@ handle_event({call, From}, {create_session, PubKey}, denied, State = #state{uuid
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
{keep_state, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
handle_event({call, From}, {create_session, PubKey}, StateName, State = #state{uuid = UUID, aes = Aes}) ->
handle_event({call, From}, {create_session, PubKey}, StateName, State = #state{host_id = HostId, uuid = UUID, aes = Aes}) ->
Reply = #{<<"a">> => true, <<"aes">> => Aes},
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
{ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE),
lager:debug("[iot_host] host_id(~p) uuid: ~p, create_session, will change status, affected_row: ~p", [StateName, UUID, AffectedRow]),
change_devices_status(HostId, ?DEVICE_ONLINE),
{next_state, session, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
@ -333,9 +340,10 @@ handle_event(cast, {handle, {event, Event0}}, session, State = #state{aes = AES}
{keep_state, State};
%% websocket断开的时候线;
handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) ->
handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{host_id = HostId, uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) ->
lager:warning("[iot_host] channel: ~p, down with reason: ~p, state name: ~p, state: ~p", [ChannelPid, Reason, StateName, State]),
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
change_devices_status(HostId, ?DEVICE_OFFLINE),
%% activated状态
case StateName =:= session of
@ -355,9 +363,10 @@ handle_event(EventType, EventContent, StateName, State) ->
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, StateName, _State = #state{uuid = UUID}) ->
terminate(Reason, StateName, _State = #state{host_id = HostId, uuid = UUID}) ->
lager:debug("[iot_host] host: ~p, terminate with reason: ~p, state_name: ~p", [UUID, Reason, StateName]),
host_bo:change_status(UUID, ?HOST_OFFLINE),
change_devices_status(HostId, ?DEVICE_OFFLINE),
ok.
%% @private
@ -369,6 +378,13 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
%%% Internal functions
%%%===================================================================
change_devices_status(HostId, NewStatus) when is_integer(HostId), is_integer(NewStatus) ->
{ok, Devices} = device_bo:get_host_devices(HostId),
lists:foreach(fun(#{<<"device_uuid">> := DeviceUUID}) ->
Pid = iot_device:get_pid(DeviceUUID),
iot_device:change_status(Pid, NewStatus)
end, Devices).
%%
handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID})
when is_binary(DeviceUUID), DeviceUUID /= <<>> ->

View File

@ -1,121 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%% 线线
%%% @end
%%% Created : 16. 8 2023 14:47
%%%-------------------------------------------------------------------
-module(iot_host_monitor).
-author("aresei").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([register/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
register_map = #{}
}).
%%%===================================================================
%%% API
%%%===================================================================
register(HostPid, HostId) when is_pid(HostPid), is_integer(HostId) ->
gen_server:call(?MODULE, {register, HostPid, HostId}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, #state{}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({register, HostPid, HostId}, _From, State = #state{register_map = RegisterMap}) ->
erlang:monitor(process, HostPid),
{reply, ok, State#state{register_map = maps:put(HostPid, HostId, RegisterMap)}}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%% websocket断开的时候线;
handle_info({'DOWN', _Ref, process, HostPid, Reason}, State = #state{register_map = RegisterMap}) ->
lager:warning("[iot_host_monitor] host: ~p, down with reason: ~p", [HostPid, Reason]),
case maps:take(HostPid, RegisterMap) of
error ->
{noreply, State};
{HostId, NRegisterMap} ->
case device_bo:get_host_devices(HostId) of
{ok, Devices} ->
DevicesUUIDs = lists:map(fun(#{<<"device_uuid">> := DeviceUUID}) -> DeviceUUID end, Devices),
lists:foreach(fun(DeviceUUID) ->
DevicePid = iot_device:get_pid(DeviceUUID),
iot_device:change_status(DevicePid, 0)
end, DevicesUUIDs);
{error, _} ->
ok
end,
{noreply, State#state{register_map = NRegisterMap}}
end.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -37,15 +37,6 @@ init([]) ->
modules => ['iot_endpoint_monitor']
},
#{
id => iot_host_monitor,
start => {'iot_host_monitor', start_link, []},
restart => permanent,
shutdown => 2000,
type => supervisor,
modules => ['iot_host_monitor']
},
#{
id => 'iot_endpoint_sup',
start => {'iot_endpoint_sup', start_link, []},