iot_cloud/apps/iot/src/iot_host.erl
2025-05-07 22:14:29 +08:00

487 lines
22 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 22. 9月 2023 16:38
%%%-------------------------------------------------------------------
-module(iot_host).
-author("aresei").
-include("iot.hrl").
-include("message_pb.hrl").
-behaviour(gen_statem).
%% Heartbeat check interval: the heartbeat counter is inspected once every
%% 15 minutes (the value is in milliseconds, as passed to erlang:start_timer/3).
-define(HEARTBEAT_INTERVAL, 900 * 1000).
%% gen_statem state names: `denied` = host not authorized, `activated` = host authorized.
-define(STATE_DENIED, denied).
-define(STATE_ACTIVATED, activated).
%% API
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
-export([get_metric/1, get_status/1]).
-export([publish_directive/4, send_directive/3]).
-export([attach_channel/2]).
-export([reload_device/2, delete_device/2, activate_device/3]).
-export([heartbeat/1]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
%% Data carried by the gen_statem process of a single host.
-record(state, {
host_id :: integer(),
%% Data read from the database
uuid :: binary(),
has_session = false :: boolean(),
%% Heartbeat counter; incremented on every heartbeat cast and reset to 0 on each ticker timeout
heartbeat_counter = 0 :: integer(),
%% Websocket related: pid of the attached channel process, `undefined` when none is attached
channel_pid :: undefined | pid(),
%% Devices belonging to this host, #{device_uuid => Device}
device_map = #{},
%% Host-related information (replaced by the payload of each ping message)
metrics = #{} :: map()
}).
%%%===================================================================
%%% API
%%%===================================================================
%% Resolve the locally registered host process for a host UUID.
%% Returns `undefined` when nothing is registered under the derived name.
-spec get_pid(UUID :: binary()) -> undefined | pid().
get_pid(UUID) when is_binary(UUID) ->
    whereis(get_name(UUID)).
%% Build the registered process name for a host: the atom 'iot_host:<UUID>'.
%% NOTE(review): every distinct UUID creates a new atom and atoms are never
%% garbage collected - confirm that UUIDs reaching this function are trusted.
-spec get_name(UUID :: binary()) -> atom().
get_name(UUID) when is_binary(UUID) ->
    Prefixed = <<"iot_host:", UUID/binary>>,
    binary_to_atom(Prefixed).
%% Build the alias name for a host from its numeric id: the atom 'iot_host_id:<HostId>'.
-spec get_alias_name(HostId :: integer()) -> atom().
get_alias_name(HostId) when is_integer(HostId) ->
    list_to_atom("iot_host_id:" ++ integer_to_list(HostId)).
%% Hand an incoming packet to the host process for asynchronous processing.
%% The packet is delivered as a cast, so this returns `ok` immediately and
%% gives no indication of whether the host accepted or ignored the packet.
-spec handle(Pid :: pid(), Packet :: {atom(), binary()} | {atom(), {binary(), binary()}}) -> ok.
handle(Pid, Packet) when is_pid(Pid) ->
    gen_statem:cast(Pid, {handle, Packet}).
%% Synchronously ask the host process for its status map
%% (channel/session flags, heartbeat counter and metrics).
-spec get_status(Pid :: pid()) -> {ok, Status :: map()}.
get_status(Pid) when is_pid(Pid) ->
    Request = get_status,
    gen_statem:call(Pid, Request).
%% Activate or deactivate the host: `true` activates it, `false` revokes the activation.
-spec activate(Pid :: pid(), Auth :: boolean()) -> ok.
activate(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
    Request = {activate, Auth},
    gen_statem:call(Pid, Request).
%% Synchronously fetch the metrics most recently stored by the host process.
-spec get_metric(Pid :: pid()) -> {ok, MetricInfo :: map()}.
get_metric(Pid) when is_pid(Pid) ->
    Request = get_metric,
    gen_statem:call(Pid, Request).
%% Bind a channel process to the host process.
%% Replies:
%%   ok                - channel attached to an activated host
%%   {denied, Reason}  - channel recorded, but the host is not activated
%%   {error, Reason}   - the host already has a channel attached
-spec attach_channel(pid(), pid()) ->
    ok | {denied, Reason :: binary()} | {error, Reason :: binary()}.
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
    gen_statem:call(Pid, {attach_channel, ChannelPid}).
%% Publish a directive to the host and wait up to Timeout milliseconds for the
%% matching `ws_response` message to arrive in the calling process.
%% Returns `ok` or `{ok, Response}` depending on the shape of the response,
%% `{error, timeout}` when nothing arrives in time, or the `{error, Reason}`
%% returned by the host process when the directive could not be published.
-spec publish_directive(Pid :: pid(), Topic :: binary(), Content :: binary(), Timeout :: integer()) ->
    ok | {ok, Response :: binary()} | {error, Reason :: any()}.
publish_directive(Pid, Topic, Content, Timeout)
    when is_pid(Pid), is_binary(Topic), is_binary(Content), is_integer(Timeout) ->
    Request = {publish_directive, self(), Topic, Content},
    case gen_statem:call(Pid, Request) of
        {error, Reason} ->
            {error, Reason};
        {ok, Ref} ->
            await_ws_response(Ref, Timeout)
    end.

%% Block the caller until the response tagged with Ref arrives or Timeout elapses.
await_ws_response(Ref, Timeout) ->
    receive
        {ws_response, Ref, Response} ->
            {ok, Response};
        {ws_response, Ref} ->
            ok
    after Timeout ->
        {error, timeout}
    end.
%% Send a directive to the host without waiting for a response from the edge side.
-spec send_directive(Pid :: pid(), Topic :: binary(), Content :: binary()) ->
    ok | {error, Reason :: any()}.
send_directive(Pid, Topic, Content)
    when is_pid(Pid), is_binary(Topic), is_binary(Content) ->
    Request = {send_directive, Topic, Content},
    gen_statem:call(Pid, Request).
%% Device management.
%% Ask the host process to reload the device identified by DeviceUUID.
-spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}.
reload_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
    Request = {reload_device, DeviceUUID},
    gen_statem:call(Pid, Request).
%% Ask the host process to drop the device identified by DeviceUUID from its device map.
-spec delete_device(Pid :: pid(), DeviceUUID :: binary()) -> ok.
delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
    Request = {delete_device, DeviceUUID},
    gen_statem:call(Pid, Request).
%% Change the authorization flag of one of the host's devices.
-spec activate_device(Pid :: pid(), DeviceUUID :: binary(), Auth :: boolean()) -> ok | {error, Reason :: any()}.
activate_device(Pid, DeviceUUID, Auth)
    when is_pid(Pid), is_binary(DeviceUUID), is_boolean(Auth) ->
    Request = {activate_device, DeviceUUID, Auth},
    gen_statem:call(Pid, Request).
%% Notify the host process that a heartbeat was received.
%% Accepts `undefined` so the result of get_pid/1 can be passed straight in;
%% in that case nothing is sent. Always returns `ok`.
-spec heartbeat(Pid :: undefined | pid()) -> ok.
heartbeat(undefined) ->
    ok;
heartbeat(Pid) when is_pid(Pid) ->
    gen_statem:cast(Pid, heartbeat).
%% @doc Start and link the host process for UUID, registering it locally under Name.
%% Does not return until init/1 has returned. Returns `ignore` when init/1
%% cannot load the host.
-spec start_link(Name :: atom(), UUID :: binary()) -> {ok, pid()} | ignore | {error, term()}.
start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
    gen_statem:start_link({local, Name}, ?MODULE, [UUID], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Initialise the host process for UUID.
%% Loads the host row, registers a global alias derived from the host id,
%% starts the heartbeat ticker, loads the host's devices and picks the initial
%% state from the stored authorization status. Returns `ignore` when the host
%% cannot be loaded.
init([UUID]) ->
    case host_bo:get_host_by_uuid(UUID) of
        undefined ->
            lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
            ignore;
        {ok, #{<<"id">> := HostId, <<"authorize_status">> := AuthorizeStatus}} ->
            %% Registering an alias by host_id lets callers find the host
            %% process without querying the database.
            global:register_name(get_alias_name(HostId), self()),
            %% Heartbeat detection
            erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
            %% Load the devices; entries that fail to load are skipped.
            {ok, DeviceItems} = device_bo:get_host_devices(HostId),
            LoadDevice =
                fun(DeviceUUID) ->
                    case iot_device:new(DeviceUUID) of
                        {ok, Device} -> {true, {DeviceUUID, Device}};
                        error -> false
                    end
                end,
            DeviceMap = maps:from_list(lists:filtermap(LoadDevice, DeviceItems)),
            %% An authorize_status of exactly 1 means the host is activated.
            InitialState =
                case AuthorizeStatus of
                    1 -> ?STATE_ACTIVATED;
                    _ -> ?STATE_DENIED
                end,
            Data = #state{
                uuid = UUID,
                host_id = HostId,
                has_session = false,
                device_map = DeviceMap
            },
            {ok, InitialState, Data}
    end.
%% @private
%% @doc Select the gen_statem callback mode: every event, whatever the
%% current state, is dispatched to handle_event/4.
callback_mode() ->
    handle_event_function.
%% @private
%% @doc Single entry point for every call, cast and info message received by
%% the host process (callback mode is handle_event_function).
%%
%% Arguments: event type, event content, current state name
%% (?STATE_ACTIVATED | ?STATE_DENIED) and the #state{} record.
%% Clauses are tried top to bottom; anything not matched by a specific clause
%% ends up in the final catch-all, which only logs a warning.
handle_event({call, From}, get_metric, _, State = #state{metrics = Metrics}) ->
    {keep_state, State, [{reply, From, {ok, Metrics}}]};

%% Report the host's status
handle_event({call, From}, get_status, _,
        State = #state{channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter,
            metrics = Metrics, has_session = HasSession}) ->
    HasChannel = (ChannelPid /= undefined),
    Reply = #{
        <<"has_channel">> => HasChannel,
        <<"has_session">> => HasSession,
        <<"heartbeat_counter">> => HeartbeatCounter,
        <<"metrics">> => Metrics
    },
    {keep_state, State, [{reply, From, {ok, Reply}}]};

%% As long as a channel (with a session) exists, push the message to the edge host
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _,
        State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true})
        when is_binary(Command), is_pid(ChannelPid) ->
    %% Send the request through the channel
    lager:debug("[iot_host] host: ~p, will publish message: ~p", [UUID, Command]),
    Ref = tcp_channel:publish(ChannelPid, ReceiverPid, <<CommandType:8, Command/binary>>),
    {keep_state, State, [{reply, From, {ok, Ref}}]};
handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid = UUID}) ->
    lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]),
    {keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]};

%% Publishing a directive requires an activated host with a session
%% (original author's note: directives are AES-encrypted, so a session must exist)
handle_event({call, From}, {publish_directive, ReceiverPid, Topic, Content}, ?STATE_ACTIVATED,
        State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) ->
    lager:debug("[iot_host] host: ~p, publish_directive to topic: ~p, content: ~p", [UUID, Topic, Content]),
    BinTopicMessage = message_pb:encode_msg(#topic_message{topic = Topic, content = Content}),
    %% Send the request through the channel; the leading byte 16 is the packet type
    Ref = tcp_channel:publish(ChannelPid, ReceiverPid, <<16:8, BinTopicMessage/binary>>),
    {keep_state, State, [{reply, From, {ok, Ref}}]};
%% In every other situation publishing the directive fails
handle_event({call, From}, {publish_directive, _, Topic, Content}, _, State = #state{uuid = UUID}) ->
    lager:debug("[iot_host] uuid: ~p, publish_directive to topic: ~p, content: ~p, invalid state: ~p", [UUID, Topic, Content, state_map(State)]),
    {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};

%% Fire-and-forget directive. Original author's note: the directive must name
%% its target device_uuid explicitly, because the content is JSON.
handle_event({call, From}, {send_directive, Topic, Content}, ?STATE_ACTIVATED,
        State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) ->
    BinTopicMessage = message_pb:encode_msg(#topic_message{topic = Topic, content = Content}),
    lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Content]),
    %% Send the request through the channel
    tcp_channel:send(ChannelPid, <<16:8, BinTopicMessage/binary>>),
    {keep_state, State, [{reply, From, ok}]};
%% In every other situation sending the directive fails
handle_event({call, From}, {send_directive, Topic, Content}, _, State = #state{uuid = UUID}) ->
    lager:debug("[iot_host] host_uuid: ~p, send_directive to topic: ~p, content: ~p, invalid state: ~p",
        [UUID, Topic, Content, state_map(State)]),
    {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};

%% Activate the host; the edge side is notified only when a channel is attached
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) ->
    case is_pid(ChannelPid) of
        true ->
            BinReply = message_pb:encode_msg(#activate_push{auth = true}),
            lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]),
            tcp_channel:send(ChannelPid, <<8:8, BinReply/binary>>);
        false ->
            lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID])
    end,
    {next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};

