修改主机和设备的状态
This commit is contained in:
parent
b797225e1c
commit
298e615794
@ -11,11 +11,12 @@
|
|||||||
%% 主机是否在线
|
%% 主机是否在线
|
||||||
-define(HOST_OFFLINE, 0).
|
-define(HOST_OFFLINE, 0).
|
||||||
-define(HOST_ONLINE, 1).
|
-define(HOST_ONLINE, 1).
|
||||||
|
-define(HOST_NOT_JOINED, -1).
|
||||||
|
|
||||||
%% 设备是否在线状态
|
%% 设备是否在线状态
|
||||||
-define(DEVICE_OFFLINE, 0).
|
-define(DEVICE_OFFLINE, 0).
|
||||||
-define(DEVICE_ONLINE, 1).
|
-define(DEVICE_ONLINE, 1).
|
||||||
-define(DEVICE_UNKNOWN, 2).
|
-define(DEVICE_NOT_JOINED, -1).
|
||||||
|
|
||||||
%% 下发的任务状态
|
%% 下发的任务状态
|
||||||
-define(TASK_STATUS_INIT, -1). %% 未接入
|
-define(TASK_STATUS_INIT, -1). %% 未接入
|
||||||
|
|||||||
@ -32,11 +32,23 @@ get_device_by_uuid(DeviceUUID) when is_binary(DeviceUUID) ->
|
|||||||
|
|
||||||
%% 修改主机的状态
|
%% 修改主机的状态
|
||||||
-spec change_status(DeviceUUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
-spec change_status(DeviceUUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
||||||
change_status(DeviceUUID, Status) when is_binary(DeviceUUID), is_integer(Status) ->
|
change_status(DeviceUUID, NStatus) when is_binary(DeviceUUID), is_integer(NStatus) ->
|
||||||
case mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ? WHERE device_uuid = ? LIMIT 1">>, [Status, DeviceUUID]) of
|
case change_status0(DeviceUUID, NStatus) of
|
||||||
Result = {ok, _} ->
|
Result = {ok, _} ->
|
||||||
event_logs_bo:insert(?EVENT_DEVICE, DeviceUUID, Status),
|
event_logs_bo:insert(?EVENT_DEVICE, DeviceUUID, NStatus),
|
||||||
Result;
|
Result;
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
change_status0(DeviceUUID, ?DEVICE_ONLINE) when is_binary(DeviceUUID) ->
|
||||||
|
Timestamp = calendar:local_time(),
|
||||||
|
case mysql_pool:get_row(mysql_iot, <<"SELECT status FROM device WHERE device_uuid = ? LIMIT 1">>, [DeviceUUID]) of
|
||||||
|
{ok, #{<<"status">> := -1}} ->
|
||||||
|
mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ?, access_at = ?, updated_at = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_ONLINE, Timestamp, Timestamp, DeviceUUID]);
|
||||||
|
{ok, _} ->
|
||||||
|
mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ?, updated_at = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_ONLINE, Timestamp, DeviceUUID]);
|
||||||
|
undefined ->
|
||||||
|
{error, <<"device not found">>}
|
||||||
|
end;
|
||||||
|
change_status0(DeviceUUID, ?DEVICE_OFFLINE) when is_binary(DeviceUUID) ->
|
||||||
|
mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_OFFLINE, DeviceUUID]).
|
||||||
@ -32,11 +32,24 @@ get_host_by_id(HostId) when is_integer(HostId) ->
|
|||||||
|
|
||||||
%% 修改主机的状态
|
%% 修改主机的状态
|
||||||
-spec change_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
-spec change_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
||||||
change_status(UUID, Status) when is_binary(UUID), is_integer(Status) ->
|
change_status(UUID, NStatus) when is_binary(UUID), is_integer(NStatus) ->
|
||||||
case mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [Status, UUID]) of
|
case change_status0(UUID, NStatus) of
|
||||||
Result = {ok, _} ->
|
Result = {ok, _} ->
|
||||||
event_logs_bo:insert(?EVENT_HOST, UUID, Status),
|
event_logs_bo:insert(?EVENT_HOST, UUID, NStatus),
|
||||||
Result;
|
Result;
|
||||||
Error ->
|
Error ->
|
||||||
Error
|
Error
|
||||||
end.
|
end.
|
||||||
|
change_status0(UUID, ?HOST_ONLINE) when is_binary(UUID) ->
|
||||||
|
Timestamp = calendar:local_time(),
|
||||||
|
case mysql_pool:get_row(mysql_iot, <<"SELECT status FROM host WHERE uuid = ? LIMIT 1">>, [UUID]) of
|
||||||
|
%% 第一次更新激活
|
||||||
|
{ok, #{<<"status">> := -1}} ->
|
||||||
|
mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ?, access_at = ?, updated_at = ? WHERE uuid = ? LIMIT 1">>, [?HOST_ONLINE, Timestamp, Timestamp, UUID]);
|
||||||
|
{ok, _} ->
|
||||||
|
mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ?, updated_at = ? WHERE uuid = ? LIMIT 1">>, [?HOST_ONLINE, Timestamp, UUID]);
|
||||||
|
undefined ->
|
||||||
|
{error, <<"host not found">>}
|
||||||
|
end;
|
||||||
|
change_status0(UUID, ?HOST_OFFLINE) when is_binary(UUID) ->
|
||||||
|
mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [?HOST_OFFLINE, UUID]).
|
||||||
109
apps/iot/src/iot_database_buffer.erl
Normal file
109
apps/iot/src/iot_database_buffer.erl
Normal file
@ -0,0 +1,109 @@
|
|||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author aresei
|
||||||
|
%%% @copyright (C) 2023, <COMPANY>
|
||||||
|
%%% @doc
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%% Created : 25. 9月 2023 10:05
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(iot_database_buffer).
|
||||||
|
-author("aresei").
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start_link/0]).
|
||||||
|
-export([find_device/1]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
devices_map = #{} :: map()
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
-spec find_device(DeviceUUID :: binary()) -> error | {ok, Device :: map()}.
|
||||||
|
find_device(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||||
|
gen_server:call(?MODULE, {find_device, DeviceUUID}).
|
||||||
|
|
||||||
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
|
-spec(start_link() ->
|
||||||
|
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% gen_server callbacks
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Initializes the server
|
||||||
|
-spec(init(Args :: term()) ->
|
||||||
|
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term()} | ignore).
|
||||||
|
init([]) ->
|
||||||
|
{ok, Devices} = device_bo:get_all_devices(),
|
||||||
|
DevicesMap = maps:from_list(lists:map(fun(Device = #{<<"device_uuid">> := DeviceUUID}) -> {DeviceUUID, Device} end, Devices)),
|
||||||
|
|
||||||
|
erlang:start_timer(300 * 1000, self(), clean_up),
|
||||||
|
|
||||||
|
{ok, #state{devices_map = DevicesMap}}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling call messages
|
||||||
|
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||||
|
State :: #state{}) ->
|
||||||
|
{reply, Reply :: term(), NewState :: #state{}} |
|
||||||
|
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_call({find_device, DeviceUUID}, _From, State = #state{devices_map = DevicesMap}) ->
|
||||||
|
{reply, maps:find(DeviceUUID, DevicesMap), State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling cast messages
|
||||||
|
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_cast(_Request, State = #state{}) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling all non call/cast messages
|
||||||
|
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_info({timeout, _, clean_up}, State = #state{}) ->
|
||||||
|
{noreply, State#state{devices_map = #{}}}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc This function is called by a gen_server when it is about to
|
||||||
|
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||||
|
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||||
|
%% with Reason. The return value is ignored.
|
||||||
|
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||||
|
State :: #state{}) -> term()).
|
||||||
|
terminate(_Reason, _State = #state{}) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Convert process state when code is changed
|
||||||
|
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||||
|
Extra :: term()) ->
|
||||||
|
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||||
|
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
@ -78,8 +78,25 @@ start_link(Name, DeviceUUID) when is_atom(Name), is_binary(DeviceUUID) ->
|
|||||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||||
%% process to initialize.
|
%% process to initialize.
|
||||||
init([DeviceUUID]) when is_binary(DeviceUUID) ->
|
init([DeviceUUID]) when is_binary(DeviceUUID) ->
|
||||||
gen_server:cast(self(), reload),
|
DeviceInfo1 = case iot_database_buffer:find_device(DeviceUUID) of
|
||||||
{ok, ?STATE_DENIED, #state{device_uuid = DeviceUUID}}.
|
error ->
|
||||||
|
device_bo:get_device_by_uuid(DeviceUUID);
|
||||||
|
{ok, DeviceInfo} ->
|
||||||
|
{ok, DeviceInfo}
|
||||||
|
end,
|
||||||
|
|
||||||
|
case DeviceInfo1 of
|
||||||
|
{ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} ->
|
||||||
|
case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of
|
||||||
|
true ->
|
||||||
|
{ok, ?STATE_ACTIVATED, #state{device_uuid = DeviceUUID, status = Status}};
|
||||||
|
false ->
|
||||||
|
{ok, ?STATE_DENIED, #state{device_uuid = DeviceUUID, status = Status}}
|
||||||
|
end;
|
||||||
|
undefined ->
|
||||||
|
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
|
||||||
|
ignore
|
||||||
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc This function is called by a gen_statem when it needs to find out
|
%% @doc This function is called by a gen_statem when it needs to find out
|
||||||
@ -97,13 +114,28 @@ handle_event({call, From}, is_activated, StateName, State = #state{}) ->
|
|||||||
{keep_state, State, [{reply, From, StateName =:= ?STATE_ACTIVATED}]};
|
{keep_state, State, [{reply, From, StateName =:= ?STATE_ACTIVATED}]};
|
||||||
|
|
||||||
%% 改变为在线状态,但是数据库中的状态已经是在线状态,忽略
|
%% 改变为在线状态,但是数据库中的状态已经是在线状态,忽略
|
||||||
handle_event(cast, {change_status, ?DEVICE_ONLINE}, ?STATE_ACTIVATED, State = #state{status = ?DEVICE_ONLINE}) ->
|
handle_event(cast, {change_status, ?DEVICE_ONLINE}, _, State = #state{status = ?DEVICE_ONLINE}) ->
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
%% 改变数据库的状态, 其他情况下执行次数都很少
|
%% 改变数据库的状态, 其他情况下执行次数都很少
|
||||||
handle_event(cast, {change_status, NewStatus}, _, State = #state{device_uuid = DeviceUUID}) ->
|
handle_event(cast, {change_status, ?DEVICE_ONLINE}, _, State = #state{device_uuid = DeviceUUID}) ->
|
||||||
{ok, _} = device_bo:change_status(DeviceUUID, NewStatus),
|
{ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_ONLINE),
|
||||||
report_event(DeviceUUID, NewStatus),
|
report_event(DeviceUUID, ?DEVICE_ONLINE),
|
||||||
{keep_state, State#state{status = NewStatus}};
|
{keep_state, State#state{status = ?DEVICE_ONLINE}};
|
||||||
|
|
||||||
|
handle_event(cast, {change_status, ?DEVICE_OFFLINE}, _, State = #state{device_uuid = DeviceUUID}) ->
|
||||||
|
{ok, #{<<"status">> := Status}} = device_bo:get_device_by_uuid(DeviceUUID),
|
||||||
|
case Status of
|
||||||
|
?DEVICE_NOT_JOINED ->
|
||||||
|
lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]),
|
||||||
|
{keep_state, State#state{status = ?DEVICE_NOT_JOINED}};
|
||||||
|
?DEVICE_OFFLINE ->
|
||||||
|
lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]),
|
||||||
|
{keep_state, State#state{status = ?DEVICE_OFFLINE}};
|
||||||
|
?DEVICE_ONLINE ->
|
||||||
|
{ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE),
|
||||||
|
report_event(DeviceUUID, ?DEVICE_OFFLINE),
|
||||||
|
{keep_state, State#state{status = ?DEVICE_OFFLINE}}
|
||||||
|
end;
|
||||||
|
|
||||||
%% 重新加载数据库数据
|
%% 重新加载数据库数据
|
||||||
handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) ->
|
handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) ->
|
||||||
@ -114,8 +146,7 @@ handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) ->
|
|||||||
true ->
|
true ->
|
||||||
{next_state, ?STATE_ACTIVATED, State#state{status = Status}};
|
{next_state, ?STATE_ACTIVATED, State#state{status = Status}};
|
||||||
false ->
|
false ->
|
||||||
{ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE),
|
{next_state, ?STATE_DENIED, State#state{status = Status}}
|
||||||
{next_state, ?STATE_DENIED, State#state{status = ?DEVICE_OFFLINE}}
|
|
||||||
end;
|
end;
|
||||||
undefined ->
|
undefined ->
|
||||||
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
|
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
|
||||||
@ -133,8 +164,7 @@ handle_event(cast, {auth, Auth}, StateName, State = #state{device_uuid = DeviceU
|
|||||||
|
|
||||||
{?STATE_ACTIVATED, false} ->
|
{?STATE_ACTIVATED, false} ->
|
||||||
lager:debug("[iot_device] device_uuid: ~p, auth: false, state_name from: ~p, to: ~p", [DeviceUUID, ?STATE_ACTIVATED, ?STATE_DENIED]),
|
lager:debug("[iot_device] device_uuid: ~p, auth: false, state_name from: ~p, to: ~p", [DeviceUUID, ?STATE_ACTIVATED, ?STATE_DENIED]),
|
||||||
{ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE),
|
{next_state, ?STATE_DENIED, State};
|
||||||
{next_state, ?STATE_DENIED, State#state{status = ?DEVICE_OFFLINE}};
|
|
||||||
{?STATE_ACTIVATED, true} ->
|
{?STATE_ACTIVATED, true} ->
|
||||||
lager:debug("[iot_device] device_uuid: ~p, auth: true, will keep state_name: ~p", [DeviceUUID, ?STATE_ACTIVATED]),
|
lager:debug("[iot_device] device_uuid: ~p, auth: true, will keep state_name: ~p", [DeviceUUID, ?STATE_ACTIVATED]),
|
||||||
{keep_state, State}
|
{keep_state, State}
|
||||||
@ -159,19 +189,21 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
|||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
-spec report_event(DeviceUUID :: binary(), NewStatus :: integer()) -> no_return().
|
-spec report_event(DeviceUUID :: binary(), NewStatus :: integer()) -> no_return().
|
||||||
report_event(DeviceUUID, ?DEVICE_UNKNOWN) ->
|
|
||||||
lager:notice("[iot_device] device_uuid: ~p, status: unknown, not report", [DeviceUUID]),
|
|
||||||
ok;
|
|
||||||
report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewStatus) ->
|
report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewStatus) ->
|
||||||
|
TextMap = #{
|
||||||
|
0 => <<"离线"/utf8>>,
|
||||||
|
1 => <<"在线"/utf8>>
|
||||||
|
},
|
||||||
%% 设备的状态信息上报给中电
|
%% 设备的状态信息上报给中电
|
||||||
Timestamp = iot_util:timestamp_of_seconds(),
|
Timestamp = iot_util:timestamp_of_seconds(),
|
||||||
FieldsList = [#{
|
FieldsList = [#{
|
||||||
<<"key">> => <<"device_status">>,
|
<<"key">> => <<"device_status">>,
|
||||||
<<"value">> => NewStatus,
|
<<"value">> => NewStatus,
|
||||||
|
<<"value_text">> => maps:get(NewStatus, TextMap),
|
||||||
<<"unit">> => 0,
|
<<"unit">> => 0,
|
||||||
<<"type">> => <<"DI">>,
|
<<"type">> => <<"DI">>,
|
||||||
<<"name">> => <<"设备状态"/utf8>>,
|
<<"name">> => <<"设备状态"/utf8>>,
|
||||||
<<"timestamp">> => Timestamp
|
<<"timestamp">> => Timestamp
|
||||||
}],
|
}],
|
||||||
iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp),
|
iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp),
|
||||||
lager:debug("[iot_host] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]).
|
lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]).
|
||||||
@ -35,7 +35,8 @@ ensured_device_started(DeviceUUID) when is_binary(DeviceUUID) ->
|
|||||||
|
|
||||||
delete_device(UUID) when is_binary(UUID) ->
|
delete_device(UUID) when is_binary(UUID) ->
|
||||||
Id = iot_device:get_name(UUID),
|
Id = iot_device:get_name(UUID),
|
||||||
supervisor:terminate_child(?MODULE, Id).
|
ok = supervisor:terminate_child(?MODULE, Id),
|
||||||
|
supervisor:delete_child(?MODULE, Id).
|
||||||
|
|
||||||
child_spec(DeviceUUID) when is_binary(DeviceUUID) ->
|
child_spec(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||||
Name = iot_device:get_name(DeviceUUID),
|
Name = iot_device:get_name(DeviceUUID),
|
||||||
|
|||||||
@ -4,17 +4,21 @@
|
|||||||
%%% @doc
|
%%% @doc
|
||||||
%%%
|
%%%
|
||||||
%%% @end
|
%%% @end
|
||||||
%%% Created : 19. 6月 2023 10:32
|
%%% Created : 22. 9月 2023 16:38
|
||||||
%%%-------------------------------------------------------------------
|
%%%-------------------------------------------------------------------
|
||||||
-module(iot_host).
|
-module(iot_host).
|
||||||
-author("aresei").
|
-author("aresei").
|
||||||
-include("iot.hrl").
|
-include("iot.hrl").
|
||||||
|
|
||||||
-behaviour(gen_server).
|
-behaviour(gen_statem).
|
||||||
|
|
||||||
%% 心跳包检测时间间隔, 15分钟检测一次
|
%% 心跳包检测时间间隔, 15分钟检测一次
|
||||||
-define(HEARTBEAT_INTERVAL, 900 * 1000).
|
-define(HEARTBEAT_INTERVAL, 900 * 1000).
|
||||||
|
|
||||||
|
%% 状态
|
||||||
|
-define(STATE_DENIED, denied).
|
||||||
|
-define(STATE_ACTIVATED, activated).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
|
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
|
||||||
-export([get_metric/1, publish_message/4, get_aes/1, get_status/1]).
|
-export([get_metric/1, publish_message/4, get_aes/1, get_status/1]).
|
||||||
@ -22,15 +26,13 @@
|
|||||||
-export([reload_device/2, delete_device/2, activate_device/3]).
|
-export([reload_device/2, delete_device/2, activate_device/3]).
|
||||||
-export([heartbeat/1]).
|
-export([heartbeat/1]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_statem callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
host_id :: integer(),
|
host_id :: integer(),
|
||||||
%% 从数据库里面读取到的数据
|
%% 从数据库里面读取到的数据
|
||||||
uuid :: binary(),
|
uuid :: binary(),
|
||||||
%% 记录最后的状态信息
|
|
||||||
last_status :: integer(),
|
|
||||||
%% aes的key, 后续通讯需要基于这个加密
|
%% aes的key, 后续通讯需要基于这个加密
|
||||||
aes = <<>> :: binary(),
|
aes = <<>> :: binary(),
|
||||||
has_session = false :: boolean(),
|
has_session = false :: boolean(),
|
||||||
@ -45,7 +47,6 @@
|
|||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% API
|
%%% API
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
-spec get_pid(UUID :: binary()) -> undefined | pid().
|
-spec get_pid(UUID :: binary()) -> undefined | pid().
|
||||||
get_pid(UUID) when is_binary(UUID) ->
|
get_pid(UUID) when is_binary(UUID) ->
|
||||||
Name = get_name(UUID),
|
Name = get_name(UUID),
|
||||||
@ -63,38 +64,38 @@ get_alias_name(HostId0) when is_integer(HostId0) ->
|
|||||||
%% 处理消息
|
%% 处理消息
|
||||||
-spec handle(Pid :: pid(), Packet :: {atom(), binary()} | {atom(), {binary(), binary()}}) -> no_return().
|
-spec handle(Pid :: pid(), Packet :: {atom(), binary()} | {atom(), {binary(), binary()}}) -> no_return().
|
||||||
handle(Pid, Packet) when is_pid(Pid) ->
|
handle(Pid, Packet) when is_pid(Pid) ->
|
||||||
gen_server:cast(Pid, {handle, Packet}).
|
gen_statem:cast(Pid, {handle, Packet}).
|
||||||
|
|
||||||
-spec get_aes(Pid :: pid()) -> {ok, Aes :: binary()}.
|
-spec get_aes(Pid :: pid()) -> {ok, Aes :: binary()}.
|
||||||
get_aes(Pid) when is_pid(Pid) ->
|
get_aes(Pid) when is_pid(Pid) ->
|
||||||
gen_server:call(Pid, get_aes).
|
gen_statem:call(Pid, get_aes).
|
||||||
|
|
||||||
-spec get_status(Pid :: pid()) -> {ok, Status :: map()}.
|
-spec get_status(Pid :: pid()) -> {ok, Status :: map()}.
|
||||||
get_status(Pid) when is_pid(Pid) ->
|
get_status(Pid) when is_pid(Pid) ->
|
||||||
gen_server:call(Pid, get_status).
|
gen_statem:call(Pid, get_status).
|
||||||
|
|
||||||
%% 激活主机, true 表示激活; false表示关闭激活
|
%% 激活主机, true 表示激活; false表示关闭激活
|
||||||
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
|
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
|
||||||
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
||||||
gen_server:call(Pid, {activate, Auth}).
|
gen_statem:call(Pid, {activate, Auth}).
|
||||||
|
|
||||||
-spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}.
|
-spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}.
|
||||||
get_metric(Pid) when is_pid(Pid) ->
|
get_metric(Pid) when is_pid(Pid) ->
|
||||||
gen_server:call(Pid, get_metric).
|
gen_statem:call(Pid, get_metric).
|
||||||
|
|
||||||
-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}.
|
-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}.
|
||||||
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
|
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
|
||||||
gen_server:call(Pid, {attach_channel, ChannelPid}).
|
gen_statem:call(Pid, {attach_channel, ChannelPid}).
|
||||||
|
|
||||||
-spec create_session(Pid :: pid(), PubKey :: binary()) -> {ok, Reply :: binary()}.
|
-spec create_session(Pid :: pid(), PubKey :: binary()) -> {ok, Reply :: binary()}.
|
||||||
create_session(Pid, PubKey) when is_pid(Pid), is_binary(PubKey) ->
|
create_session(Pid, PubKey) when is_pid(Pid), is_binary(PubKey) ->
|
||||||
gen_server:call(Pid, {create_session, PubKey}).
|
gen_statem:call(Pid, {create_session, PubKey}).
|
||||||
|
|
||||||
%% 这里占用的的调用进程的时间
|
%% 这里占用的的调用进程的时间
|
||||||
-spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) ->
|
-spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) ->
|
||||||
ok | {ok, Response :: binary()} | {error, Reason :: any()}.
|
ok | {ok, Response :: binary()} | {error, Reason :: any()}.
|
||||||
publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(CommandType), is_integer(Timeout) ->
|
publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(CommandType), is_integer(Timeout) ->
|
||||||
case gen_server:call(Pid, {publish_message, self(), CommandType, Params}) of
|
case gen_statem:call(Pid, {publish_message, self(), CommandType, Params}) of
|
||||||
{ok, Ref} ->
|
{ok, Ref} ->
|
||||||
receive
|
receive
|
||||||
{ws_response, Ref} ->
|
{ws_response, Ref} ->
|
||||||
@ -112,247 +113,227 @@ publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(
|
|||||||
|
|
||||||
-spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}.
|
-spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}.
|
||||||
reload_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
reload_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
||||||
gen_server:call(Pid, {reload_device, DeviceUUID}).
|
gen_statem:call(Pid, {reload_device, DeviceUUID}).
|
||||||
|
|
||||||
-spec delete_device(Pid :: pid(), DeviceUUID :: binary()) -> ok.
|
-spec delete_device(Pid :: pid(), DeviceUUID :: binary()) -> ok.
|
||||||
delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
||||||
gen_server:call(Pid, {delete_device, DeviceUUID}).
|
gen_statem:call(Pid, {delete_device, DeviceUUID}).
|
||||||
|
|
||||||
-spec activate_device(Pid :: pid(), DeviceUUID :: binary(), Auth :: boolean()) -> ok | {error, Reason :: any()}.
|
-spec activate_device(Pid :: pid(), DeviceUUID :: binary(), Auth :: boolean()) -> ok | {error, Reason :: any()}.
|
||||||
activate_device(Pid, DeviceUUID, Auth) when is_pid(Pid), is_binary(DeviceUUID), is_boolean(Auth) ->
|
activate_device(Pid, DeviceUUID, Auth) when is_pid(Pid), is_binary(DeviceUUID), is_boolean(Auth) ->
|
||||||
gen_server:call(Pid, {activate_device, DeviceUUID, Auth}).
|
gen_statem:call(Pid, {activate_device, DeviceUUID, Auth}).
|
||||||
|
|
||||||
-spec heartbeat(Pid :: pid()) -> no_return().
|
-spec heartbeat(Pid :: pid()) -> no_return().
|
||||||
heartbeat(undefined) ->
|
heartbeat(undefined) ->
|
||||||
ok;
|
ok;
|
||||||
heartbeat(Pid) when is_pid(Pid) ->
|
heartbeat(Pid) when is_pid(Pid) ->
|
||||||
gen_server:cast(Pid, heartbeat).
|
gen_statem:cast(Pid, heartbeat).
|
||||||
|
|
||||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||||
%% initialize. To ensure a synchronized start-up procedure, this
|
%% initialize. To ensure a synchronized start-up procedure, this
|
||||||
%% function does not return until Module:init/1 has returned.
|
%% function does not return until Module:init/1 has returned.
|
||||||
start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
|
start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
|
||||||
gen_server:start_link({local, Name}, ?MODULE, [UUID], []).
|
gen_statem:start_link({local, Name}, ?MODULE, [UUID], []).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% gen_statem callbacks
|
%%% gen_statem callbacks
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Initializes the server
|
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||||
-spec(init(Args :: term()) ->
|
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
%% process to initialize.
|
||||||
{stop, Reason :: term()} | ignore).
|
|
||||||
init([UUID]) ->
|
init([UUID]) ->
|
||||||
host_bo:change_status(UUID, ?HOST_OFFLINE),
|
|
||||||
report_event(UUID, ?HOST_OFFLINE),
|
|
||||||
|
|
||||||
case host_bo:get_host_by_uuid(UUID) of
|
case host_bo:get_host_by_uuid(UUID) of
|
||||||
{ok, #{<<"id">> := HostId}} ->
|
{ok, #{<<"id">> := HostId, <<"authorize_status">> := AuthorizeStatus}} ->
|
||||||
%% 通过host_id注册别名, 可以避免通过查询数据库获取HostPid
|
%% 通过host_id注册别名, 可以避免通过查询数据库获取HostPid
|
||||||
AliasName = get_alias_name(HostId),
|
AliasName = get_alias_name(HostId),
|
||||||
global:register_name(AliasName, self()),
|
global:register_name(AliasName, self()),
|
||||||
Aes = list_to_binary(iot_util:rand_bytes(32)),
|
Aes = list_to_binary(iot_util:rand_bytes(32)),
|
||||||
|
|
||||||
|
%% 心跳检测机制
|
||||||
|
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
||||||
|
|
||||||
%% 启动主机相关的devices
|
%% 启动主机相关的devices
|
||||||
{ok, Devices} = device_bo:get_host_devices(HostId),
|
{ok, Devices} = device_bo:get_host_devices(HostId),
|
||||||
lists:foreach(fun iot_device_sup:ensured_device_started/1, Devices),
|
lists:foreach(fun iot_device_sup:ensured_device_started/1, Devices),
|
||||||
|
|
||||||
%% 心跳检测机制
|
StateName = case AuthorizeStatus =:= 1 of
|
||||||
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
true -> ?STATE_ACTIVATED;
|
||||||
|
false -> ?STATE_DENIED
|
||||||
{ok, #state{host_id = HostId, uuid = UUID, last_status = ?HOST_OFFLINE, aes = Aes, has_session = false}};
|
end,
|
||||||
|
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, has_session = false}};
|
||||||
undefined ->
|
undefined ->
|
||||||
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
||||||
ignore
|
ignore
|
||||||
end.
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc This function is called by a gen_statem when it needs to find out
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
%% the callback mode of the callback module.
|
||||||
State :: #state{}) ->
|
callback_mode() ->
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
handle_event_function.
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_call(get_metric, _From, State = #state{metrics = Metrics}) ->
|
|
||||||
{reply, {ok, Metrics}, State};
|
|
||||||
|
|
||||||
handle_call(get_aes, _From, State = #state{aes = Aes}) ->
|
%% @private
|
||||||
{reply, {ok, Aes}, State};
|
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||||
|
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||||
|
%% process message, this function is called.
|
||||||
|
|
||||||
|
handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) ->
|
||||||
|
{keep_state, State, [{reply, From, {ok, Metrics}}]};
|
||||||
|
|
||||||
|
handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) ->
|
||||||
|
{keep_state, State, [{reply, From, {ok, Aes}}]};
|
||||||
|
|
||||||
%% 获取主机的状态
|
%% 获取主机的状态
|
||||||
handle_call(get_status, _From, State = #state{host_id = HostId, last_status = LastStatus, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) ->
|
handle_event({call, From}, get_status, _, State = #state{host_id = HostId, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) ->
|
||||||
%% 启动主机相关的devices
|
%% 启动主机相关的devices
|
||||||
{ok, Devices} = device_bo:get_host_devices(HostId),
|
{ok, Devices} = device_bo:get_host_devices(HostId),
|
||||||
DeviceInfos = lists:map(fun(DeviceUUID) ->
|
DeviceInfos = lists:map(fun(DeviceUUID) ->
|
||||||
DevicePid = iot_device:get_pid(DeviceUUID),
|
DevicePid = iot_device:get_pid(DeviceUUID),
|
||||||
case iot_device:is_activated(DevicePid) of
|
case iot_device:is_activated(DevicePid) of
|
||||||
true ->
|
true -> {DeviceUUID, <<"activated">>};
|
||||||
{DeviceUUID, <<"online">>};
|
false -> {DeviceUUID, <<"denied">>}
|
||||||
false ->
|
|
||||||
{DeviceUUID, <<"offline">>}
|
|
||||||
end
|
end
|
||||||
end, Devices),
|
end, Devices),
|
||||||
|
|
||||||
HasChannel = (ChannelPid /= undefined),
|
HasChannel = (ChannelPid /= undefined),
|
||||||
Reply = #{
|
Reply = #{
|
||||||
<<"last_status">> => LastStatus,
|
|
||||||
<<"has_channel">> => HasChannel,
|
<<"has_channel">> => HasChannel,
|
||||||
<<"has_session">> => HasSession,
|
<<"has_session">> => HasSession,
|
||||||
<<"heartbeat_counter">> => HeartbeatCounter,
|
<<"heartbeat_counter">> => HeartbeatCounter,
|
||||||
<<"metrics">> => Metrics,
|
<<"metrics">> => Metrics,
|
||||||
<<"device_infos">> => DeviceInfos
|
<<"device_infos">> => DeviceInfos
|
||||||
},
|
},
|
||||||
{reply, {ok, Reply}, State};
|
{keep_state, State, [{reply, From, {ok, Reply}}]};
|
||||||
|
|
||||||
%% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的
|
%% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的
|
||||||
handle_call({publish_message, ReceiverPid, CommandType, {aes, Command0}}, _From, State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) ->
|
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}}, ?STATE_ACTIVATED,
|
||||||
|
State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) ->
|
||||||
|
|
||||||
lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]),
|
lager:debug("[iot_host] host: ~p, will publish aes message: ~p", [UUID, Command0]),
|
||||||
Command = iot_cipher_aes:encrypt(AES, Command0),
|
Command = iot_cipher_aes:encrypt(AES, Command0),
|
||||||
%% 通过websocket发送请求
|
%% 通过websocket发送请求
|
||||||
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, Command/binary>>),
|
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, Command/binary>>),
|
||||||
|
|
||||||
{reply, {ok, Ref}, State};
|
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||||||
|
|
||||||
%% 只要channel存在,就负责将消息推送到边缘端主机
|
%% 只要channel存在,就负责将消息推送到边缘端主机
|
||||||
handle_call({publish_message, ReceiverPid, CommandType, Command}, _From, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) ->
|
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _,
|
||||||
|
State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) ->
|
||||||
|
|
||||||
%% 通过websocket发送请求
|
%% 通过websocket发送请求
|
||||||
lager:debug("[iot_host] host: ~p, will publish message: ~p", [UUID, Command]),
|
lager:debug("[iot_host] host: ~p, will publish message: ~p", [UUID, Command]),
|
||||||
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, Command/binary>>),
|
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, Command/binary>>),
|
||||||
|
|
||||||
{reply, {ok, Ref}, State};
|
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||||||
|
|
||||||
handle_call({publish_message, _, _, _}, _From, State = #state{uuid = UUID}) ->
|
handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid = UUID}) ->
|
||||||
lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]),
|
lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]),
|
||||||
{reply, {error, <<"主机状态错误,发送命令失败"/utf8>>}, State};
|
{keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]};
|
||||||
|
|
||||||
%% 激活主机
|
%% 激活主机
|
||||||
handle_call({activate, true}, _From, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
|
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
|
||||||
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
|
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
|
||||||
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
||||||
lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]),
|
lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]),
|
||||||
{reply, ok, State};
|
{next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};
|
||||||
handle_call({activate, true}, _From, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
||||||
lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID]),
|
lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID]),
|
||||||
{reply, ok, State};
|
{next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};
|
||||||
|
|
||||||
%% 关闭授权
|
%% 关闭授权
|
||||||
handle_call({activate, false}, _From, State = #state{uuid = UUID, host_id = HostId, channel_pid = ChannelPid}) ->
|
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
|
||||||
case is_pid(ChannelPid) of
|
|
||||||
true ->
|
|
||||||
BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]),
|
BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]),
|
||||||
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
||||||
ws_channel:stop(ChannelPid, closed),
|
ws_channel:stop(ChannelPid, closed),
|
||||||
lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]);
|
lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]),
|
||||||
false ->
|
{next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]};
|
||||||
lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID])
|
|
||||||
end,
|
|
||||||
|
|
||||||
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
||||||
report_event(UUID, ?HOST_OFFLINE),
|
lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID]),
|
||||||
change_devices_status(HostId, ?DEVICE_UNKNOWN),
|
{next_state, ?STATE_DENIED, State#state{has_session = false}, [{reply, From, ok}]};
|
||||||
|
|
||||||
{reply, ok, State#state{last_status = ?HOST_OFFLINE, channel_pid = undefined, has_session = false}};
|
|
||||||
|
|
||||||
%% 绑定channel
|
%% 绑定channel
|
||||||
handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, channel_pid = OldChannelPid}) ->
|
handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
||||||
lager:debug("[iot_host] attach_channel host_id uuid: ~p, channel: ~p", [UUID, ChannelPid]),
|
lager:debug("[iot_host] attach_channel host_id uuid: ~p, channel: ~p", [UUID, ChannelPid]),
|
||||||
case OldChannelPid =:= undefined of
|
|
||||||
true ->
|
|
||||||
erlang:monitor(process, ChannelPid),
|
erlang:monitor(process, ChannelPid),
|
||||||
{reply, ok, State#state{channel_pid = ChannelPid}};
|
{keep_state, State#state{channel_pid = ChannelPid}, [{reply, From, ok}]};
|
||||||
false ->
|
|
||||||
lager:debug("[iot_host] attach_channel host_id uuid: ~p, old channel exists: ~p", [UUID, OldChannelPid]),
|
handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, channel_pid = OldChannelPid}) ->
|
||||||
{reply, {error, <<"channel existed">>}, State#state{channel_pid = ChannelPid}}
|
lager:notice("[iot_host] attach_channel host_id uuid: ~p, old channel exists: ~p", [UUID, OldChannelPid]),
|
||||||
end;
|
{keep_state, State, [{reply, From, {error, <<"channel existed">>}}]};
|
||||||
|
|
||||||
%% 授权通过后,才能将主机的状态设置为在线状态
|
%% 授权通过后,才能将主机的状态设置为在线状态
|
||||||
handle_call({create_session, PubKey}, _From, State = #state{uuid = UUID, aes = Aes}) ->
|
handle_event({call, From}, {create_session, PubKey}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = Aes}) ->
|
||||||
{ok, #{<<"authorize_status">> := AuthorizeStatus}} = host_bo:get_host_by_uuid(UUID),
|
|
||||||
case AuthorizeStatus =:= 1 of
|
|
||||||
true ->
|
|
||||||
Reply = #{<<"a">> => true, <<"aes">> => Aes},
|
Reply = #{<<"a">> => true, <<"aes">> => Aes},
|
||||||
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
||||||
{ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE),
|
{ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE),
|
||||||
report_event(UUID, ?HOST_ONLINE),
|
report_event(UUID, ?HOST_ONLINE),
|
||||||
|
|
||||||
lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]),
|
lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]),
|
||||||
|
{keep_state, State#state{has_session = true}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||||
|
|
||||||
{reply, {ok, <<10:8, EncReply/binary>>}, State#state{last_status = ?HOST_ONLINE, has_session = true}};
|
handle_event({call, From}, {create_session, PubKey}, ?STATE_DENIED, State = #state{uuid = UUID}) ->
|
||||||
false ->
|
|
||||||
lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]),
|
lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]),
|
||||||
Reply = #{<<"a">> => false, <<"aes">> => <<"">>},
|
Reply = #{<<"a">> => false, <<"aes">> => <<"">>},
|
||||||
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
||||||
|
{keep_state, State#state{has_session = false}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||||
{reply, {ok, <<10:8, EncReply/binary>>}, State#state{has_session = false}}
|
|
||||||
end;
|
|
||||||
|
|
||||||
%% 重新加载设备信息
|
%% 重新加载设备信息
|
||||||
handle_call({reload_device, DeviceUUID}, _From, State) ->
|
handle_event({call, From}, {reload_device, DeviceUUID}, _, State) ->
|
||||||
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
||||||
{ok, DevicePid} ->
|
{ok, DevicePid} ->
|
||||||
iot_device:reload(DevicePid),
|
iot_device:reload(DevicePid),
|
||||||
{reply, ok, State};
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{reply, {error, Reason}, State}
|
{keep_state, State, [{reply, From, {error, Reason}}]}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
%% 删除设备
|
%% 删除设备
|
||||||
handle_call({delete_device, DeviceUUID}, _From, State) ->
|
handle_event({call, From}, {delete_device, DeviceUUID}, _, State) ->
|
||||||
case iot_device:get_pid(DeviceUUID) of
|
case iot_device:get_pid(DeviceUUID) of
|
||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
DevicePid when is_pid(DevicePid) ->
|
DevicePid when is_pid(DevicePid) ->
|
||||||
iot_device_sup:delete_device(DeviceUUID)
|
iot_device_sup:delete_device(DeviceUUID)
|
||||||
end,
|
end,
|
||||||
{reply, ok, State};
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
|
|
||||||
%% 激活设备
|
%% 激活设备
|
||||||
handle_call({activate_device, DeviceUUID, Auth}, _From, State) ->
|
handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State) ->
|
||||||
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
||||||
{ok, DevicePid} ->
|
{ok, DevicePid} ->
|
||||||
iot_device:auth(DevicePid, Auth),
|
iot_device:auth(DevicePid, Auth),
|
||||||
|
|
||||||
{reply, ok, State};
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
{reply, {error, Reason}, State}
|
{keep_state, State, [{reply, From, {error, Reason}}]}
|
||||||
end.
|
end;
|
||||||
|
|
||||||
-spec handle_cast(Request :: term(), State :: term()) ->
|
|
||||||
{noreply, NewState :: term()} |
|
|
||||||
{noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
|
|
||||||
{stop, Reason :: term(), NewState :: term()}.
|
|
||||||
%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到
|
%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到
|
||||||
handle_cast({handle, {data, Data}}, State = #state{aes = AES, has_session = true}) ->
|
handle_event(cast, {handle, {data, Data}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) ->
|
||||||
PlainData = iot_cipher_aes:decrypt(AES, Data),
|
PlainData = iot_cipher_aes:decrypt(AES, Data),
|
||||||
case catch jiffy:decode(PlainData, [return_maps]) of
|
case catch jiffy:decode(PlainData, [return_maps]) of
|
||||||
Info when is_map(Info) ->
|
Info when is_map(Info) ->
|
||||||
handle_data(Info, State);
|
handle_data(Info, State);
|
||||||
Other ->
|
Other ->
|
||||||
lager:debug("[iot_host] the data is invalid json: ~p", [Other])
|
lager:notice("[iot_host] the data is invalid json: ~p", [Other])
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%% 其他情况丢弃数据
|
%% ping的数据是通过aes加密后的,因此需要在有会话的情况下才行
|
||||||
handle_cast({handle, {data, _}}, State = #state{has_session = false}) ->
|
handle_event(cast, {handle, {ping, CipherMetric}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) ->
|
||||||
{noreply, State};
|
|
||||||
|
|
||||||
%% 任意状态下都可以ping
|
|
||||||
handle_cast({handle, {ping, CipherMetric}}, State = #state{uuid = UUID, aes = AES}) ->
|
|
||||||
MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric),
|
MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric),
|
||||||
case catch jiffy:decode(MetricsInfo, [return_maps]) of
|
case catch jiffy:decode(MetricsInfo, [return_maps]) of
|
||||||
Metrics when is_map(Metrics) ->
|
Metrics when is_map(Metrics) ->
|
||||||
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
|
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
|
||||||
{noreply, State#state{metrics = Metrics}};
|
{keep_state, State#state{metrics = Metrics}};
|
||||||
Other ->
|
Other ->
|
||||||
lager:warning("[iot_host] host_id: ~p, ping is invalid json: ~p", [UUID, Other]),
|
lager:warning("[iot_host] host_id: ~p, ping is invalid json: ~p", [UUID, Other]),
|
||||||
{noreply, State}
|
{keep_state, State}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_cast({handle, {inform, Info0}}, State = #state{uuid = UUID, host_id = HostId, aes = AES, has_session = true}) ->
|
handle_event(cast, {handle, {inform, Info0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, host_id = HostId, aes = AES, has_session = true}) ->
|
||||||
Info = iot_cipher_aes:decrypt(AES, Info0),
|
Info = iot_cipher_aes:decrypt(AES, Info0),
|
||||||
case catch jiffy:decode(Info, [return_maps]) of
|
case catch jiffy:decode(Info, [return_maps]) of
|
||||||
#{<<"at">> := At, <<"services">> := ServiceInforms} ->
|
#{<<"at">> := At, <<"services">> := ServiceInforms} ->
|
||||||
@ -377,9 +358,9 @@ handle_cast({handle, {inform, Info0}}, State = #state{uuid = UUID, host_id = Hos
|
|||||||
Error ->
|
Error ->
|
||||||
lager:warning("[iot_host] inform get error: ~p", [Error])
|
lager:warning("[iot_host] inform get error: ~p", [Error])
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{keep_state, State};
|
||||||
|
|
||||||
handle_cast({handle, {feedback_step, Info0}}, State = #state{aes = AES, has_session = true}) ->
|
handle_event(cast, {handle, {feedback_step, Info0}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) ->
|
||||||
Info = iot_cipher_aes:decrypt(AES, Info0),
|
Info = iot_cipher_aes:decrypt(AES, Info0),
|
||||||
case catch jiffy:decode(Info, [return_maps]) of
|
case catch jiffy:decode(Info, [return_maps]) of
|
||||||
Data = #{<<"task_id">> := TaskId, <<"code">> := Code} ->
|
Data = #{<<"task_id">> := TaskId, <<"code">> := Code} ->
|
||||||
@ -392,9 +373,9 @@ handle_cast({handle, {feedback_step, Info0}}, State = #state{aes = AES, has_sess
|
|||||||
Other ->
|
Other ->
|
||||||
lager:warning("[iot_host] feedback_step error: ~p", [Other])
|
lager:warning("[iot_host] feedback_step error: ~p", [Other])
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{keep_state, State};
|
||||||
|
|
||||||
handle_cast({handle, {feedback_result, Info0}}, State = #state{aes = AES, has_session = true}) ->
|
handle_event(cast, {handle, {feedback_result, Info0}}, ?STATE_ACTIVATED, State = #state{aes = AES, has_session = true}) ->
|
||||||
Info = iot_cipher_aes:decrypt(AES, Info0),
|
Info = iot_cipher_aes:decrypt(AES, Info0),
|
||||||
case catch jiffy:decode(Info, [return_maps]) of
|
case catch jiffy:decode(Info, [return_maps]) of
|
||||||
#{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} ->
|
#{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} ->
|
||||||
@ -409,9 +390,9 @@ handle_cast({handle, {feedback_result, Info0}}, State = #state{aes = AES, has_se
|
|||||||
Other ->
|
Other ->
|
||||||
lager:warning("[iot_host] feedback_result error: ~p", [Other])
|
lager:warning("[iot_host] feedback_result error: ~p", [Other])
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{keep_state, State};
|
||||||
|
|
||||||
handle_cast({handle, {event, Event0}}, State = #state{uuid = UUID, aes = AES, has_session = true}) ->
|
handle_event(cast, {handle, {event, Event0}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) ->
|
||||||
EventText = iot_cipher_aes:decrypt(AES, Event0),
|
EventText = iot_cipher_aes:decrypt(AES, Event0),
|
||||||
lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, EventText]),
|
lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, EventText]),
|
||||||
case catch jiffy:decode(EventText, [return_maps]) of
|
case catch jiffy:decode(EventText, [return_maps]) of
|
||||||
@ -423,75 +404,68 @@ handle_cast({handle, {event, Event0}}, State = #state{uuid = UUID, aes = AES, ha
|
|||||||
Other ->
|
Other ->
|
||||||
lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other])
|
lager:warning("[iot_host] host: ~p, event error: ~p", [UUID, Other])
|
||||||
end,
|
end,
|
||||||
{noreply, State};
|
{keep_state, State};
|
||||||
|
|
||||||
%% 心跳机制
|
%% 心跳机制
|
||||||
handle_cast(heartbeat, State = #state{heartbeat_counter = HeartbeatCounter}) ->
|
handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) ->
|
||||||
{noreply, State#state{heartbeat_counter = HeartbeatCounter + 1}}.
|
{keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}};
|
||||||
|
|
||||||
-spec handle_info(Info :: timeout | term(), State :: term()) ->
|
%% 没有收到心跳包,主机下线, 设备状态不变
|
||||||
{noreply, NewState :: term()} |
|
handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{uuid = UUID, heartbeat_counter = 0, channel_pid = ChannelPid}) ->
|
||||||
{noreply, NewState :: term(), timeout() | hibernate | {continue, term()}} |
|
|
||||||
{stop, Reason :: term(), NewState :: term()}.
|
|
||||||
%% 没有收到心跳包,主机下线,设备状态变为: "未知"
|
|
||||||
handle_info({timeout, _, heartbeat_ticker}, State = #state{uuid = UUID, host_id = HostId, heartbeat_counter = 0, has_session = false}) ->
|
|
||||||
lager:warning("[iot_host] uuid: ~p, heartbeat lost, devices will unknown", [UUID]),
|
lager:warning("[iot_host] uuid: ~p, heartbeat lost, devices will unknown", [UUID]),
|
||||||
change_devices_status(HostId, ?DEVICE_UNKNOWN),
|
{ok, #{<<"status">> := Status}} = host_bo:get_host_by_uuid(UUID),
|
||||||
|
case Status of
|
||||||
|
?HOST_NOT_JOINED ->
|
||||||
|
lager:debug("[iot_host] host: ~p, host_maybe_offline, host not joined, can not change to offline", [UUID]);
|
||||||
|
?HOST_OFFLINE ->
|
||||||
|
lager:debug("[iot_host] host: ~p, host_maybe_offline, host now is offline, do nothing", [UUID]);
|
||||||
|
?HOST_ONLINE ->
|
||||||
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
|
||||||
report_event(UUID, ?HOST_OFFLINE),
|
report_event(UUID, ?HOST_OFFLINE)
|
||||||
|
end,
|
||||||
|
|
||||||
|
%% 关闭channel,主机需要重新连接,才能保存状态的一致
|
||||||
|
is_pid(ChannelPid) andalso ws_channel:stop(ChannelPid, closed),
|
||||||
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
||||||
{noreply, State#state{last_status = ?HOST_OFFLINE, heartbeat_counter = 0}};
|
|
||||||
|
{keep_state, State#state{channel_pid = undefined, has_session = false, heartbeat_counter = 0}};
|
||||||
|
|
||||||
%% 其他情况下需要重置系统计数器
|
%% 其他情况下需要重置系统计数器
|
||||||
handle_info({timeout, _, heartbeat_ticker}, State = #state{}) ->
|
handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{}) ->
|
||||||
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
||||||
{noreply, State#state{heartbeat_counter = 0}};
|
{keep_state, State#state{heartbeat_counter = 0}};
|
||||||
|
|
||||||
%% 当websocket断开的时候,主机的状态不一定改变;主机的状态改变通过心跳机制,会话状态需要改变
|
%% 当websocket断开的时候,主机的状态不一定改变;主机的状态改变通过心跳机制,会话状态需要改变
|
||||||
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
|
handle_event(info, {'DOWN', _Ref, process, ChannelPid, Reason}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
|
||||||
lager:warning("[iot_host] uuid: ~p, channel: ~p, down with reason: ~p, has_session: ~p, state: ~p", [UUID, ChannelPid, Reason, HasSession, State]),
|
lager:warning("[iot_host] uuid: ~p, channel: ~p, down with reason: ~p, has_session: ~p, state: ~p", [UUID, ChannelPid, Reason, HasSession, State]),
|
||||||
{noreply, State#state{channel_pid = undefined, has_session = false}};
|
{keep_state, State#state{channel_pid = undefined, has_session = false}};
|
||||||
|
|
||||||
handle_info({'DOWN', _Ref, process, Pid, Reason}, State = #state{uuid = UUID}) ->
|
handle_event(info, {'DOWN', _Ref, process, Pid, Reason}, _, State = #state{uuid = UUID}) ->
|
||||||
lager:debug("[iot_host] uuid: ~p, process_pid: ~p, down with reason: ~p, state: ~p", [UUID, Pid, Reason, State]),
|
lager:debug("[iot_host] uuid: ~p, process_pid: ~p, down with reason: ~p, state: ~p", [UUID, Pid, Reason, State]),
|
||||||
{noreply, State};
|
{keep_state, State};
|
||||||
|
|
||||||
handle_info(Info, State = #state{has_session = HasSession}) ->
|
handle_event(Event, Info, StateName, State = #state{uuid = UUID}) ->
|
||||||
lager:warning("[iot_host] unknown info: ~p, state: ~p", [Info, HasSession]),
|
lager:warning("[iot_host] host: ~p, event: ~p, unknown message: ~p, state_name: ~p, state: ~p", [UUID, Event, Info, StateName, state_map(State)]),
|
||||||
|
{keep_state, State}.
|
||||||
{noreply, State}.
|
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc This function is called by a gen_statem when it is about to
|
%% @doc This function is called by a gen_statem when it is about to
|
||||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||||||
%% Reason. The return value is ignored.
|
%% Reason. The return value is ignored.
|
||||||
terminate(Reason, #state{host_id = HostId, uuid = UUID, has_session = HasSession}) ->
|
terminate(Reason, _StateName, _State = #state{uuid = UUID, has_session = HasSession}) ->
|
||||||
lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p, status: offline", [UUID, Reason, HasSession]),
|
lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p", [UUID, Reason, HasSession]),
|
||||||
host_bo:change_status(UUID, ?HOST_OFFLINE),
|
|
||||||
report_event(UUID, ?HOST_OFFLINE),
|
|
||||||
|
|
||||||
change_devices_status(HostId, ?DEVICE_UNKNOWN),
|
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Convert process state when code is changed
|
%% @doc Convert process state when code is changed
|
||||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||||
{ok, State}.
|
{ok, StateName, State}.
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
%% 修改设备的状态
|
|
||||||
change_devices_status(HostId, NewStatus) when is_integer(HostId), is_integer(NewStatus) ->
|
|
||||||
{ok, Devices} = device_bo:get_host_devices(HostId),
|
|
||||||
lager:debug("[iot_host] host_id: ~p, devices new status: ~p, devices: ~p", [HostId, NewStatus, Devices]),
|
|
||||||
lists:foreach(fun(DeviceUUID) ->
|
|
||||||
Pid = iot_device:get_pid(DeviceUUID),
|
|
||||||
iot_device:change_status(Pid, NewStatus)
|
|
||||||
end, Devices).
|
|
||||||
|
|
||||||
%% 处理相关数据
|
%% 处理相关数据
|
||||||
handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID})
|
handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID})
|
||||||
when is_binary(DeviceUUID), DeviceUUID /= <<>> ->
|
when is_binary(DeviceUUID), DeviceUUID /= <<>> ->
|
||||||
@ -527,11 +501,17 @@ handle_data(#{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"field
|
|||||||
|
|
||||||
-spec report_event(UUID :: binary(), NewStatus :: integer()) -> no_return().
|
-spec report_event(UUID :: binary(), NewStatus :: integer()) -> no_return().
|
||||||
report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) ->
|
report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) ->
|
||||||
|
TextMap = #{
|
||||||
|
0 => <<"离线"/utf8>>,
|
||||||
|
1 => <<"在线"/utf8>>
|
||||||
|
},
|
||||||
|
|
||||||
%% 设备的状态信息上报给中电
|
%% 设备的状态信息上报给中电
|
||||||
Timestamp = iot_util:timestamp_of_seconds(),
|
Timestamp = iot_util:timestamp_of_seconds(),
|
||||||
FieldsList = [#{
|
FieldsList = [#{
|
||||||
<<"key">> => <<"host_status">>,
|
<<"key">> => <<"host_status">>,
|
||||||
<<"value">> => NewStatus,
|
<<"value">> => NewStatus,
|
||||||
|
<<"value_text">> => maps:get(NewStatus, TextMap),
|
||||||
<<"unit">> => 0,
|
<<"unit">> => 0,
|
||||||
<<"type">> => <<"DI">>,
|
<<"type">> => <<"DI">>,
|
||||||
<<"name">> => <<"主机状态"/utf8>>,
|
<<"name">> => <<"主机状态"/utf8>>,
|
||||||
|
|||||||
@ -37,7 +37,8 @@ ensured_host_started(UUID) when is_binary(UUID) ->
|
|||||||
|
|
||||||
delete_host(UUID) when is_binary(UUID) ->
|
delete_host(UUID) when is_binary(UUID) ->
|
||||||
Id = iot_host:get_name(UUID),
|
Id = iot_host:get_name(UUID),
|
||||||
supervisor:terminate_child(?MODULE, Id).
|
ok = supervisor:terminate_child(?MODULE, Id),
|
||||||
|
supervisor:delete_child(?MODULE, Id).
|
||||||
|
|
||||||
child_spec(UUID) ->
|
child_spec(UUID) ->
|
||||||
Id = iot_host:get_name(UUID),
|
Id = iot_host:get_name(UUID),
|
||||||
|
|||||||
@ -39,6 +39,15 @@ init([]) ->
|
|||||||
modules => ['iot_logger']
|
modules => ['iot_logger']
|
||||||
},
|
},
|
||||||
|
|
||||||
|
#{
|
||||||
|
id => 'iot_database_buffer',
|
||||||
|
start => {'iot_database_buffer', start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
shutdown => 2000,
|
||||||
|
type => supervisor,
|
||||||
|
modules => ['iot_database_buffer']
|
||||||
|
},
|
||||||
|
|
||||||
#{
|
#{
|
||||||
id => 'iot_device_sup',
|
id => 'iot_device_sup',
|
||||||
start => {'iot_device_sup', start_link, []},
|
start => {'iot_device_sup', start_link, []},
|
||||||
|
|||||||
@ -65,7 +65,8 @@
|
|||||||
[{size, 10}, {max_overflow, 20}, {worker_module, eredis}],
|
[{size, 10}, {max_overflow, 20}, {worker_module, eredis}],
|
||||||
[
|
[
|
||||||
{host, "127.0.0.1"},
|
{host, "127.0.0.1"},
|
||||||
{port, 26379}
|
{port, 26379},
|
||||||
|
{database, 1}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|||||||
@ -54,7 +54,8 @@
|
|||||||
[{size, 10}, {max_overflow, 20}, {worker_module, eredis}],
|
[{size, 10}, {max_overflow, 20}, {worker_module, eredis}],
|
||||||
[
|
[
|
||||||
{host, "172.19.0.7"},
|
{host, "172.19.0.7"},
|
||||||
{port, 6379}
|
{port, 6379},
|
||||||
|
{database, 1}
|
||||||
]
|
]
|
||||||
},
|
},
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user