diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 6a90232..6f3ba99 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -31,6 +31,9 @@ -define(PACKET_PUBLISH, 16#03). -define(PACKET_PUBLISH_RESPONSE, 16#04). +%% 事件类型 +-define(EVENT_DEVICE, 16#01). + %% 缓存数据库表 -record(kv, { key :: binary(), diff --git a/apps/iot/src/database/device_bo.erl b/apps/iot/src/database/device_bo.erl index 36ed236..e7600cd 100644 --- a/apps/iot/src/database/device_bo.erl +++ b/apps/iot/src/database/device_bo.erl @@ -11,7 +11,26 @@ -include("iot.hrl"). %% API --export([get_device_by_uuid/1]). +-export([get_host_devices/1, get_device_by_uuid/1, change_status/2, get_host_by_uuid/1]). -get_device_by_uuid(UUID) when is_binary(UUID) -> - mysql_pool:get_row(mysql_iot, <<"SELECT * FROM device WHERE uuid = ? LIMIT 1">>, [UUID]). \ No newline at end of file +-spec get_host_devices(HostId :: integer()) -> {ok, Devices::list()} | {error, Reason::any()}. +get_host_devices(HostId) when is_integer(HostId) -> + mysql_pool:get_all(mysql_iot, <<"SELECT * FROM device WHERE host_id = ? AND device_uuid != ''">>, [HostId]). + +-spec get_device_by_uuid(DeviceUUID :: binary()) -> {ok, DeviceInfo :: map()} | undefined. +get_device_by_uuid(DeviceUUID) when is_binary(DeviceUUID) -> + mysql_pool:get_row(mysql_iot, <<"SELECT * FROM device WHERE device_uuid = ? LIMIT 1">>, [DeviceUUID]). + +%% 修改主机的状态 +-spec change_status(DeviceId :: integer(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. +change_status(DeviceId, Status) when is_integer(DeviceId), is_integer(Status) -> + mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ? WHERE id = ? LIMIT 1">>, [Status, DeviceId]). + +-spec get_host_by_uuid(DeviceUUID :: binary()) -> undefined | {ok, HostInfo :: map()}. +get_host_by_uuid(DeviceUUID) when is_binary(DeviceUUID) -> + case get_device_by_uuid(DeviceUUID) of + undefined -> + undefined; + {ok, #{<<"host_id">> := HostId}} -> + host_bo:get_host_by_id(HostId) + end. \ No newline at end of file diff --git a/apps/iot/src/database/host_bo.erl b/apps/iot/src/database/host_bo.erl index 8204534..d159e88 100644 --- a/apps/iot/src/database/host_bo.erl +++ b/apps/iot/src/database/host_bo.erl @@ -11,7 +11,7 @@ -include("iot.hrl"). %% API --export([get_all_hosts/0, change_status/2, get_host_by_uuid/1]). +-export([get_all_hosts/0, change_status/2, get_host_by_uuid/1, get_host_by_id/1]). -spec get_all_hosts() -> UUIDList :: [binary()]. get_all_hosts() -> @@ -22,9 +22,14 @@ get_all_hosts() -> [] end. +-spec get_host_by_uuid(UUID :: binary()) -> undefined | {ok, HostInfo :: map()}. get_host_by_uuid(UUID) when is_binary(UUID) -> mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE uuid = ? LIMIT 1">>, [UUID]). +-spec get_host_by_id(HostId :: integer()) -> undefined | {ok, HostInfo :: map()}. +get_host_by_id(HostId) when is_integer(HostId) -> + mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]). + %% 修改主机的状态 -spec change_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. change_status(UUID, Status) when is_binary(UUID), is_integer(Status) -> diff --git a/apps/iot/src/http_handler/device_handler.erl b/apps/iot/src/http_handler/device_handler.erl new file mode 100644 index 0000000..7861f80 --- /dev/null +++ b/apps/iot/src/http_handler/device_handler.erl @@ -0,0 +1,57 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2020, +%%% @doc +%%% +%%% @end +%%% Created : 26. 4月 2020 3:36 下午 +%%%------------------------------------------------------------------- +-module(device_handler). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([handle_request/4]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% 重新加载对应的主机信息 +handle_request("POST", "/device/reload", _, #{<<"device_uuid">> := DeviceUUID}) when is_binary(DeviceUUID) -> + lager:debug("[device_handler] will reload device uuid: ~p", [DeviceUUID]), + case device_bo:get_host_by_uuid(DeviceUUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"device not found, reload error">>)}; + {ok, #{<<"uuid">> := UUID}} -> + Pid = iot_host:get_pid(UUID), + ok = iot_host:reload_device(Pid, DeviceUUID), + {ok, 200, iot_util:json_data(<<"success">>)} + end; + +%% 删除对应的主机信息 +handle_request("POST", "/device/delete", _, #{<<"device_uuid">> := DeviceUUID}) when is_binary(DeviceUUID) -> + case device_bo:get_host_by_uuid(DeviceUUID) of + undefined -> + ok; + {ok, #{<<"uuid">> := UUID}} -> + Pid = iot_host:get_pid(UUID), + ok = iot_host:delete_device(Pid, DeviceUUID) + end, + {ok, 200, iot_util:json_data(<<"success">>)}; + +%% 处理主机的授权的激活 +handle_request("POST", "/device/activate", _, #{<<"device_uuid">> := DeviceUUID, <<"auth">> := Auth}) when is_binary(DeviceUUID) -> + case device_bo:get_host_by_uuid(DeviceUUID) of + undefined -> + ok; + {ok, #{<<"uuid">> := UUID}} -> + Pid = iot_host:get_pid(UUID), + ok = iot_host:auth_device(Pid, DeviceUUID, Auth) + end, + {ok, 200, iot_util:json_data(<<"success">>)}; + + +handle_request(_, Path, _, _) -> + Path1 = list_to_binary(Path), + {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. \ No newline at end of file diff --git a/apps/iot/src/influxdb/influx_client.erl b/apps/iot/src/influxdb/influx_client.erl index 9ba04d3..19f9a34 100644 --- a/apps/iot/src/influxdb/influx_client.erl +++ b/apps/iot/src/influxdb/influx_client.erl @@ -12,7 +12,7 @@ -behaviour(gen_server). %% API --export([start_link/1, write/4, write/5]). +-export([start_link/1, write/4, write/5, write_data/4]). -export([get_precision/1]). %% gen_server callbacks @@ -22,6 +22,9 @@ -define(INFLUX_POOl, influx_pool). +-define(DEFAULT_BUCKET, <<"metric">>). +-define(DEFAULT_ORG, <<"nannong">>). + -record(state, { host, port, @@ -48,6 +51,22 @@ get_precision(Timestamp) when is_integer(Timestamp) -> <<"ms">> end. +-spec write_data(Measurement :: binary(), Tags :: map(), FieldsList :: list(), Timestamp :: integer()) -> no_return(). +write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement), is_map(Tags), is_list(FieldsList), is_integer(Timestamp) -> + %% 按照设备的uuid进行分组 + Points = lists:map(fun(Fields) -> + NFields = case Fields of + #{<<"key">> := Key, <<"value">> := Value, <<"unit">> := Unit} -> + #{Key => jiffy:encode(#{<<"value">> => Value, <<"unit">> => Unit}, [force_utf8])}; + #{<<"key">> := Key, <<"value">> := Value} -> + #{Key => #{<<"value">> => Value}} + end, + influx_point:new(Measurement, Tags, NFields, Timestamp) + end, FieldsList), + Precision = influx_client:get_precision(Timestamp), + + poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end). + -spec write(Pid :: pid(), Bucket :: binary(), Org :: binary(), Points :: list()) -> no_return(). write(Pid, Bucket, Org, Points) when is_pid(Pid), is_binary(Bucket), is_binary(Org), is_list(Points) -> write(Pid, Bucket, Org, <<"ms">>, Points). diff --git a/apps/iot/src/influxdb/influx_point.erl b/apps/iot/src/influxdb/influx_point.erl index 80851b0..8579aec 100644 --- a/apps/iot/src/influxdb/influx_point.erl +++ b/apps/iot/src/influxdb/influx_point.erl @@ -19,6 +19,7 @@ %% API -export([new/4, normalized/1]). +-spec new(Measurement :: binary(), Tags :: list() | map(), Fields :: list() | map(), Timestamp :: integer()) -> #point{}. new(Measurement, Tags, Fields, Timestamp) when is_binary(Measurement), is_list(Tags); is_map(Tags), is_list(Fields); is_map(Fields), is_integer(Timestamp) -> #point{measurement = Measurement, tags = as_list(Tags), fields = as_list(Fields), time = Timestamp}. diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 944829c..59acba0 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -44,6 +44,7 @@ start_http_server() -> Dispatcher = cowboy_router:compile([ {'_', [ {"/host/[...]", http_protocol, [host_handler]}, + {"/device/[...]", http_protocol, [device_handler]}, {"/endpoint/[...]", http_protocol, [endpoint_handler]}, {"/test/[...]", http_protocol, [test_handler]}, {"/ws", ws_channel, []} diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl new file mode 100644 index 0000000..d31cda1 --- /dev/null +++ b/apps/iot/src/iot_device.erl @@ -0,0 +1,196 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 14. 8月 2023 11:40 +%%%------------------------------------------------------------------- +-module(iot_device). +-author("aresei"). + +-behaviour(gen_statem). + +%% API +-export([start_link/2, is_activated/1, change_status/2, reload/1, auth/2, stop/1]). + +%% gen_statem callbacks +-export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]). + +%% 周期性的同步设备状态 +-define(RELOAD_TICKER, 100 * 1000). + +%% 终端是否在线 +-define(DEVICE_STATUS_OFFLINE, 0). +-define(DEVICE_STATUS_ONLINE, 1). + +%% 终端是否授权 +-define(DEVICE_AUTH_DENIED, 0). +-define(DEVICE_AUTH_AUTHED, 1). + +%% 状态 +-define(STATE_DENIED, denied). +-define(STATE_ACTIVATED, activated). + +-record(state, { + parent_pid :: pid(), + device_id :: integer(), + device_uuid :: binary(), + status = ?DEVICE_STATUS_OFFLINE +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec is_activated(Pid :: pid()) -> boolean(). +is_activated(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, is_activated). + +-spec change_status(Pid :: pid(), NewStatus :: integer()) -> no_return(). +change_status(Pid, NewStatus) when is_pid(Pid), is_integer(NewStatus) -> + gen_statem:cast(Pid, {change_status, NewStatus}). + +-spec reload(Pid :: pid()) -> no_return(). +reload(Pid) when is_pid(Pid) -> + gen_statem:cast(Pid, reload). + +-spec auth(Pid :: pid(), Auth :: boolean()) -> no_return(). +auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> + gen_statem:cast(Pid, {auth, Auth}). + +-spec stop(Pid :: pid()) -> no_return(). +stop(Pid) when is_pid(Pid) -> + gen_statem:stop(Pid). + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. +start_link(ParentPid, DeviceUUID) when is_pid(ParentPid), is_binary(DeviceUUID) -> + gen_statem:start_link(?MODULE, [ParentPid, DeviceUUID], []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. +init([ParentPid, DeviceUUID]) -> + %% 定期同步数据库的状态信息 + erlang:start_timer(?RELOAD_TICKER, self(), reload_ticker), + + {ok, #{<<"authorize_status">> := AuthorizeStatus, <<"id">> := DeviceId}} = device_bo:get_device_by_uuid(DeviceUUID), + StateName = case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of + false -> ?STATE_DENIED; + true -> ?STATE_ACTIVATED + end, + %% 重启时,离线状态 + {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE), + lager:debug("[iot_device] started device: ~p, state_name: ~p", [DeviceUUID, StateName]), + + {ok, StateName, #state{parent_pid = ParentPid, device_id = DeviceId, device_uuid = DeviceUUID, status = ?DEVICE_STATUS_OFFLINE}}. +%% @private +%% @doc This function is called by a gen_statem when it needs to find out +%% the callback mode of the callback module. +callback_mode() -> + handle_event_function. + +%% @private +%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or +%% (2) when gen_statem terminates abnormally. +%% This callback is optional. +format_status(_Opt, [_PDict, _StateName, _State]) -> + Status = some_term, + Status. + +%% @private +%% @doc If callback_mode is handle_event_function, then whenever a +%% gen_statem receives an event from call/2, cast/2, or as a normal +%% process message, this function is called. + +%% 判断是否是激活状态 +handle_event({call, From}, is_activated, StateName, State = #state{}) -> + {keep_state, State, [{reply, From, StateName =:= ?STATE_ACTIVATED}]}; + +%% 改变数据库的状态, 离线状态事件必须执行(主动触发离线的情况很少,不会造成数据库眼里) +handle_event(cast, {change_status, ?DEVICE_STATUS_OFFLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) -> + {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE), + {keep_state, State#state{status = ?DEVICE_STATUS_OFFLINE}}; +%% 改变为在线状态,但是数据库中的状态已经是在线状态,忽略 +handle_event(cast, {change_status, ?DEVICE_STATUS_ONLINE}, ?STATE_ACTIVATED, State = #state{status = ?DEVICE_STATUS_ONLINE}) -> + {keep_state, State}; +%% 其他情况下需要修改数据状态为在线状态 +handle_event(cast, {change_status, ?DEVICE_STATUS_ONLINE}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId}) -> + {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_ONLINE), + {keep_state, State#state{status = ?DEVICE_STATUS_ONLINE}}; +%% 其他状态下不存在在线状态的变化 +handle_event(cast, {change_status, _}, _, State = #state{}) -> + {keep_state, State}; + +%% 重新加载数据库数据 +handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) -> + lager:debug("[iot_device] will reload: ~p", [DeviceUUID]), + reload_database(State); + +%% 处理授权 +handle_event(cast, {auth, Auth}, ?STATE_DENIED, State = #state{device_id = DeviceId, device_uuid = DeviceUUID}) -> + case Auth of + true -> + %% 需要矫正status字段的值 + {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE), + lager:debug("[iot_device] device_uuid: ~p, auth: true, state_name from ~p, to: ~p", [DeviceUUID, ?STATE_DENIED, ?STATE_ACTIVATED]), + {next_state, ?STATE_ACTIVATED, State#state{status = ?DEVICE_STATUS_OFFLINE}}; + false -> + lager:debug("[iot_device] device_uuid: ~p, auth: false, will keep state_name: ~p", [DeviceUUID, ?STATE_DENIED]), + {keep_state, State} + end; + +handle_event(cast, {auth, Auth}, ?STATE_ACTIVATED, State = #state{device_id = DeviceId, device_uuid = DeviceUUID}) -> + case Auth of + true -> + lager:debug("[iot_device] device_uuid: ~p, auth: true, will keep state_name: ~p", [DeviceUUID, ?STATE_ACTIVATED]), + {keep_state, State}; + false -> + %% 需要矫正status字段的值 + {ok, _} = device_bo:change_status(DeviceId, ?DEVICE_STATUS_OFFLINE), + lager:debug("[iot_device] device_uuid: ~p, auth: false, state_name from: ~p, to: ~p", [DeviceUUID, ?STATE_ACTIVATED, ?STATE_DENIED]), + + {next_state, ?STATE_DENIED, State#state{status = ?DEVICE_STATUS_OFFLINE}} + end; + +handle_event(info, {timeout, _, reload_ticker}, _, State) -> + erlang:start_timer(?RELOAD_TICKER + rand:uniform(10) * 1000, self(), reload_ticker), + reload_database(State). + +%% @private +%% @doc This function is called by a gen_statem when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_statem terminates with +%% Reason. The return value is ignored. +terminate(Reason, StateName, #state{device_uuid = DeviceUUID}) -> + lager:notice("[iot_device] device_uuid: ~p, state_name: ~p, terminate with reason: ~p", [DeviceUUID, StateName, Reason]), + ok. + +%% @private +%% @doc Convert process state when code is changed +code_change(_OldVsn, StateName, State = #state{}, _Extra) -> + {ok, StateName, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +reload_database(State = #state{device_uuid = DeviceUUID}) -> + case device_bo:get_device_by_uuid(DeviceUUID) of + {ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status, <<"id">> := DeviceId}} -> + StateName = case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of + false -> ?STATE_DENIED; + true -> ?STATE_ACTIVATED + end, + {next_state, StateName, State#state{device_id = DeviceId, status = Status}}; + undefined -> + lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]), + {keep_state, State} + end. \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index b63e2c2..a519935 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -21,7 +21,7 @@ -define(HOST_AUTHED, 1). %% API --export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]). +-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2, reload_device/2, delete_device/2, auth_device/3]). -export([get_metric/1, publish_message/4, get_aes/1]). -export([create_session/2, attach_channel/2]). @@ -32,6 +32,8 @@ host_id :: integer(), %% 从数据库里面读取到的数据 uuid :: binary(), + %% 建立到和device之间的映射关系 + device_map = #{}, %% aes的key, 后续通讯需要基于这个加密 aes = <<>> :: binary(), @@ -65,6 +67,20 @@ handle(Pid, Packet) when is_pid(Pid) -> reload(Pid) when is_pid(Pid) -> gen_statem:call(Pid, reload). +%% 重新加载主机的基本信息 +-spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}. +reload_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> + gen_statem:call(Pid, {reload_device, DeviceUUID}). + +%% 重新加载主机的基本信息 +-spec delete_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}. +delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> + gen_statem:call(Pid, {delete_device, DeviceUUID}). + +-spec auth_device(Pid :: pid(), DeviceUUID :: binary(), Auth :: boolean()) -> ok | {error, Reason :: any()}. +auth_device(Pid, DeviceUUID, Auth) when is_pid(Pid), is_binary(DeviceUUID), is_boolean(Auth) -> + gen_statem:call(Pid, {auth_device, DeviceUUID, Auth}). + -spec get_aes(Pid :: pid()) -> {ok, Aes :: binary()}. get_aes(Pid) when is_pid(Pid) -> gen_statem:call(Pid, get_aes). @@ -130,7 +146,15 @@ init([UUID]) -> true -> activated end, - {ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes}}; + %% 启动所有的device + {ok, Devices} = device_bo:get_host_devices(HostId), + lager:debug("[iot_host] uuid: ~p, loaded devices: ~p", [UUID, Devices]), + DeviceMap = lists:foldl(fun(#{<<"device_uuid">> := DeviceUUID}, M) -> + {ok, Pid} = iot_device:start_link(self(), DeviceUUID), + maps:put(DeviceUUID, Pid, M) + end, #{}, Devices), + + {ok, StateName, #state{host_id = HostId, device_map = DeviceMap, uuid = UUID, aes = Aes}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), ignore @@ -231,39 +255,56 @@ handle_event({call, From}, {create_session, PubKey}, StateName, State = #state{u {next_state, session, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; +%% 处理设备相关 +handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) -> + case maps:get(DeviceUUID, DeviceMap, undefined) of + undefined -> + {ok, Pid} = iot_device:start_link(self(), DeviceUUID), + NDeviceMap = maps:put(DeviceUUID, Pid, DeviceMap), + {keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]}; + DevicePid -> + iot_device:reload(DevicePid), + {keep_state, State, [{reply, From, ok}]} + end; + +handle_event({call, From}, {auth_device, DeviceUUID, Auth}, _, State = #state{device_map = DeviceMap}) -> + case maps:get(DeviceUUID, DeviceMap, undefined) of + undefined -> + {ok, Pid} = iot_device:start_link(self(), DeviceUUID), + iot_device:auth(Pid, Auth), + + NDeviceMap = maps:put(DeviceUUID, Pid, DeviceMap), + {keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]}; + DevicePid -> + iot_device:auth(DevicePid, Auth), + {keep_state, State, [{reply, From, ok}]} + end; + +handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{uuid = UUID, device_map = DeviceMap}) -> + case maps:take(DeviceUUID, DeviceMap) of + error -> + lager:notice("[iot_host] uuid: ~p, delete device: ~p, not found", [UUID, DeviceUUID]), + {keep_state, State, [{reply, From, ok}]}; + {DevicePid, NDeviceMap} -> + iot_device:stop(DevicePid), + {keep_state, State#state{device_map = NDeviceMap}, [{reply, From, ok}]} + end; + %% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到 -handle_event(cast, {handle, {data, Data}}, session, State = #state{uuid = UUID, aes = AES}) -> +handle_event(cast, {handle, {data, Data}}, session, State = #state{aes = AES}) -> PlainData = iot_cipher_aes:decrypt(AES, Data), case catch jiffy:decode(PlainData, [return_maps]) of - Info = #{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags} when is_binary(ServiceName) -> - %% 查找终端设备对应的点位信息 - RouterUUID = router_uuid(Info, UUID), - lager:debug("[iot_host] host: ~p, router_uuid: ~p, get data: ~p", [UUID, RouterUUID, Info]), - case mnesia_kv:hget(RouterUUID, <<"location_code">>) of - none -> - lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, not found", [UUID, RouterUUID]); - {error, Reason} -> - lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, get error: ~p", [UUID, RouterUUID, Reason]); - {ok, LocationCode} -> - iot_router:route(LocationCode, FieldsList, Timestamp) - end, - - %% 数据写入influxdb - NTags = with_device_uuid(Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}, Info), - - %% 按照设备的uuid进行分组 - Points = lists:map(fun(Fields) -> - NFields = convert_fields(Fields), - influx_point:new(RouterUUID, NTags, NFields, Timestamp) - end, FieldsList), - Precision = influx_client:get_precision(Timestamp), - - poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"metric">>, <<"nannong">>, Precision, Points) end); + Info when is_map(Info) -> + handle_data(Info, State); Other -> lager:debug("[iot_host] the data is invalid json: ~p", [Other]) end, {keep_state, State}; +%% 其他情况丢弃数据 +handle_event(cast, {handle, {data, _}}, _, State) -> + {keep_state, State}; + %% 任意状态下都可以ping handle_event(cast, {handle, {ping, CipherMetric}}, _, State = #state{uuid = UUID, aes = AES}) -> MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric), @@ -335,8 +376,21 @@ handle_event(cast, {handle, {feedback_result, Info0}}, session, State = #state{a end, {keep_state, State}; -%% 其他情况丢弃数据 -handle_event(cast, {handle, {data, _}}, _, State) -> +handle_event(cast, {handle, {event, Event0}}, session, State = #state{aes = AES, device_map = DeviceMap}) -> + EventText = iot_cipher_aes:decrypt(AES, Event0), + case catch jiffy:decode(EventText, [return_maps]) of + #{<<"event_type">> := ?EVENT_DEVICE, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} -> + case maps:get(DeviceUUID, DeviceMap, undefined) of + undefined -> + ok; + Pid -> + iot_device:change_status(Pid, Status) + end; + Event when is_map(Event) -> + lager:warning("[iot_host] event: ~p, not supported", [Event]); + Other -> + lager:warning("[iot_host] event error: ~p", [Other]) + end, {keep_state, State}; %% 当websocket断开的时候,则设置主机状态为下线状态; 主机的状态需要转换 @@ -352,6 +406,25 @@ handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State {keep_state, State#state{channel_pid = undefined}} end; +%% 处理device进程的退出 +handle_event(info, {'EXIT', Pid, Reason}, StateName, State = #state{uuid = UUID, device_map = DeviceMap}) -> + lager:warning("[iot_host] uuid: ~p, device pid: ~p, exit with reason: ~p, state name: ~p, state: ~p", [UUID, Pid, Reason, StateName, State]), + case lists:search(fun({_, Pid0}) -> Pid =:= Pid0 end, maps:to_list(DeviceMap)) of + false -> + {keep_state, State}; + {value, {DeviceUUID, _}} -> + %% 重启device进程不一定能成功,可能重启时,数据库已经删除了 + case iot_device:start_link(self(), DeviceUUID) of + {ok, DevicePid} -> + %% 采用的是同样的DeviceUUID,因此老的值会被覆盖 + NDeviceMap = maps:put(DeviceUUID, DevicePid, DeviceMap), + {keep_state, State#state{device_map = NDeviceMap}}; + Error -> + lager:warning("[iot_host] uuid: ~p, restart device: ~p, get error: ~p", [UUID, DeviceUUID, Error]), + {keep_state, State} + end + end; + handle_event(EventType, EventContent, StateName, State) -> lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]), @@ -376,19 +449,33 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== -convert_fields(#{<<"key">> := Key, <<"value">> := Value, <<"unit">> := Unit}) -> - #{Key => jiffy:encode(#{<<"value">> => Value, <<"unit">> => Unit}, [force_utf8])}; -convert_fields(#{<<"key">> := Key, <<"value">> := Value}) -> - #{Key => #{<<"value">> => Value}}. +%% 处理相关数据 +handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID, device_map = DeviceMap}) + when is_binary(DeviceUUID), DeviceUUID /= <<>> -> -%% 获取到分发的路由 -router_uuid(#{<<"device_uuid">> := DeviceUUID}, _) when is_binary(DeviceUUID), DeviceUUID /= <<>> -> - DeviceUUID; -router_uuid(_, UUID) -> - UUID. + case maps:get(DeviceUUID, DeviceMap, undefined) of + undefined -> + ok; + Pid -> + case iot_device:is_activated(Pid) of + true -> + %% 查找终端设备对应的点位信息 + iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp), --spec with_device_uuid(Tags :: #{}, Info :: #{}) -> #{}. -with_device_uuid(Tags, #{<<"device_uuid">> := DeviceUUID}) when DeviceUUID /= <<>> -> - Tags#{<<"device_uuid">> => DeviceUUID}; -with_device_uuid(Tags, _) -> - Tags. \ No newline at end of file + %% 数据写入influxdb + NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => DeviceUUID}, + influx_client:write_data(DeviceUUID, NTags, FieldsList, Timestamp), + + iot_device:change_status(Pid, 1); + false -> + lager:notice("[iot_host] uuid: ~p, device_uuid: ~p, device is not activated", [UUID, DeviceUUID]) + end + end; + +handle_data(#{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags}, #state{uuid = UUID}) -> + %% 查找终端设备对应的点位信息 + iot_router:route_uuid(UUID, FieldsList, Timestamp), + + %% 数据写入influxdb + NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}, + influx_client:write_data(UUID, NTags, FieldsList, Timestamp). \ No newline at end of file diff --git a/apps/iot/src/iot_router.erl b/apps/iot/src/iot_router.erl index 31edf2e..bc605c4 100644 --- a/apps/iot/src/iot_router.erl +++ b/apps/iot/src/iot_router.erl @@ -11,7 +11,19 @@ -include("iot.hrl"). %% API --export([route/3]). +-export([route/3, route_uuid/3]). + +-spec route_uuid(RouterUUID :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +route_uuid(RouterUUID, Fields, Timestamp) when is_binary(RouterUUID), is_list(Fields), is_integer(Timestamp) -> + %% 查找终端设备对应的点位信息 + case mnesia_kv:hget(RouterUUID, <<"location_code">>) of + none -> + lager:debug("[iot_host] the north_data hget location_code, uuid: ~p, not found", [RouterUUID]); + {error, Reason} -> + lager:debug("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason]); + {ok, LocationCode} -> + route(LocationCode, Fields, Timestamp) + end. -spec route(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> ok. route(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields), is_integer(Timestamp) -> diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index aa121c3..b6be843 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -100,6 +100,10 @@ websocket_handle({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> + iot_host:handle(HostPid, {event, CipherEvent}), + {ok, State}; + %% 主机端的消息响应 websocket_handle({binary, <>}, State = #state{uuid = UUID, inflight = Inflight}) when PacketId > 0 -> lager:debug("[ws_channel] uuid: ~p, get publish response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), diff --git a/docs/websocket.md b/docs/websocket.md index 4d288a6..2f2396a 100644 --- a/docs/websocket.md +++ b/docs/websocket.md @@ -67,12 +67,25 @@ Body: 公钥信息 PacketId: 4字节整数, 值为0; Body: 公钥信息 -### 主机上传终端设备的在线或离线事件 -<<0x01, PacketId:4, 0x07, DeviceUUID:32/binary, Status:1>> +### 主机上传终端设备的相关事件 +<<0x01, PacketId:4, 0x07, Body:任意长度>> PacketId: 4字节整数, 值为0; -DeviceUUID: 设备32位UUID -Status: 1字节整数,1表示在线,0表示离线 +Body: 事件内容,AES加密 + +```text + +设备的离在线状态 + +{ + "event_type": 1, + "params": { + "device_uuid": "", + "status": 0 // 1在线, 0离线 + } +} + +``` ### data北向数据上传 (无响应)