iot/apps/iot/src/iot_host.erl
2023-07-28 11:23:47 +08:00

392 lines
18 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 19. 6月 2023 10:32
%%%-------------------------------------------------------------------
-module(iot_host).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_statem).
%% API
-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]).
-export([get_metric/1, publish_message/3, get_aes/1, rsa_encode/3]).
-export([has_session/1, create_session/2, attach_channel/2]).
%% gen_statem callbacks
-export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
-record(state, {
host_id :: integer(),
%% 从数据库里面读取到的数据
uuid :: binary(),
%% 当前的状态
status :: integer(),
%% rsa公钥
pub_key = <<>> :: binary(),
%% aes的key, 后续通讯需要基于这个加密
aes = <<>> :: binary(),
%% websocket相关
channel_pid :: undefined | pid(),
monitor_ref :: undefined | reference(),
%% 主机的相关信息
metrics = #{} :: map()
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_pid(UUID :: binary()) -> undefined | pid().
get_pid(UUID) when is_binary(UUID) ->
Name = get_name(UUID),
whereis(Name).
-spec get_name(UUID :: binary()) -> atom().
get_name(UUID) when is_binary(UUID) ->
binary_to_atom(<<"iot_host:", UUID/binary>>).
%% 处理消息
-spec handle(Pid :: pid(), Packet :: {atom(), binary()} | {atom(), {binary(), binary()}}) -> no_return().
handle(Pid, Packet) when is_pid(Pid) ->
gen_statem:cast(Pid, {handle, Packet}).
%% 重新加载主机的基本信息
-spec reload(Pid :: pid()) -> ok | {error, Reason :: any()}.
reload(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, reload).
-spec get_aes(Pid :: pid()) -> Aes :: binary().
get_aes(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_aes).
%% 激活主机, true 表示激活; false表示关闭激活
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
gen_statem:call(Pid, {activate, Auth}).
-spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}.
get_metric(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_metric).
-spec has_session(Pid :: pid()) -> boolean().
has_session(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, has_session).
-spec attach_channel(pid(), pid()) -> ok.
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
gen_statem:call(Pid, {attach_channel, ChannelPid}).
-spec create_session(Pid :: pid(), PubKey :: binary()) -> {ok, Reply :: binary()}.
create_session(Pid, PubKey) when is_pid(Pid), is_binary(PubKey) ->
gen_statem:call(Pid, {create_session, PubKey}).
%% 基于rsa加密的指令都是不需要会话存在的
-spec rsa_encode(Pid :: pid(), CommandType :: integer(), PlainText :: binary()) ->
{ok, EncText :: binary()} | {error, Reason :: binary()}.
rsa_encode(Pid, CommandType, PlainText) when is_pid(Pid), is_integer(CommandType), is_binary(PlainText) ->
gen_statem:call(Pid, {rsa_encode, CommandType, PlainText}).
-spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}) ->
{ok, Command :: binary()} | {error, Reason :: any()}.
publish_message(Pid, CommandType, Params) when is_pid(Pid), is_integer(CommandType) ->
gen_statem:call(Pid, {publish_message, self(), CommandType, Params}).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
gen_statem:start_link({local, Name}, ?MODULE, [UUID], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([UUID]) ->
case host_bo:get_host_by_uuid(UUID) of
{ok, #{<<"status">> := Status, <<"id">> := HostId}} ->
Aes = list_to_binary(iot_util:rand_bytes(32)),
StateName = case Status =:= ?HOST_STATUS_INACTIVE of
true ->
denied;
false ->
%% 重启时,认为主机是离线状态; 等待主机主动建立连接
{ok, _} = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
activated
end,
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, status = Status}};
undefined ->
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
ignore
end.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
handle_event_function.
%% @private
%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or
%% (2) when gen_statem terminates abnormally.
%% This callback is optional.
format_status(_Opt, [_PDict, _StateName, _State]) ->
Status = some_term,
Status.
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event({call, From}, get_metric, session, State = #state{metrics = Metrics}) ->
{keep_state, State, [{reply, From, {ok, Metrics}}]};
handle_event({call, From}, get_metric, _, State) ->
{keep_state, State, [{reply, From, {ok, #{}}}]};
handle_event({call, From}, get_aes, _, State = #state{aes = Aes}) ->
{keep_state, State, [{reply, From, {ok, Aes}}]};
handle_event({call, From}, has_session, StateName, State) ->
HasSession = (StateName =:= session) orelse false,
{keep_state, State, [{reply, From, HasSession}]};
%% 基于rsa加密
handle_event({call, From}, {rsa_encode, CommandType, PlainText}, session, State = #state{pub_key = PubKey}) ->
Reply = iot_cipher_rsa:encode(PlainText, PubKey),
{keep_state, State, [{reply, From, {ok, <<CommandType:8, Reply/binary>>}}]};
handle_event({call, From}, {rsa_encode, _, _}, _, State) ->
{keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]};
%% 发送普通格式的消息
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, session, State = #state{aes = AES, channel_pid = ChannelPid}) ->
SendCommand = case Command of
{aes, Command0} ->
iot_cipher_aes:encrypt(AES, Command0);
Command0 ->
Command0
end,
%% 通过websocket发送请求
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, SendCommand/binary>>),
{keep_state, State, [{reply, From, {ok, Ref}}]};
handle_event({call, From}, {publish_message, _, _, _}, _, State) ->
{keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]};
handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) ->
%% 重新加载主机信息
case host_bo:get_host_by_uuid(UUID) of
{ok, Host = #{<<"status">> := Status}} ->
lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]),
case StateName == denied andalso Status =/= ?HOST_STATUS_INACTIVE of
true ->
{next_state, activated, State#state{status = Status}, [{reply, From, ok}]};
false ->
{keep_state, State#state{status = Status}, [{reply, From, ok}]}
end;
undefined ->
lager:debug("[iot_host] reload host uuid: ~p, failed", [UUID]),
{keep_state, State, [{reply, From, {error, <<"host not found">>}}]}
end;
%% 关闭授权
handle_event({call, From}, {activate, false}, _, State) ->
{next_state, denied, State#state{pub_key = <<>>}, [{reply, From, ok}]};
%% 开启授权
handle_event({call, From}, {activate, true}, denied, State) ->
{next_state, activated, State, [{reply, From, ok}]};
handle_event({call, From}, {activate, true}, _, State) ->
{keep_state, State, [{reply, From, ok}]};
%% 绑定channel
handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
lager:debug("[iot_host] attach_channel host_id uuid: ~p, channel: ~p", [UUID, ChannelPid]),
MRef = erlang:monitor(process, ChannelPid),
{keep_state, State#state{channel_pid = ChannelPid, monitor_ref = MRef}, [{reply, From, ok}]};
handle_event({call, From}, {attach_channel, ChannelPid}, _, State = #state{uuid = UUID, monitor_ref = MRef0, channel_pid = ChannelPid0}) when is_pid(ChannelPid0) ->
lager:debug("[iot_host] attach_channel host_id uuid: ~p, old channel: ~p replace with: ~p", [UUID, ChannelPid0, ChannelPid]),
%% 取消之前的monitor
erlang:demonitor(MRef0),
ws_channel:stop(ChannelPid0, closed),
%% 建立到新的channel的monitor
MRef = erlang:monitor(process, ChannelPid),
{keep_state, State#state{channel_pid = ChannelPid, monitor_ref = MRef}, [{reply, From, ok}]};
%% 授权通过后,才能将主机的状态设置为在线状态
handle_event({call, From}, {create_session, PubKey}, denied, State = #state{uuid = UUID}) ->
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
Reply = #{<<"a">> => false, <<"aes">> => <<"">>},
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
{keep_state, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
handle_event({call, From}, {create_session, PubKey}, _StateName, State = #state{uuid = UUID, aes = Aes}) ->
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
Reply = #{<<"a">> => true, <<"aes">> => Aes},
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
{ok, _} = host_bo:change_status(UUID, ?HOST_STATUS_ONLINE),
{next_state, session, State#state{status = ?HOST_STATUS_ONLINE}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props服务端暂时未用到
handle_event(cast, {handle, {data, Data}}, session, State = #state{uuid = UUID, aes = AES}) ->
PlainData = iot_cipher_aes:decrypt(AES, Data),
case catch jiffy:decode(PlainData, [return_maps]) of
Info = #{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags} when is_binary(ServiceName) ->
%% 查找终端设备对应的点位信息
RouterUUID = router_uuid(Info, UUID),
lager:debug("[iot_host] host: ~p, router_uuid: ~p, get data: ~p", [UUID, RouterUUID, Info]),
case mnesia_kv:hget(RouterUUID, <<"location_code">>) of
none ->
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, not found", [UUID, RouterUUID]);
{error, Reason} ->
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, get error: ~p", [UUID, RouterUUID, Reason]);
{ok, LocationCode} ->
iot_router:route(LocationCode, FieldsList)
end,
%% 数据写入influxdb
NTags = with_device_uuid(Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}, Info),
%% 按照设备的uuid进行分组
Points = lists:map(fun(Fields) -> influx_point:new(RouterUUID, NTags, Fields, Timestamp) end, FieldsList),
Precision = influx_client:get_precision(Timestamp),
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end);
Other ->
lager:debug("[iot_host] the data is invalid json: ~p", [Other])
end,
{keep_state, State};
handle_event(cast, {handle, {ping, CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) ->
MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric),
case catch jiffy:decode(MetricsInfo, [return_maps]) of
Metrics when is_map(Metrics) ->
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
{keep_state, State#state{metrics = Metrics}};
Other ->
lager:debug("[iot_host] host_id: ~p, ping is invalid json: ~p", [UUID, Other]),
{keep_state, State}
end;
handle_event(cast, {handle, {inform, Info0}}, session, State = #state{host_id = HostId, aes = AES}) ->
lager:debug("[iot_host] handler inform, aes: ~p", [AES]),
Info = iot_cipher_aes:decrypt(AES, Info0),
case catch jiffy:decode(Info, [return_maps]) of
#{<<"at">> := At, <<"services">> := ServiceInforms} ->
lager:debug("[iot_host] service infos is: ~p", [ServiceInforms]),
lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) ->
%% props 主机id:场景id:微服务id
[_, SceneId0, MicroId0] = binary:split(Props, <<":">>, [global]),
SceneId = binary_to_integer(SceneId0),
MicroId = binary_to_integer(MicroId0),
micro_inform_log:insert(#{
<<"host_id">> => HostId,
<<"scene_id">> => SceneId,
<<"service_name">> => Name,
<<"version">> => Version,
<<"version_copy">> => VersionCopy,
<<"status">> => Status,
<<"created_at">> => At
}),
micro_set_bo:change_status(HostId, SceneId, MicroId, Status)
end, ServiceInforms);
Error ->
lager:warning("[iot_host] inform get error: ~p", [Error])
end,
{keep_state, State};
handle_event(cast, {handle, {feedback_step, Info0}}, session, State = #state{aes = AES}) ->
Info = iot_cipher_aes:decrypt(AES, Info0),
case catch jiffy:decode(Info, [return_maps]) of
Data = #{<<"task_id">> := TaskId, <<"code">> := Code} ->
Result = scene_feedback_step:insert(#{
<<"task_id">> => TaskId,
<<"code">> => Code,
<<"created_at">> => iot_util:current_time()
}),
lager:debug("[iot_host] feedback_step info: ~p, insert result: ~p", [Data, Result]);
Other ->
lager:warning("[iot_host] feedback_step error: ~p", [Other])
end,
{keep_state, State};
handle_event(cast, {handle, {feedback_result, Info0}}, session, State = #state{aes = AES}) ->
Info = iot_cipher_aes:decrypt(AES, Info0),
case catch jiffy:decode(Info, [return_maps]) of
#{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} ->
scene_feedback:insert(#{
<<"task_id">> => TaskId,
<<"task_type">> => Type,
<<"code">> => Code,
<<"reason">> => Reason,
<<"error">> => Error,
<<"created_at">> => Time
});
Other ->
lager:warning("[iot_host] feedback_result error: ~p", [Other])
end,
{keep_state, State};
%% 当websocket断开的时候则设置主机状态为下线状态; 主机的状态需要转换
handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State = #state{uuid = UUID, monitor_ref = Ref, channel_pid = ChannelPid}) ->
lager:warning("[iot_host] channel: ~p, down with reason: ~p, state name: ~p, state: ~p", [ChannelPid, Reason, StateName, State]),
{ok, _} = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
%% 会话状态如果链接丢失需要切换到activated状态其他情况保持不变
case StateName =:= session of
true ->
{next_state, activated, State#state{status = ?HOST_STATUS_OFFLINE, channel_pid = undefined}};
false ->
{keep_state, State#state{status = ?HOST_STATUS_OFFLINE, channel_pid = undefined}}
end;
handle_event(EventType, EventContent, StateName, State) ->
lager:warning("[iot_host] unknown event_type: ~p, event: ~p, state name: ~p, state: ~p", [EventType, EventContent, StateName, State]),
{keep_state, State}.
%% @private
%% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, _State = #state{uuid = UUID}) ->
lager:debug("[iot_host] terminate with reason: ~p", [Reason]),
ChangeResult = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
lager:debug("[iot_host] change host: ~p, status result is: ~p", [UUID, ChangeResult]),
ok.
%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
{ok, StateName, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% 获取到分发的路由
router_uuid(#{<<"device_uuid">> := DeviceUUID}, _) when is_binary(DeviceUUID), DeviceUUID /= <<>> ->
DeviceUUID;
router_uuid(_, UUID) ->
UUID.
-spec with_device_uuid(Tags :: #{}, Info :: #{}) -> #{}.
with_device_uuid(Tags, #{<<"device_uuid">> := DeviceUUID}) when DeviceUUID /= <<>> ->
Tags#{<<"device_uuid">> => DeviceUUID};
with_device_uuid(Tags, _) ->
Tags.