%% Revoke the authorization: notify the edge side, stop the channel and forget it
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = ChannelPid}) ->
    case is_pid(ChannelPid) of
        true ->
            BinReply = message_pb:encode_msg(#activate_push{auth = false}),
            tcp_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
            lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]),
            tcp_channel:stop(ChannelPid, closed);
        false ->
            lager:debug("[iot_host] uuid: ~p, activate: false, no channel", [UUID])
    end,
    {next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]};

%% Bind a channel (only possible while no channel is attached)
handle_event({call, From}, {attach_channel, ChannelPid}, StateName, State = #state{uuid = UUID, channel_pid = undefined}) ->
    case StateName of
        ?STATE_ACTIVATED ->
            erlang:monitor(process, ChannelPid),
            %% Mark the host as online
            {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE),
            report_event(UUID, ?HOST_ONLINE),
            lager:debug("[iot_host] host_id(attach_channel) uuid: ~p, will change status, affected_row: ~p", [UUID, AffectedRow]),
            {keep_state, State#state{channel_pid = ChannelPid, has_session = true}, [{reply, From, ok}]};
        %% Host not activated: the channel is remembered and monitored, but no session is opened
        ?STATE_DENIED ->
            lager:notice("[iot_host] attach_channel host_id uuid: ~p, channel: ~p, host inactivated", [UUID, ChannelPid]),
            erlang:monitor(process, ChannelPid),
            {keep_state, State#state{channel_pid = ChannelPid}, [{reply, From, {denied, <<"host inactivated">>}}]}
    end;
%% A channel is already bound
handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, channel_pid = OldChannelPid}) ->
    lager:notice("[iot_host] attach_channel host_id uuid: ~p, old channel exists: ~p", [UUID, OldChannelPid]),
    {keep_state, State, [{reply, From, {error, <<"channel existed">>}}]};

%% Reload a device
handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) ->
    case maps:find(DeviceUUID, DeviceMap) of
        error ->
            {keep_state, State, [{reply, From, {error, <<"device not found">>}}]};
        {ok, Device} ->
            case iot_device:reload(Device) of
                error ->
                    %% device_map is keyed by device uuid, so the stale entry must be
                    %% removed by DeviceUUID (removing by the Device value is a no-op).
                    {keep_state, State#state{device_map = maps:remove(DeviceUUID, DeviceMap)}, [{reply, From, {error, <<"reload device error">>}}]};
                {ok, NDevice} ->
                    {keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}, [{reply, From, ok}]}
            end
    end;

