support device
This commit is contained in:
parent
de77d77176
commit
1c4fbf6415
@ -31,6 +31,9 @@
|
||||
-define(PACKET_PUBLISH, 16#03).
|
||||
-define(PACKET_PUBLISH_RESPONSE, 16#04).
|
||||
|
||||
%% 事件类型
|
||||
-define(EVENT_DEVICE, 16#01).
|
||||
|
||||
%% 缓存数据库表
|
||||
-record(kv, {
|
||||
key :: binary(),
|
||||
|
||||
@ -11,7 +11,26 @@
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([get_device_by_uuid/1]).
|
||||
-export([get_host_devices/1, get_device_by_uuid/1, change_status/2, get_host_by_uuid/1]).
|
||||
|
||||
get_device_by_uuid(UUID) when is_binary(UUID) ->
|
||||
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM device WHERE uuid = ? LIMIT 1">>, [UUID]).
|
||||
-spec get_host_devices(HostId :: integer()) -> {ok, Devices::list()} | {error, Reason::any()}.
|
||||
get_host_devices(HostId) when is_integer(HostId) ->
|
||||
mysql_pool:get_all(mysql_iot, <<"SELECT * FROM device WHERE host_id = ? AND device_uuid != ''">>, [HostId]).
|
||||
|
||||
-spec get_device_by_uuid(DeviceUUID :: binary()) -> {ok, DeviceInfo :: map()} | undefined.
|
||||
get_device_by_uuid(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM device WHERE device_uuid = ? LIMIT 1">>, [DeviceUUID]).
|
||||
|
||||
%% 修改主机的状态
|
||||
-spec change_status(DeviceId :: integer(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
||||
change_status(DeviceId, Status) when is_integer(DeviceId), is_integer(Status) ->
|
||||
mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ? WHERE id = ? LIMIT 1">>, [Status, DeviceId]).
|
||||
|
||||
-spec get_host_by_uuid(DeviceUUID :: binary()) -> undefined | {ok, HostInfo :: map()}.
|
||||
get_host_by_uuid(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
case get_device_by_uuid(DeviceUUID) of
|
||||
undefined ->
|
||||
undefined;
|
||||
{ok, #{<<"host_id">> := HostId}} ->
|
||||
host_bo:get_host_by_id(HostId)
|
||||
end.
|
||||
@ -11,7 +11,7 @@
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([get_all_hosts/0, change_status/2, get_host_by_uuid/1]).
|
||||
-export([get_all_hosts/0, change_status/2, get_host_by_uuid/1, get_host_by_id/1]).
|
||||
|
||||
-spec get_all_hosts() -> UUIDList :: [binary()].
|
||||
get_all_hosts() ->
|
||||
@ -22,9 +22,14 @@ get_all_hosts() ->
|
||||
[]
|
||||
end.
|
||||
|
||||
-spec get_host_by_uuid(UUID :: binary()) -> undefined | {ok, HostInfo :: map()}.
|
||||
get_host_by_uuid(UUID) when is_binary(UUID) ->
|
||||
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE uuid = ? LIMIT 1">>, [UUID]).
|
||||
|
||||
-spec get_host_by_id(HostId :: integer()) -> undefined | {ok, HostInfo :: map()}.
|
||||
get_host_by_id(HostId) when is_integer(HostId) ->
|
||||
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]).
|
||||
|
||||
%% 修改主机的状态
|
||||
-spec change_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
||||
change_status(UUID, Status) when is_binary(UUID), is_integer(Status) ->
|
||||
|
||||
57
apps/iot/src/http_handler/device_handler.erl
Normal file
57
apps/iot/src/http_handler/device_handler.erl
Normal file
@ -0,0 +1,57 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author licheng5
|
||||
%%% @copyright (C) 2020, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 26. 4月 2020 3:36 下午
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(device_handler).
|
||||
-author("licheng5").
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([handle_request/4]).
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
%% helper methods
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
|
||||
%% 重新加载对应的主机信息
|
||||
handle_request("POST", "/device/reload", _, #{<<"device_uuid">> := DeviceUUID}) when is_binary(DeviceUUID) ->
|
||||
lager:debug("[device_handler] will reload device uuid: ~p", [DeviceUUID]),
|
||||
case device_bo:get_host_by_uuid(DeviceUUID) of
|
||||
undefined ->
|
||||
{ok, 200, iot_util:json_error(404, <<"device not found, reload error">>)};
|
||||
{ok, #{<<"uuid">> := UUID}} ->
|
||||
Pid = iot_host:get_pid(UUID),
|
||||
ok = iot_host:reload_device(Pid, DeviceUUID),
|
||||
{ok, 200, iot_util:json_data(<<"success">>)}
|
||||
end;
|
||||
|
||||
%% 删除对应的主机信息
|
||||
handle_request("POST", "/device/delete", _, #{<<"device_uuid">> := DeviceUUID}) when is_binary(DeviceUUID) ->
|
||||
case device_bo:get_host_by_uuid(DeviceUUID) of
|
||||
undefined ->
|
||||
ok;
|
||||
{ok, #{<<"uuid">> := UUID}} ->
|
||||
Pid = iot_host:get_pid(UUID),
|
||||
ok = iot_host:delete_device(Pid, DeviceUUID)
|
||||
end,
|
||||
{ok, 200, iot_util:json_data(<<"success">>)};
|
||||
|
||||
%% 处理主机的授权的激活
|
||||
handle_request("POST", "/device/activate", _, #{<<"device_uuid">> := DeviceUUID, <<"auth">> := Auth}) when is_binary(DeviceUUID) ->
|
||||
case device_bo:get_host_by_uuid(DeviceUUID) of
|
||||
undefined ->
|
||||
ok;
|
||||
{ok, #{<<"uuid">> := UUID}} ->
|
||||
Pid = iot_host:get_pid(UUID),
|
||||
ok = iot_host:auth_device(Pid, DeviceUUID, Auth)
|
||||
end,
|
||||
{ok, 200, iot_util:json_data(<<"success">>)};
|
||||
|
||||
|
||||
handle_request(_, Path, _, _) ->
|
||||
Path1 = list_to_binary(Path),
|
||||
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
|
||||
@ -12,7 +12,7 @@
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/1, write/4, write/5]).
|
||||
-export([start_link/1, write/4, write/5, write_data/4]).
|
||||
-export([get_precision/1]).
|
||||
|
||||
%% gen_server callbacks
|
||||
@ -22,6 +22,9 @@
|
||||
|
||||
-define(INFLUX_POOl, influx_pool).
|
||||
|
||||
-define(DEFAULT_BUCKET, <<"metric">>).
|
||||
-define(DEFAULT_ORG, <<"nannong">>).
|
||||
|
||||
-record(state, {
|
||||
host,
|
||||
port,
|
||||
@ -48,6 +51,22 @@ get_precision(Timestamp) when is_integer(Timestamp) ->
|
||||
<<"ms">>
|
||||
end.
|
||||
|
||||
-spec write_data(Measurement :: binary(), Tags :: map(), FieldsList :: list(), Timestamp :: integer()) -> no_return().
|
||||
write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement), is_map(Tags), is_list(FieldsList), is_integer(Timestamp) ->
|
||||
%% 按照设备的uuid进行分组
|
||||
Points = lists:map(fun(Fields) ->
|
||||
NFields = case Fields of
|
||||
#{<<"key">> := Key, <<"value">> := Value, <<"unit">> := Unit} ->
|
||||
#{Key => jiffy:encode(#{<<"value">> => Value, <<"unit">> => Unit}, [force_utf8])};
|
||||
#{<<"key">> := Key, <<"value">> := Value} ->
|
||||
#{Key => #{<<"value">> => Value}}
|
||||
end,
|
||||
influx_point:new(Measurement, Tags, NFields, Timestamp)
|
||||
end, FieldsList),
|
||||
Precision = influx_client:get_precision(Timestamp),
|
||||
|
||||
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end).
|
||||
|
||||
-spec write(Pid :: pid(), Bucket :: binary(), Org :: binary(), Points :: list()) -> no_return().
|
||||
write(Pid, Bucket, Org, Points) when is_pid(Pid), is_binary(Bucket), is_binary(Org), is_list(Points) ->
|
||||
write(Pid, Bucket, Org, <<"ms">>, Points).
|
||||
|
||||
@ -19,6 +19,7 @@
|
||||
%% API
|
||||
-export([new/4, normalized/1]).
|
||||
|
||||
-spec new(Measurement :: binary(), Tags :: list() | map(), Fields :: list() | map(), Timestamp :: integer()) -> #point{}.
|
||||
new(Measurement, Tags, Fields, Timestamp) when is_binary(Measurement), is_list(Tags); is_map(Tags), is_list(Fields); is_map(Fields), is_integer(Timestamp) ->
|
||||
#point{measurement = Measurement, tags = as_list(Tags), fields = as_list(Fields), time = Timestamp}.
|
||||
|
||||
|
||||
@ -44,6 +44,7 @@ start_http_server() ->
|
||||
Dispatcher = cowboy_router:compile([
|
||||
{'_', [
|
||||
{"/host/[...]", http_protocol, [host_handler]},
|
||||
{"/device/[...]", http_protocol, [device_handler]},
|
||||
{"/endpoint/[...]", http_protocol, [endpoint_handler]},
|
||||
{"/test/[...]", http_protocol, [test_handler]},
|
||||
{"/ws", ws_channel, []}
|
||||
|
||||
196
apps/iot/src/iot_device.erl
Normal file
196
apps/iot/src/iot_device.erl
Normal file
@ -0,0 +1,196 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 14. 8月 2023 11:40
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_device).
|
||||
-author("aresei").
|
||||
|
||||
-behaviour(gen_statem).
|
||||
|
||||
%% API
|
||||
-export([start_link/2, is_activated/1, change_status/2, reload/1, auth/2, stop/1]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
|
||||
%% 周期性的同步设备状态
|
||||
-define(RELOAD_TICKER, 100 * 1000).
|
||||
|
||||
%% 终端是否在线
|
||||
-define(DEVICE_STATUS_OFFLINE, 0).
|
||||
-define(DEVICE_STATUS_ONLINE, 1).
|
||||
|
||||
%% 终端是否授权
|
||||
-define(DEVICE_AUTH_DENIED, 0).
|
||||
-define(DEVICE_AUTH_AUTHED, 1).
|
||||
|
||||
%% 状态
|
||||
-define(STATE_DENIED, denied).
|
||||
-define(STATE_ACTIVATED, activated).
|
||||
|
||||
-record(state, {
|
||||
parent_pid :: pid(),
|
||||
device_id :: integer(),
|
||||
device_uuid :: binary(),
|
||||
status = ?DEVICE_STATUS_OFFLINE
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec is_activated(Pid :: pid()) -> boolean().
|
||||
is_activated(Pid) when is_pid(Pid) ->
|
||||
gen_statem:call(Pid, is_activated).
|
||||
|
||||
-spec change_status(Pid :: pid(), NewStatus :: integer()) -> no_return().
|
||||
change_status(Pid, NewStatus) when is_pid(Pid), is_integer(NewStatus) ->
|
||||
gen_statem:cast(Pid, {change_status, NewStatus}).
|
||||
|
||||
-spec reload(Pid :: pid()) -> no_return().
|
||||
reload(Pid) when is_pid(Pid) ->
|
||||
gen_statem:cast(Pid, reload).
|
||||
|
||||
-spec auth(Pid :: pid(), Auth :: boolean()) -> no_return().
|
||||
auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
||||
gen_statem:cast(Pid, {auth, Auth}).
|
||||
|
||||
-spec stop(Pid :: pid()) -> no_return().
|
||||
stop(Pid) when is_pid(Pid) ->
|
||||
gen_statem:stop(Pid).
|
||||
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
start_link(ParentPid, DeviceUUID) when is_pid(ParentPid), is_binary(DeviceUUID) ->
|
||||
gen_statem:start_link(?MODULE, [ParentPid, DeviceUUID], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
init([ParentPid, DeviceUUID]) ->
|
||||
%% 定期同步数据库的状态信息
|
||||
erlang:start_timer(?RELOAD_TICKER, self(), reload_ticker),
|
||||
|
||||
{ok, #{<<"authorize_status">> := AuthorizeStatus, <<"id">> := DeviceId}} = device_bo:get_device_by_uuid(DeviceUUID),
|
||||
StateName = case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of
|
||||
false -> ?STATE_DENIED;
|
||||
true -> ?STATE_ACTIVATED
|
||||
end,
|
||||
%% 重启时,离线状态
|
||||
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE),
|
||||
lager:debug("[iot_device] started device: ~p, state_name: ~p", [DeviceUUID, StateName]),
|
||||
|
||||
{ok, StateName, #state{parent_pid = ParentPid, device_id = DeviceId, device_uuid = DeviceUUID, status = ?DEVICE_STATUS_OFFLINE}}.
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it needs to find out
|
||||
%% the callback mode of the callback module.
|
||||
callback_mode() ->
|
||||
handle_event_function.
|
||||
|
||||
%% @private
|
||||
%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or
|
||||
%% (2) when gen_statem terminates abnormally.
|
||||
%% This callback is optional.
|
||||
format_status(_Opt, [_PDict, _StateName, _State]) ->
|
||||
Status = some_term,
|
||||
Status.
|
||||
|
||||
%% @private
|
||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||
%% process message, this function is called.
|
||||
|
||||
%% 判断是否是激活状态
|
||||
handle_event({call, From}, is_activated, StateName, State = #state{}) ->
|
||||
{keep_state, State, [{reply, From, StateName =:= ?STATE_ACTIVATED}]};
|
||||
|
||||
%% 改变数据库的状态, 离线状态事件必须执行(主动触发离线的情况很少,不会造成数据库眼里)
|
||||
handle_event(cast, {change_status, ?DEVICE_STATUS_OFFLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) ->
|
||||
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE),
|
||||
{keep_state, State#state{status = ?DEVICE_STATUS_OFFLINE}};
|
||||
%% 改变为在线状态,但是数据库中的状态已经是在线状态,忽略
|
||||
handle_event(cast, {change_status, ?DEVICE_STATUS_ONLINE}, ?STATE_ACTIVATED, State = #state{status = ?DEVICE_STATUS_ONLINE}) ->
|
||||
{keep_state, State};
|
||||
%% 其他情况下需要修改数据状态为在线状态
|
||||
handle_event(cast, {change_status, ?DEVICE_STATUS_ONLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) ->
|
||||
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_ONLINE),
|
||||
{keep_state, State#state{status = ?DEVICE_STATUS_ONLINE}};
|
||||
%% 其他状态下不存在在线状态的变化
|
||||
handle_event(cast, {change_status, _}, _, State = #state{}) ->
|
||||
{keep_state, State};
|
||||
|
||||
%% 重新加载数据库数据
|
||||
handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) ->
|
||||
lager:debug("[iot_device] will reload: ~p", [DeviceUUID]),
|
||||
reload_database(State);
|
||||
|
||||
%% 处理授权
|
||||
handle_event(cast, {auth, Auth}, ?STATE_DENIED, State = #state{device_id = DeviceId, device_uuid = DeviceUUID}) ->
|
||||
case Auth of
|
||||
true ->
|
||||
%% 需要矫正status字段的值
|
||||
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE),
|
||||
lager:debug("[iot_device] device_uuid: ~p, auth: true, state_name from ~p, to: ~p", [DeviceUUID, ?STATE_DENIED, ?STATE_ACTIVATED]),
|
||||
{next_state, ?STATE_ACTIVATED, State#state{status = ?DEVICE_STATUS_OFFLINE}};
|
||||
false ->
|
||||
lager:debug("[iot_device] device_uuid: ~p, auth: false, will keep state_name: ~p", [DeviceUUID, ?STATE_DENIED]),
|
||||
{keep_state, State}
|
||||
end;
|
||||
|
||||
handle_event(cast, {auth, Auth}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId, device_uuid = DeviceUUID}) ->
|
||||
case Auth of
|
||||
true ->
|
||||
lager:debug("[iot_device] device_uuid: ~p, auth: true, will keep state_name: ~p", [DeviceUUID, ?STATE_ACTIVATED]),
|
||||
{keep_state, State};
|
||||
false ->
|
||||
%% 需要矫正status字段的值
|
||||
{ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE),
|
||||
lager:debug("[iot_device] device_uuid: ~p, auth: false, state_name from: ~p, to: ~p", [DeviceUUID, ?STATE_ACTIVATED, ?STATE_DENIED]),
|
||||
|
||||
{next_state, ?STATE_DENIED, State#state{status = ?DEVICE_STATUS_OFFLINE}}
|
||||
end;
|
||||
|
||||
handle_event(info, {timeout, _, reload_ticker}, _, State) ->
|
||||
erlang:start_timer(?RELOAD_TICKER + rand:uniform(10) * 1000, self(), reload_ticker),
|
||||
reload_database(State).
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_statem when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||||
%% Reason. The return value is ignored.
|
||||
terminate(Reason, StateName, #state{device_uuid = DeviceUUID}) ->
|
||||
lager:notice("[iot_device] device_uuid: ~p, state_name: ~p, terminate with reason: ~p", [DeviceUUID, StateName, Reason]),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||
{ok, StateName, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
reload_database(State = #state{device_uuid = DeviceUUID}) ->
|
||||
case device_bo:get_device_by_uuid(DeviceUUID) of
|
||||
{ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status, <<"id">> := DeviceId}} ->
|
||||
StateName = case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of
|
||||
false -> ?STATE_DENIED;
|
||||
true -> ?STATE_ACTIVATED
|
||||
end,
|
||||
{next_state, StateName, State#state{device_id = DeviceId, status = Status}};
|
||||
undefined ->
|
||||
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
|
||||
{keep_state, State}
|
||||
end.
|
||||
@ -21,7 +21,7 @@
|
||||
-define(HOST_AUTHED, 1).
|
||||
|
||||
%% API
|
||||
-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]).
|
||||
-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2, reload_device/2, delete_device/2, auth_device/3]).
|
||||
-export([get_metric/1, publish_message/4, get_aes/1]).
|
||||
-export([create_session/2, attach_channel/2]).
|
||||
|
||||
@ -32,6 +32,8 @@
|
||||
host_id :: integer(),
|
||||
%% 从数据库里面读取到的数据
|
||||
uuid :: binary(),
|
||||
%% 建立到和device之间的映射关系
|
||||
device_map = #{},
|
||||
%% aes的key, 后续通讯需要基于这个加密
|
||||
aes = <<>> :: binary(),
|
||||
|
||||
@ -65,6 +67,20 @@ handle(Pid, Packet) when is_pid(Pid) ->
|
||||
reload(Pid) when is_pid(Pid) ->
|
||||
gen_statem:call(Pid, reload).
|
||||
|
||||
%% 重新加载主机的基本信息
|
||||
-spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}.
|
||||
reload_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
||||
gen_statem:call(Pid, {reload_device, DeviceUUID}).
|
||||
|
||||
%% 重新加载主机的基本信息
|
||||
-spec delete_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}.
|
||||
delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
|
||||
gen_statem:call(Pid, {delete_device, DeviceUUID}).
|
||||
|
||||
-spec auth_device(Pid :: pid(), DeviceUUID :: binary(), Auth :: boolean()) -> ok | {error, Reason :: any()}.
|
||||
auth_device(Pid, DeviceUUID, Auth) when is_pid(Pid), is_binary(DeviceUUID), is_boolean(Auth) ->
|
||||
gen_statem:call(Pid, {auth_device, DeviceUUID, Auth}).
|
||||
|
||||
-spec get_aes(Pid :: pid()) -> {ok, Aes :: binary()}.
|
||||
get_aes(Pid) when is_pid(Pid) ->
|
||||
gen_statem:call(Pid, get_aes).
|
||||
@ -130,7 +146,15 @@ init([UUID]) ->
|
||||
true -> activated
|
||||
end,
|
||||
|
||||
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes}};
|
||||
%% 启动所有的device
|
||||
{ok, Devices} = device_bo:get_host_devices(HostId),
|
||||
lager:debug("[iot_host] uuid: ~p, loaded devices: ~p", [UUID, Devices]),
|
||||
DeviceMap = lists:foldl(fun(#{<<"device_uuid">> := DeviceUUID}, M) ->
|
||||
{ok, Pid} = iot_device:start_link(self(), DeviceUUID),
|
||||
maps:put(DeviceUUID, Pid, M)
|
||||
end, #{}, Devices),
|
||||
|
||||
{ok, StateName, #state{host_id = HostId, device_map = DeviceMap, uuid = UUID, aes = Aes}};
|
||||
undefined ->
|
||||
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
||||
ignore
|
||||
@ -231,39 +255,56 @@ handle_event({call, From}, {create_session, PubKey}, StateName, State = #state{u
|
||||
|
||||
{next_state, session, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||
|
||||
%% 处理设备相关
|
||||
handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) ->
|
||||
case maps:get(DeviceUUID, DeviceMap, undefined) of
|
||||
undefined ->
|
||||
{ok, Pid} = iot_device:start_link(self(), DeviceUUID),
|
||||
NDeviceMap = maps:put(DeviceUUID, Pid, DeviceMap),
|
||||
{keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]};
|
||||
DevicePid ->
|
||||
iot_device:reload(DevicePid),
|
||||
{keep_state, State, [{reply, From, ok}]}
|
||||
end;
|
||||
|
||||
handle_event({call, From}, {auth_device, DeviceUUID, Auth}, _, State = #state{device_map = DeviceMap}) ->
|
||||
case maps:get(DeviceUUID, DeviceMap, undefined) of
|
||||
undefined ->
|
||||
{ok, Pid} = iot_device:start_link(self(), DeviceUUID),
|
||||
iot_device:auth(Pid, Auth),
|
||||
|
||||
NDeviceMap = maps:put(DeviceUUID, Pid, DeviceMap),
|
||||
{keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]};
|
||||
DevicePid ->
|
||||
iot_device:auth(DevicePid, Auth),
|
||||
{keep_state, State, [{reply, From, ok}]}
|
||||
end;
|
||||
|
||||
handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{uuid = UUID, device_map = DeviceMap}) ->
|
||||
case maps:take(DeviceUUID, DeviceMap) of
|
||||
error ->
|
||||
lager:notice("[iot_host] uuid: ~p, delete device: ~p, not found", [UUID, DeviceUUID]),
|
||||
{keep_state, State, [{reply, From, ok}]};
|
||||
{DevicePid, NDeviceMap} ->
|
||||
iot_device:stop(DevicePid),
|
||||
{keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]}
|
||||
end;
|
||||
|
||||
%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到
|
||||
handle_event(cast, {handle, {data, Data}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
||||
handle_event(cast, {handle, {data, Data}}, session, State = #state{aes = AES}) ->
|
||||
PlainData = iot_cipher_aes:decrypt(AES, Data),
|
||||
case catch jiffy:decode(PlainData, [return_maps]) of
|
||||
Info = #{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags} when is_binary(ServiceName) ->
|
||||
%% 查找终端设备对应的点位信息
|
||||
RouterUUID = router_uuid(Info, UUID),
|
||||
lager:debug("[iot_host] host: ~p, router_uuid: ~p, get data: ~p", [UUID, RouterUUID, Info]),
|
||||
case mnesia_kv:hget(RouterUUID, <<"location_code">>) of
|
||||
none ->
|
||||
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, not found", [UUID, RouterUUID]);
|
||||
{error, Reason} ->
|
||||
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, get error: ~p", [UUID, RouterUUID, Reason]);
|
||||
{ok, LocationCode} ->
|
||||
iot_router:route(LocationCode, FieldsList, Timestamp)
|
||||
end,
|
||||
|
||||
%% 数据写入influxdb
|
||||
NTags = with_device_uuid(Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}, Info),
|
||||
|
||||
%% 按照设备的uuid进行分组
|
||||
Points = lists:map(fun(Fields) ->
|
||||
NFields = convert_fields(Fields),
|
||||
influx_point:new(RouterUUID, NTags, NFields, Timestamp)
|
||||
end, FieldsList),
|
||||
Precision = influx_client:get_precision(Timestamp),
|
||||
|
||||
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"metric">>, <<"nannong">>, Precision, Points) end);
|
||||
Info when is_map(Info) ->
|
||||
handle_data(Info, State);
|
||||
Other ->
|
||||
lager:debug("[iot_host] the data is invalid json: ~p", [Other])
|
||||
end,
|
||||
{keep_state, State};
|
||||
|
||||
%% 其他情况丢弃数据
|
||||
handle_event(cast, {handle, {data, _}}, _, State) ->
|
||||
{keep_state, State};
|
||||
|
||||
%% 任意状态下都可以ping
|
||||
handle_event(cast, {handle, {ping, CipherMetric}}, _, State = #state{uuid = UUID, aes = AES}) ->
|
||||
MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric),
|
||||
@ -335,8 +376,21 @@ handle_event(cast, {handle, {feedback_result, Info0}}, session, State = #state{a
|
||||
end,
|
||||
{keep_state, State};
|
||||
|
||||
%% 其他情况丢弃数据
|
||||
handle_event(cast, {handle, {data, _}}, _, State) ->
|
||||
handle_event(cast, {handle, {event, Event0}}, session, State = #state{aes = AES, device_map = DeviceMap}) ->
|
||||
EventText = iot_cipher_aes:decrypt(AES, Event0),
|
||||
case catch jiffy:decode(EventText, [return_maps]) of
|
||||
#{<<"event_type">> := ?EVENT_DEVICE, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} ->
|
||||
case maps:get(DeviceUUID, DeviceMap, undefined) of
|
||||
undefined ->
|
||||
ok;
|
||||
Pid ->
|
||||
iot_device:change_status(Pid, Status)
|
||||
end;
|
||||
Event when is_map(Event) ->
|
||||
lager:warning("[iot_host] event: ~p, not supported", [Event]);
|
||||
Other ->
|
||||
lager:warning("[iot_host] event error: ~p", [Other])
|
||||
end,
|
||||
{keep_state, State};
|
||||
|
||||
%% 当websocket断开的时候,则设置主机状态为下线状态; 主机的状态需要转换
|
||||
@ -352,6 +406,25 @@ handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State
|
||||
{keep_state, State#state{channel_pid = undefined}}
|
||||
end;
|
||||
|
||||
%% 处理device进程的退出
|
||||
handle_event(info, {'EXIT', Pid, Reason}, StateName, State = #state{uuid = UUID, device_map = DeviceMap}) ->
|
||||
lager:warning("[iot_host] uuid: ~p, device pid: ~p, exit with reason: ~p, state name: ~p, state: ~p", [UUID, Pid, Reason, StateName, State]),
|
||||
case lists:search(fun({_, Pid0}) -> Pid =:= Pid0 end, maps:to_list(DeviceMap)) of
|
||||
false ->
|
||||
{keep_state, State};
|
||||
{value, {DeviceUUID, _}} ->
|
||||
%% 重启device进程不一定能成功,可能重启时,数据库已经删除了
|
||||
case iot_device:start_link(self(), DeviceUUID) of
|
||||
{ok, DevicePid} ->
|
||||
%% 采用的是同样的DeviceUUID,因此老的值会被覆盖
|
||||
NDeviceMap = maps:put(DeviceUUID, DevicePid, DeviceMap),
|
||||
{keep_state, State#state{device_map = NDeviceMap}};
|
||||
Error ->
|
||||
lager:warning("[iot_host] uuid: ~p, restart device: ~p, get error: ~p", [UUID, DeviceUUID, Error]),
|
||||
{keep_state, State}
|
||||
end
|
||||
end;
|
||||
|
||||
handle_event(EventType, EventContent, StateName, State) ->
|
||||
lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]),
|
||||
|
||||
@ -376,19 +449,33 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
convert_fields(#{<<"key">> := Key, <<"value">> := Value, <<"unit">> := Unit}) ->
|
||||
#{Key => jiffy:encode(#{<<"value">> => Value, <<"unit">> => Unit}, [force_utf8])};
|
||||
convert_fields(#{<<"key">> := Key, <<"value">> := Value}) ->
|
||||
#{Key => #{<<"value">> => Value}}.
|
||||
%% 处理相关数据
|
||||
handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID, device_map = DeviceMap})
|
||||
when is_binary(DeviceUUID), DeviceUUID /= <<>> ->
|
||||
|
||||
%% 获取到分发的路由
|
||||
router_uuid(#{<<"device_uuid">> := DeviceUUID}, _) when is_binary(DeviceUUID), DeviceUUID /= <<>> ->
|
||||
DeviceUUID;
|
||||
router_uuid(_, UUID) ->
|
||||
UUID.
|
||||
case maps:get(DeviceUUID, DeviceMap, undefined) of
|
||||
undefined ->
|
||||
ok;
|
||||
Pid ->
|
||||
case iot_device:is_activated(Pid) of
|
||||
true ->
|
||||
%% 查找终端设备对应的点位信息
|
||||
iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp),
|
||||
|
||||
-spec with_device_uuid(Tags :: #{}, Info :: #{}) -> #{}.
|
||||
with_device_uuid(Tags, #{<<"device_uuid">> := DeviceUUID}) when DeviceUUID /= <<>> ->
|
||||
Tags#{<<"device_uuid">> => DeviceUUID};
|
||||
with_device_uuid(Tags, _) ->
|
||||
Tags.
|
||||
%% 数据写入influxdb
|
||||
NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => DeviceUUID},
|
||||
influx_client:write_data(DeviceUUID, NTags, FieldsList, Timestamp),
|
||||
|
||||
iot_device:change_status(Pid, 1);
|
||||
false ->
|
||||
lager:notice("[iot_host] uuid: ~p, device_uuid: ~p, device is not activated", [UUID, DeviceUUID])
|
||||
end
|
||||
end;
|
||||
|
||||
handle_data(#{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID}) ->
|
||||
%% 查找终端设备对应的点位信息
|
||||
iot_router:route_uuid(UUID, FieldsList, Timestamp),
|
||||
|
||||
%% 数据写入influxdb
|
||||
NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName},
|
||||
influx_client:write_data(UUID, NTags, FieldsList, Timestamp).
|
||||
@ -11,7 +11,19 @@
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([route/3]).
|
||||
-export([route/3, route_uuid/3]).
|
||||
|
||||
-spec route_uuid(RouterUUID :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
|
||||
route_uuid(RouterUUID, Fields, Timestamp) when is_binary(RouterUUID), is_list(Fields), is_integer(Timestamp) ->
|
||||
%% 查找终端设备对应的点位信息
|
||||
case mnesia_kv:hget(RouterUUID, <<"location_code">>) of
|
||||
none ->
|
||||
lager:debug("[iot_host] the north_data hget location_code, uuid: ~p, not found", [RouterUUID]);
|
||||
{error, Reason} ->
|
||||
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason]);
|
||||
{ok, LocationCode} ->
|
||||
route(LocationCode, Fields, Timestamp)
|
||||
end.
|
||||
|
||||
-spec route(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> ok.
|
||||
route(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields), is_integer(Timestamp) ->
|
||||
|
||||
@ -100,6 +100,10 @@ websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_FEEDBACK_RESU
|
||||
iot_host:handle(HostPid, {feedback_result, CipherInfo}),
|
||||
{ok, State};
|
||||
|
||||
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_EVENT:8, CipherEvent/binary>>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) ->
|
||||
iot_host:handle(HostPid, {event, CipherEvent}),
|
||||
{ok, State};
|
||||
|
||||
%% 主机端的消息响应
|
||||
websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, PacketId:32, Body/binary>>}, State = #state{uuid = UUID, inflight = Inflight}) when PacketId > 0 ->
|
||||
lager:debug("[ws_channel] uuid: ~p, get publish response message: ~p, packet_id: ~p", [UUID, Body, PacketId]),
|
||||
|
||||
@ -67,12 +67,25 @@ Body: 公钥信息
|
||||
PacketId: 4字节整数, 值为0;
|
||||
Body: 公钥信息
|
||||
|
||||
### 主机上传终端设备的在线或离线事件
|
||||
<<0x01, PacketId:4, 0x07, DeviceUUID:32/binary, Status:1>>
|
||||
### 主机上传终端设备的相关事件
|
||||
<<0x01, PacketId:4, 0x07, Body:任意长度>>
|
||||
|
||||
PacketId: 4字节整数, 值为0;
|
||||
DeviceUUID: 设备32位UUID
|
||||
Status: 1字节整数,1表示在线,0表示离线
|
||||
Body: 事件内容,AES加密
|
||||
|
||||
```text
|
||||
|
||||
设备的离在线状态
|
||||
|
||||
{
|
||||
"event_type": 1,
|
||||
"params": {
|
||||
"device_uuid": "",
|
||||
"status": 0 // 1在线, 0离线
|
||||
}
|
||||
}
|
||||
|
||||
```
|
||||
|
||||
### data北向数据上传 (无响应)
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user