iot/apps/iot/src/iot_device.erl
2025-01-03 11:37:51 +08:00

405 lines
18 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%% 1. Whether the terminal device is authorized => 1: authorized, 0: not authorized
%%%
%%% @end
%%% Created : 11. 7月 2024 11:33
%%%-------------------------------------------------------------------
-module(iot_device).
-author("anlicheng").
-include("iot.hrl").
-behaviour(gen_server).
%% API
-export([get_name/1, get_pid/1, serialize/1]).
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/6]).
-export([ai_event/3, handle_data/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%% Per-device gen_server state.
-record(state, {
%% UUID of the device this process represents
device_uuid :: binary(),
%% In-memory backup of the data that is also stored in influxdb; grouped by field, shape: #{Field => queue:queue()}
metrics = #{},
%% Maximum amount of cached data (checked against the length of each field's queue)
cache_size = 200,
%% Whether the device is authorized (is_activated compares this value with 1)
auth_status :: integer(),
%% AI event trigger period: looked up by event type to obtain the minimum interval between two routed events
ai_event_throttle = #{},
%% Records the ttl value of events: event type => timestamp at which the event was last routed
ai_event_ttl = #{},
%% Device model/type
model_id = 0,
%% Current device status; defaults to ?DEVICE_OFFLINE
status = ?DEVICE_OFFLINE
}).
%%%===================================================================
%%% API
%%%===================================================================
%% Normalizes the field list uploaded by efka. The uploaded payload looks like:
%% "fields": [
%%   {
%%     "key": "test",
%%     "value": 124,
%%     "unit": "U",
%%     "type": "AI (telemetry value) | DI (signal value) | SOE (event)",
%%     "timestamp": int
%%   }
%% ]
%% The result is the in-memory representation and is deliberately NOT serialized;
%% per the original note, the data written to influxdb is base64 based instead.
%%
%% Every map carrying a non-empty binary <<"key">> becomes {Key, MapWithoutKey};
%% every other element is silently dropped.
-spec serialize(FieldsList :: [map()]) -> [{Key :: binary(), Values :: map()}].
serialize(FieldsList) when is_list(FieldsList) ->
    lists:append([serialize0(Entry) || Entry <- FieldsList]).

%% Converts one uploaded entry into a zero- or one-element list.
serialize0(#{<<"key">> := Key} = Entry) when is_binary(Key), Key =/= <<>> ->
    [{Key, maps:without([<<"key">>], Entry)}];
serialize0(_Other) ->
    [].
%% Resolves the process registered for DeviceUUID and asks it whether the
%% device is activated. Returns {ok, Pid} only when a process exists and
%% reports itself as activated; returns the atom `error' otherwise.
-spec is_alive(DeviceUUID :: binary()) -> error | {ok, Pid :: pid()}.
is_alive(DeviceUUID) when is_binary(DeviceUUID) ->
    case iot_device:get_pid(DeviceUUID) of
        Pid when is_pid(Pid) ->
            Activated = iot_device:is_activated(Pid),
            case Activated of
                false -> error;
                true -> {ok, Pid}
            end;
        undefined ->
            error
    end.
%% Returns the pid registered for DeviceUUID, or `undefined' when no such
%% process is registered.
%%
%% The lookup uses binary_to_existing_atom/1 so that asking for an unknown
%% device never creates a new atom: atoms are not garbage collected, and a
%% process can only be registered under an atom that already exists, so a
%% missing atom implies that nothing is registered under that name.
-spec get_pid(DeviceUUID :: binary()) -> Pid :: pid() | undefined.
get_pid(DeviceUUID) when is_binary(DeviceUUID) ->
    try binary_to_existing_atom(registered_name_bin(DeviceUUID)) of
        Name -> whereis(Name)
    catch
        error:badarg -> undefined
    end.

%% Returns the atom under which the process of DeviceUUID is (to be) registered.
%% NOTE: this creates an atom per distinct UUID; only call it for devices that
%% are actually going to be started, never for arbitrary untrusted input.
-spec get_name(DeviceUUID :: binary()) -> atom().
get_name(DeviceUUID) when is_binary(DeviceUUID) ->
    binary_to_atom(registered_name_bin(DeviceUUID)).

%% Builds the textual form of the registered name shared by get_pid/1 and get_name/1.
-spec registered_name_bin(DeviceUUID :: binary()) -> binary().
registered_name_bin(DeviceUUID) ->
    <<"iot_device:", DeviceUUID/binary>>.
%% Asks the device process whether the device is activated.
%% A missing process (`undefined') is reported as not activated without
%% making any call.
-spec is_activated(Pid :: pid() | undefined) -> boolean().
is_activated(DevicePid) when is_pid(DevicePid) ->
    gen_server:call(DevicePid, is_activated);
is_activated(undefined) ->
    false.
%% Asynchronously asks the device process to switch to NewStatus.
%% Does nothing when there is no process (`undefined').
%% Always returns `ok' (the previous spec wrongly declared no_return()).
-spec change_status(Pid :: pid() | undefined, NewStatus :: integer()) -> ok.
change_status(undefined, _) ->
    ok;
change_status(Pid, NewStatus) when is_pid(Pid), is_integer(NewStatus) ->
    gen_server:cast(Pid, {change_status, NewStatus}).
%% Asynchronously asks the device process to reload its data from the database.
%% Always returns `ok' (the previous spec wrongly declared no_return()).
-spec reload(Pid :: pid()) -> ok.
reload(Pid) when is_pid(Pid) ->
    gen_server:cast(Pid, reload).
%% Asynchronously sets the authorization flag of the device process.
%% Always returns `ok' (the previous spec wrongly declared no_return()).
-spec auth(Pid :: pid(), Auth :: boolean()) -> ok.
auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
    gen_server:cast(Pid, {auth, Auth}).
%% Asynchronously appends a batch of data records to the in-memory cache of
%% the device process.
%% Always returns `ok' (the previous spec wrongly declared no_return()).
-spec data(Pid :: pid(), DataList :: [#device_data{}]) -> ok.
data(Pid, DataList) when is_pid(Pid), is_list(DataList) ->
    gen_server:cast(Pid, {data, DataList}).
%% Synchronously queries the data cached in the device process.
%%   Fields  - field names to return; an empty list selects every field
%%   Tags    - tag name/value pairs every returned record must carry; an empty map disables tag filtering
%%   StartTs, StopTs - time range bounds; 0/0 disables time filtering
%%   Limit   - maximum number of records per field; 0 means unlimited
%% Returns {ok, [{Field, Records}]}: one entry per selected field, each with a
%% LIST of records (the previous spec wrongly described a single record).
-spec query(Pid :: pid(), Fields :: list(), Tags :: map(), StartTs :: integer(), StopTs :: integer(), Limit :: integer()) ->
    {ok, [{Field :: binary(), [#device_data{}]}]}.
query(Pid, Fields, Tags, StartTs, StopTs, Limit)
    when is_pid(Pid), is_list(Fields), is_map(Tags),
         is_integer(StartTs), is_integer(StopTs), is_integer(Limit),
         StartTs >= 0, StopTs >= 0, Limit >= 0 ->
    gen_server:call(Pid, {query, Fields, Tags, StartTs, StopTs, Limit}).
%% Asynchronously hands an AI event to the device process, which applies its
%% per-event-type throttling before routing it.
%% Always returns `ok' (the previous spec wrongly declared no_return()).
-spec ai_event(Pid :: pid(), EventType :: integer(), Params :: map()) -> ok.
ai_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_map(Params) ->
    gen_server:cast(Pid, {ai_event, EventType, Params}).
%% Asynchronously asks the device process to forward the given fields,
%% stamped with Timestamp, to the north-bound endpoint.
%% Always returns `ok' (the previous spec wrongly declared no_return()).
-spec handle_data(Pid :: pid(), Fields :: list(), Timestamp :: integer()) -> ok.
handle_data(Pid, Fields, Timestamp) when is_pid(Pid), is_list(Fields), is_integer(Timestamp) ->
    gen_server:cast(Pid, {handle_data, Fields, Timestamp}).
%% @doc Spawns the server and registers the local name (unique).
%% The second argument is either the device UUID (the device record is then
%% loaded in init/1) or an already loaded device info map. The spec now covers
%% both accepted shapes; previously it only admitted a binary.
-spec(start_link(Name :: atom(), DeviceUUIDOrInfo :: binary() | map()) ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Name, DeviceUUIDOrInfo)
    when is_atom(Name), (is_binary(DeviceUUIDOrInfo) orelse is_map(DeviceUUIDOrInfo)) ->
    gen_server:start_link({local, Name}, ?MODULE, [DeviceUUIDOrInfo], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server.
%% Accepts either a one-element list holding an already loaded device info map,
%% or a one-element list holding the device UUID, in which case the device is
%% first loaded through device_bo. When the device cannot be found the process
%% is not started (`ignore').
-spec(init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore).
init([#{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status, <<"model_id">> := ModelId}]) ->
    %% Both settings are mandatory; a missing one crashes init with badmatch.
    {ok, CacheSize} = application:get_env(iot, device_cache_size),
    {ok, AiEventThrottle} = application:get_env(iot, ai_event_throttle),
    %% Values read from the database are not always integers, so normalize them.
    StatusInt = as_integer(Status),
    AuthInt = as_integer(AuthorizeStatus),
    ModelInt = as_integer(ModelId),
    InitialState = #state{
        device_uuid = DeviceUUID,
        ai_event_throttle = AiEventThrottle,
        cache_size = CacheSize,
        status = StatusInt,
        auth_status = AuthInt,
        model_id = ModelInt
    },
    {ok, InitialState};
init([DeviceUUID]) when is_binary(DeviceUUID) ->
    Lookup = device_bo:get_device_by_uuid(DeviceUUID),
    case Lookup of
        undefined ->
            lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
            ignore;
        {ok, DeviceInfo} ->
            init([DeviceInfo])
    end.
%% @private
%% @doc Handling call messages.
%%   is_activated - replies `true' when the cached authorization status is 1.
%%   {query, Fields, Tags, StartTs, StopTs, Limit} - replies {ok, [{Field, Records}]}
%%       built from the in-memory cache; records are ordered newest-first.
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Whether the device is currently activated (authorized)
handle_call(is_activated, _From, State = #state{auth_status = AuthStatus}) ->
    {reply, AuthStatus =:= 1, State};
%% Query the data produced by this device that is cached in memory
handle_call({query, Fields, Tags, StartTs, StopTs, Limit}, _From, State = #state{metrics = Metrics}) ->
    AllMetrics = maps:to_list(Metrics),
    %% An empty field list selects every field. Matching on the list shape
    %% avoids the O(n) length/1 call that was used only as an emptiness test.
    Selected = case Fields of
        [] ->
            AllMetrics;
        [_ | _] ->
            [Entry || {Field, _} = Entry <- AllMetrics, lists:member(Field, Fields)]
    end,
    DataList = [{Field, query_queue(Q, Tags, StartTs, StopTs, Limit)} || {Field, Q} <- Selected],
    {reply, {ok, DataList}, State}.

%% Applies the time-range filter, the tag filter and the limit to one field queue.
%% The result is newest-first; a Limit of 0 means "no limit".
query_queue(Q, Tags, StartTs, StopTs, Limit) ->
    InRange = filter_timestamp(queue:to_list(Q), StartTs, StopTs),
    Tagged = filter_tags(InRange, Tags),
    NewestFirst = lists:reverse(Tagged),
    case Limit of
        0 -> NewestFirst;
        _ -> lists:sublist(NewestFirst, Limit)
    end.
%% @private
%% @doc Handling cast messages.
%%   {change_status, Status} - online/offline transitions, persisted through device_bo and reported north-bound
%%   reload                  - re-read status and authorization from the database
%%   {data, DataList}        - append records to the bounded per-field cache
%%   {auth, Bool}            - set the cached authorization flag
%%   {ai_event, Type, Params}- throttle and route an AI event
%%   {handle_data, Fields, Timestamp} - forward device data north-bound
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Already online: nothing to do
handle_cast({change_status, ?DEVICE_ONLINE}, State = #state{status = ?DEVICE_ONLINE}) ->
    {noreply, State};
handle_cast({change_status, ?DEVICE_ONLINE}, State = #state{device_uuid = DeviceUUID}) ->
    {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_ONLINE),
    report_event(DeviceUUID, ?DEVICE_ONLINE),
    {noreply, State#state{status = ?DEVICE_ONLINE}};
handle_cast({change_status, ?DEVICE_OFFLINE}, State = #state{device_uuid = DeviceUUID}) ->
    %% The decision is based on the status stored in the database, not on the cached one.
    {ok, #{<<"status">> := Status}} = device_bo:get_device_by_uuid(DeviceUUID),
    %% Database values are not always integers; normalize exactly as init/1 and
    %% the reload clause do, otherwise a binary status crashes with case_clause.
    case as_integer(Status) of
        ?DEVICE_NOT_JOINED ->
            lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]),
            {noreply, State#state{status = ?DEVICE_NOT_JOINED}};
        ?DEVICE_OFFLINE ->
            lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]),
            {noreply, State#state{status = ?DEVICE_OFFLINE}};
        ?DEVICE_ONLINE ->
            {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE),
            report_event(DeviceUUID, ?DEVICE_OFFLINE),
            {noreply, State#state{status = ?DEVICE_OFFLINE}}
    end;
%% Reload the data from the database
handle_cast(reload, State = #state{device_uuid = DeviceUUID}) ->
    lager:debug("[iot_device] will reload: ~p", [DeviceUUID]),
    case device_bo:get_device_by_uuid(DeviceUUID) of
        {ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} ->
            {noreply, State#state{status = as_integer(Status), auth_status = as_integer(AuthorizeStatus)}};
        undefined ->
            lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
            {stop, normal, State}
    end;
%% Append data to the device cache
handle_cast({data, DataList}, State = #state{metrics = Metrics, cache_size = CacheSize}) ->
    NMetrics = lists:foldl(fun(Data = #device_data{field = Field}, Metrics0) ->
        Q0 = maps:get(Field, Metrics0, queue:new()),
        Metrics0#{Field => enqueue_bounded(Data, Q0, CacheSize)}
    end, Metrics, DataList),
    {noreply, State#state{metrics = NMetrics}};
%% Handle authorization
handle_cast({auth, true}, State = #state{device_uuid = DeviceUUID}) ->
    lager:debug("[iot_device] device_uuid: ~p, auth: true", [DeviceUUID]),
    {noreply, State#state{auth_status = 1}};
handle_cast({auth, false}, State = #state{device_uuid = DeviceUUID}) ->
    lager:debug("[iot_device] device_uuid: ~p, auth: false", [DeviceUUID]),
    {noreply, State#state{auth_status = 0}};
%% Throttling of AI events: guarantees that an event type is routed at most once per interval
handle_cast({ai_event, EventType, Params}, State = #state{device_uuid = DeviceUUID, ai_event_throttle = EventThrottle, ai_event_ttl = EventTTL}) ->
    case maps:find(EventType, EventThrottle) of
        {ok, Interval} ->
            LastTimestamp = maps:get(EventType, EventTTL, 0),
            Timestamp = iot_util:current_time(),
            case Timestamp >= LastTimestamp + Interval of
                true ->
                    lager:debug("[iot_device] device_uuid: ~p, ai_event triggered last_timestamp: ~p, current_timestamp: ~p, interval is: ~p",
                        [DeviceUUID, LastTimestamp, Timestamp, Interval]),
                    iot_ai_router:route_uuid(DeviceUUID, EventType, Params),
                    {noreply, State#state{ai_event_ttl = maps:put(EventType, Timestamp, EventTTL)}};
                false ->
                    lager:debug("[iot_device] device_uuid: ~p, ai_event trigger less than interval: ~p, last_timestamp: ~p, current_timestamp: ~p",
                        [DeviceUUID, Interval, LastTimestamp, Timestamp]),
                    {noreply, State}
            end;
        error ->
            %% No throttle configured for this event type: always route it
            lager:debug("[iot_device] device_uuid: ~p, ai_event type: ~p not limited", [DeviceUUID, EventType]),
            iot_ai_router:route_uuid(DeviceUUID, EventType, Params),
            {noreply, State}
    end;
%% Forward device data
handle_cast({handle_data, Fields, Timestamp}, State = #state{device_uuid = DeviceUUID, model_id = ModelId}) ->
    %% Look up the location (point) information of the terminal device
    case redis_client:hgetall(DeviceUUID) of
        {ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) ->
            %% Data of smart-lighting devices (model 20) must be grouped under the building
            case ModelId =:= 20 of
                true ->
                    NLocationCode = extract_build_location_code(LocationCode),
                    NDynamicLocationCode = extract_build_location_code(DynamicLocationCode),
                    lager:debug("[iot_device] light device: ~p, location_code: ~p", [DeviceUUID, NLocationCode]),
                    iot_zd_endpoint:forward(NLocationCode, NDynamicLocationCode, Fields, Timestamp);
                false ->
                    iot_zd_endpoint:forward(LocationCode, DynamicLocationCode, Fields, Timestamp)
            end;
        {ok, _} ->
            lager:warning("[iot_device] the north_data hget location_code, uuid: ~p, not found, fields: ~p", [DeviceUUID, Fields]);
        {error, Reason} ->
            lager:warning("[iot_device] the north_data hget location_code uuid: ~p, get error: ~p, fields: ~p", [DeviceUUID, Reason, Fields])
    end,
    {noreply, State}.

%% Appends Data to Q while keeping at most CacheSize entries: once the queue is
%% full the oldest entry is dropped first. The comparison is `>=' so the queue
%% never grows to CacheSize + 1, which the previous `>' check allowed.
enqueue_bounded(Data, Q, CacheSize) ->
    case queue:len(Q) >= CacheSize of
        true ->
            {_, Trimmed} = queue:out(Q),
            queue:in(Data, Trimmed);
        false ->
            queue:in(Data, Q)
    end.
%% @private
%% @doc Handles every message that is neither a call nor a cast.
%% All such messages are discarded and the state is left untouched.
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Message, #state{} = Unchanged) ->
    {noreply, Unchanged}.
%% @private
%% @doc Called by the gen_server right before it terminates.
%% Only logs the device UUID together with the termination reason; there is
%% nothing to clean up. The return value is ignored by gen_server.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(Reason, #state{device_uuid = UUID}) ->
    LogArgs = [UUID, Reason],
    lager:notice("[iot_device] device_uuid: ~p, terminate with reason: ~p", LogArgs),
    ok.
%% @private
%% @doc Converts the process state on a code upgrade/downgrade.
%% The state layout is unchanged, so the state is returned as is.
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_FromVsn, #state{} = Current, _Extra) ->
    {ok, Current}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Reports a device status change north-bound as a single "device_status" DI field.
%% NewStatus must be a key of TextMap (0 or 1); any other value crashes in maps:get/2.
%% The location codes are read from redis; when they are missing or the lookup
%% fails only a warning is logged and nothing is forwarded.
%% Returns `ok' (the previous spec wrongly declared no_return()).
-spec report_event(DeviceUUID :: binary(), NewStatus :: integer()) -> ok.
report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewStatus) ->
    %% Human readable status text: 0 => "offline", 1 => "online"
    TextMap = #{
        0 => <<"离线"/utf8>>,
        1 => <<"在线"/utf8>>
    },
    %% Report the device status information to the north-bound endpoint
    Timestamp = iot_util:timestamp_of_seconds(),
    FieldsList = [#{
        <<"key">> => <<"device_status">>,
        <<"value">> => NewStatus,
        <<"value_text">> => maps:get(NewStatus, TextMap),
        <<"unit">> => 0,
        <<"type">> => <<"DI">>,
        <<"name">> => <<"设备状态"/utf8>>,
        <<"timestamp">> => Timestamp
    }],
    case redis_client:hgetall(DeviceUUID) of
        {ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) ->
            iot_zd_endpoint:forward(LocationCode, DynamicLocationCode, FieldsList, Timestamp);
        {ok, _} ->
            lager:warning("[iot_device] the north_data hget location_code, uuid: ~p, not found, fields: ~p", [DeviceUUID, FieldsList]);
        {error, Reason} ->
            lager:warning("[iot_device] the north_data hget location_code uuid: ~p, get error: ~p, fields: ~p", [DeviceUUID, Reason, FieldsList])
    end,
    lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]),
    ok.
%% Time filter.
%% - StartTs = 0 and StopTs = 0 disables filtering and returns the list as is.
%% - An inverted range (StartTs > StopTs) yields the empty list.
%% - Otherwise both bounds are multiplied by 1000 before being compared with
%%   the record timestamps (the comparison is done at millisecond level) and
%%   the range is inclusive on both ends.
-spec filter_timestamp(L :: list(), StartTs :: integer(), StopTs :: integer()) -> list().
filter_timestamp(L, 0, 0) ->
    L;
filter_timestamp(_L, StartTs, StopTs) when StartTs > StopTs ->
    [];
filter_timestamp(L, StartTs0, StopTs0) ->
    LowerMs = StartTs0 * 1000,
    UpperMs = StopTs0 * 1000,
    InRange = fun(#device_data{timestamp = Ts}) ->
        Ts >= LowerMs andalso Ts =< UpperMs
    end,
    [Record || Record <- L, InRange(Record)].
%% Tag filter.
%% An empty Tags map disables filtering. Otherwise only the records whose tags
%% contain EVERY requested name with exactly the requested value are kept.
%%
%% The second clause previously repeated the guard `map_size(Tags) =:= 0',
%% which made it unreachable: any non-empty tag filter crashed with
%% function_clause instead of filtering.
-spec filter_tags(L :: list(), Tags :: map()) -> list().
filter_tags(L, Tags) when map_size(Tags) =:= 0 ->
    L;
filter_tags(L, Tags) when is_map(Tags) ->
    %% Convert once instead of once per record.
    Wanted = maps:to_list(Tags),
    lists:filter(fun(#device_data{tags = Tags0}) ->
        lists:all(fun({TagName, TagVal}) -> maps:find(TagName, Tags0) =:= {ok, TagVal} end, Wanted)
    end, L).
%% Converts a value to an integer; data read from the database is not always
%% an integer already. Binaries are parsed with binary_to_integer/1, integers
%% are returned unchanged, anything else raises function_clause.
-spec as_integer(Val :: integer() | binary()) -> integer().
as_integer(Bin) when is_binary(Bin) ->
    binary_to_integer(Bin);
as_integer(Int) when is_integer(Int) ->
    Int.
%% Keeps the first 17 bytes of a location code and overwrites every following
%% byte with the character "0", so the result has the same length as the input.
%% Codes shorter than 17 bytes are returned unchanged.
-spec extract_build_location_code(binary()) -> binary().
extract_build_location_code(<<BuildLocationCode:17/binary, Suffix/binary>>) ->
    %% binary:copy/2 builds the zero padding directly instead of mapping over a
    %% generated integer sequence and flattening the result.
    Zeros = binary:copy(<<"0">>, byte_size(Suffix)),
    <<BuildLocationCode/binary, Zeros/binary>>;
extract_build_location_code(Code) when is_binary(Code) ->
    Code.