%% Delete a device
handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) ->
    {keep_state, State#state{device_map = maps:remove(DeviceUUID, DeviceMap)}, [{reply, From, ok}]};

%% Activate / deactivate a device
handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #state{device_map = DeviceMap}) ->
    case maps:find(DeviceUUID, DeviceMap) of
        error ->
            {keep_state, State, [{reply, From, {error, <<"device not found">>}}]};
        {ok, Device} ->
            NDevice = iot_device:auth(Device, Auth),
            {keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}, [{reply, From, ok}]}
    end;

%% Data packets. Original author's note: the message has to be converted to
%% JSON and processed inside the host process; the props carried by the data
%% are not used by the server yet. Data handling is currently disabled, so the
%% payload is accepted and dropped.
handle_event(cast, {handle, {data, _Data}}, ?STATE_ACTIVATED, State = #state{has_session = true}) ->
    {keep_state, State};

%% Original author's note: ping data is AES-encrypted, so it is only accepted while a session exists
handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
    lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
    {keep_state, State#state{metrics = Metrics}};

handle_event(cast, {handle, {inform, Inform}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
    %% Assert the payload shape; the fields themselves are not used yet.
    #service_inform{} = Inform,
    lager:debug("[iot_host] host: ~p, service infos is: ~p", [UUID, Inform]),
    %% props: host id:scene id:microservice id
    {keep_state, State};

handle_event(cast, {handle, {feedback_result, FeedbackResult}}, ?STATE_ACTIVATED, State = #state{has_session = true}) ->
    %% Assert the payload shape; the result is not processed yet.
    #feedback_phase{} = FeedbackResult,
    {keep_state, State};

handle_event(cast, {handle, {event, Event}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
    lager:debug("[iot_host] uuid: ~p, get event: ~p", [UUID, Event]),
    %% Assert the payload shape; the event is only logged for now.
    #event{} = Event,
    {keep_state, State};

handle_event(cast, {handle, {ai_event, AIEventPB}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
    #ai_event{service_id = ServiceId, event_type = EventType, params = Params} = message_pb:decode_msg(AIEventPB, ai_event),
    lager:debug("[iot_host] uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]),
    {keep_state, State};

%% Heartbeat: count every heartbeat received since the last ticker timeout
handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) ->
    {keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}};

%% No heartbeat was received during the last interval: the host goes offline,
%% device status is left unchanged.
%% NOTE: the match below crashes the process (badmatch) if the host can no
%% longer be loaded, and the case has no branch for other status values.
handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{uuid = UUID, heartbeat_counter = 0, channel_pid = ChannelPid}) ->
    lager:warning("[iot_host] uuid: ~p, heartbeat lost, devices will unknown", [UUID]),
    {ok, #{<<"status">> := Status}} = host_bo:get_host_by_uuid(UUID),
    case Status of
        ?HOST_NOT_JOINED ->
            lager:debug("[iot_host] host: ~p, host_maybe_offline, host not joined, can not change to offline", [UUID]);
        ?HOST_OFFLINE ->
            lager:debug("[iot_host] host: ~p, host_maybe_offline, host now is offline, do nothing", [UUID]);
        ?HOST_ONLINE ->
            {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE),
            report_event(UUID, ?HOST_OFFLINE)
    end,
    %% Close the channel: the host has to reconnect so both sides agree on the state
    is_pid(ChannelPid) andalso tcp_channel:stop(ChannelPid, closed),
    erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
    {keep_state, State#state{channel_pid = undefined, has_session = false, heartbeat_counter = 0}};
%% Otherwise heartbeats were seen: just reset the counter and re-arm the ticker
handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{}) ->
    erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
    {keep_state, State#state{heartbeat_counter = 0}};

%% The attached channel went down. The host's online status does not change
%% here (that is driven by the heartbeat mechanism); only the session is dropped.
handle_event(info, {'DOWN', _Ref, process, ChannelPid, Reason}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
    lager:warning("[iot_host] uuid: ~p, channel: ~p, down with reason: ~p, has_session: ~p, state: ~p", [UUID, ChannelPid, Reason, HasSession, State]),
    {keep_state, State#state{channel_pid = undefined, has_session = false}};
%% Some other monitored process (for example a channel already detached) went down
handle_event(info, {'DOWN', _Ref, process, Pid, Reason}, _, State = #state{uuid = UUID}) ->
    lager:debug("[iot_host] uuid: ~p, process_pid: ~p, down with reason: ~p, state: ~p", [UUID, Pid, Reason, State]),
    {keep_state, State};

%% Anything else is logged and ignored. Unmatched calls are not replied to.
handle_event(Event, Info, StateName, State = #state{uuid = UUID}) ->
    lager:warning("[iot_host] host: ~p, event: ~p, unknown message: ~p, state_name: ~p, state: ~p", [UUID, Event, Info, StateName, state_map(State)]),
    {keep_state, State}.
%% @private
%% @doc Called when the gen_statem is about to terminate. Nothing is cleaned
%% up here; the reason and the session flag are logged. The return value is
%% ignored by gen_statem.
terminate(Reason, _StateName, #state{uuid = UUID, has_session = HasSession}) ->
    LogArgs = [UUID, Reason, HasSession],
    lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p", LogArgs),
    ok.
%% @private
%% @doc Code upgrade hook: the state name and the state record are carried
%% over unchanged.
code_change(_OldVsn, StateName, #state{} = Data, _Extra) ->
    {ok, StateName, Data}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% 处理相关数据
%handle_data(#data{device_uuid = DeviceUUID, service_name = ServiceName, at = Timestamp, fields = FieldsList, tags = Tags}, #state{uuid = UUID, device_map = DeviceMap})
% when is_binary(DeviceUUID), DeviceUUID /= <<>> ->
%
% case maps:find(DeviceUUID, DeviceMap) of
%
% end,
%
%
% case iot_device:get_pid(DeviceUUID) of
% undefined ->
% lager:warning("[iot_host] host uuid: ~p, device uuid: ~p not found, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags]),
% ok;
% DevicePid when is_pid(DevicePid) ->
% case iot_device:is_activated(DevicePid) of
% true ->
% %% 查找终端设备对应的点位信息
% iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp),
%
% %% 数据写入influxdb
% NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => DeviceUUID},
% influx_client:write_data(DeviceUUID, NTags, FieldsList, Timestamp),
%
% iot_device:change_status(DevicePid, ?DEVICE_ONLINE);
% false ->
% lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags])
% end
% end;
%
%handle_data(#data{service_name = ServiceName, at = Timestamp, fields = FieldsList, tags = Tags}, #state{uuid = UUID}) ->
% %% 查找终端设备对应的点位信息
% iot_router:route_uuid(UUID, FieldsList, Timestamp),
%
% %% 数据写入influxdb
% NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName},
% influx_client:write_data(UUID, NTags, FieldsList, Timestamp).
%% Route a host status change as a single "host_status" field via iot_router.
%% NewStatus must be 0 (offline) or 1 (online); any other value crashes on the
%% text lookup before anything is routed.
%% (Original author's note: the status information is reported to an
%% upstream platform.)
-spec report_event(UUID :: binary(), NewStatus :: integer()) -> ok.
report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) ->
    %% Human-readable text for each status value
    TextMap = #{
        0 => <<"离线"/utf8>>,
        1 => <<"在线"/utf8>>
    },
    Timestamp = iot_util:timestamp_of_seconds(),
    FieldsList = [#{
        <<"key">> => <<"host_status">>,
        <<"value">> => NewStatus,
        <<"value_text">> => maps:get(NewStatus, TextMap),
        <<"unit">> => 0,
        <<"type">> => <<"DI">>,
        <<"name">> => <<"主机状态"/utf8>>,
        <<"timestamp">> => Timestamp
    }],
    iot_router:route_uuid(UUID, FieldsList, Timestamp),
    lager:debug("[iot_host] host_uuid: ~p, route fields: ~p", [UUID, FieldsList]),
    %% Return an explicit `ok` so the result does not depend on the logger call.
    ok.
%% Convert the current #state{} record into a map for logging.
%% The device map is deliberately left out of the result.
state_map(#state{} = Data) ->
    #{
        host_id => Data#state.host_id,
        uuid => Data#state.uuid,
        has_session => Data#state.has_session,
        heartbeat_counter => Data#state.heartbeat_counter,
        channel_pid => Data#state.channel_pid,
        metrics => Data#state.metrics
    }.