ekfa/apps/efka/src/efka_agent.erl
2025-05-08 11:23:39 +08:00

387 lines
16 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%% Must support temporarily buffering data while the cloud service is offline
%%% @end
%%% Created : 06. 5月 2025 00:01
%%%-------------------------------------------------------------------
-module(efka_agent).
-author("anlicheng").
-include("message_pb.hrl").
-include("efka_tables.hrl").
-include("efka.hrl").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([metric_data/3, event/3, ai_event/3, ping/13, feedback_phase/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
%% Status markers for the agent; data can only be sent normally while in the activated state
-define(STATE_DENIED, denied).
-define(STATE_CONNECTING, connecting).
-define(STATE_AUTH, auth).
%% Cannot push messages to the service, but can still accept some commands from the server
-define(STATE_RESTRICTED, restricted).
%% Activated state
-define(STATE_ACTIVATED, activated).
%% gen_server state of the agent.
-record(state, {
%% Pid of the efka_transport process, or undefined when no transport is held
transport_pid :: undefined | pid(),
%% Current agent status; holds one of the ?STATE_* values
status = ?STATE_DENIED,
%% Requests awaiting a reply, mapped as #{Ref => PacketId}
inflight = #{}
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Submit a metric sample to the agent.
%% The request is delivered as an asynchronous cast to the locally registered
%% agent process, so this call returns `ok' immediately.
%% All three arguments must be binaries, otherwise the call fails with
%% `function_clause'.
-spec metric_data(ServiceId :: binary(), DeviceUUID :: binary(), LineProtocolData :: binary()) -> ok.
metric_data(ServiceId, DeviceUUID, LineProtocolData)
    when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(LineProtocolData) ->
    gen_server:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, LineProtocolData}).
%% @doc Hand a host status report to the agent as an asynchronous cast.
%% The thirteen fields are forwarded unchanged, in the same order, inside a
%% `ping' tuple.
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips,
     CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
    PingRequest = {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips,
                   CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces},
    gen_server:cast(?SERVER, PingRequest).
%% @doc Report the phase reached by a task to the agent.
%% Delivered as an asynchronous cast, so the call returns `ok' immediately.
%% `TaskId' and `Timestamp' must be integers and `Phase' a binary, otherwise
%% the call fails with `function_clause'.
-spec feedback_phase(TaskId :: integer(), Timestamp :: integer(), Phase :: binary()) -> ok.
feedback_phase(TaskId, Timestamp, Phase)
    when is_integer(TaskId), is_integer(Timestamp), is_binary(Phase) ->
    gen_server:cast(?SERVER, {feedback_phase, TaskId, Timestamp, Phase}).
%% @doc Submit a service event to the agent.
%% Delivered as an asynchronous cast, so the call returns `ok' immediately.
%% `ServiceId' and `Params' must be binaries and `EventType' an integer,
%% otherwise the call fails with `function_clause'.
-spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> ok.
event(ServiceId, EventType, Params)
    when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
    gen_server:cast(?SERVER, {event, ServiceId, EventType, Params}).
%% @doc Submit an AI event to the agent.
%% Delivered as an asynchronous cast, so the call returns `ok' immediately.
%% `ServiceId' and `Params' must be binaries and `EventType' an integer,
%% otherwise the call fails with `function_clause'.
-spec ai_event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> ok.
ai_event(ServiceId, EventType, Params)
    when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
    gen_server:cast(?SERVER, {ai_event, ServiceId, EventType, Params}).
%% @doc Start the agent as a linked gen_server registered locally under
%% the module name. No start arguments or options are passed.
-spec start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link() ->
    RegisteredName = {local, ?SERVER},
    gen_server:start_link(RegisteredName, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Set up the agent process.
%% Exit trapping is enabled so that the exit of the linked transport arrives
%% as an 'EXIT' message, and a zero-delay timer defers transport creation to
%% handle_info/2. The agent starts in the denied status.
-spec init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore.
init([]) ->
    erlang:process_flag(trap_exit, true),
    Self = self(),
    erlang:start_timer(0, Self, create_transport),
    InitialState = #state{status = ?STATE_DENIED},
    {ok, InitialState}.
%% @private
%% @doc Synchronous requests are not part of the agent's protocol; every call
%% is answered with `ok' and the state is left untouched.
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}.
handle_call(_Msg, _Caller, #state{} = Unchanged) ->
    {reply, ok, Unchanged}.
%% @private
%% @doc Handle the asynchronous requests issued through the public API.
%% Each request is encoded with message_pb and then either handed to
%% safe_send/3 (data, events, phases) or sent directly (ping).
-spec handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
%% Metric data
handle_cast({metric_data, ServiceId, DeviceUUID, LineProtocolData}, State) ->
    DataMsg = #data{
        service_id = ServiceId,
        device_uuid = DeviceUUID,
        metric = LineProtocolData
    },
    safe_send(?METHOD_DATA, message_pb:encode_msg(DataMsg), State),
    {noreply, State};
%% Service event
handle_cast({event, ServiceId, EventType, Params}, State) ->
    EventMsg = #event{
        service_id = ServiceId,
        event_type = EventType,
        params = Params
    },
    safe_send(?METHOD_EVENT, message_pb:encode_msg(EventMsg), State),
    {noreply, State};
%% AI event
handle_cast({ai_event, ServiceId, EventType, Params}, State) ->
    AiEventMsg = #ai_event{
        service_id = ServiceId,
        event_type = EventType,
        params = Params
    },
    safe_send(?METHOD_AI_EVENT, message_pb:encode_msg(AiEventMsg), State),
    {noreply, State};
%% Task phase feedback
handle_cast({feedback_phase, TaskId, Timestamp, Phase}, State) ->
    PhaseMsg = #feedback_phase{
        task_id = TaskId,
        timestamp = Timestamp,
        phase = Phase
    },
    safe_send(?METHOD_PHASE, message_pb:encode_msg(PhaseMsg), State),
    {noreply, State};
%% Ping: sent only while activated with a transport pid; otherwise it is
%% dropped rather than cached.
handle_cast({ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips,
             CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces},
            #state{} = State) ->
    PingMsg = #ping{
        adcode = AdCode,
        boot_time = BootTime,
        province = Province,
        city = City,
        efka_version = EfkaVersion,
        kernel_arch = KernelArch,
        ips = Ips,
        cpu_core = CpuCore,
        cpu_load = CpuLoad,
        cpu_temperature = CpuTemperature,
        disk = Disk,
        memory = Memory,
        interfaces = Interfaces
    },
    %% Encoding happens before the status check, regardless of the outcome.
    Encoded = message_pb:encode_msg(PingMsg),
    case State of
        #state{status = ?STATE_ACTIVATED, transport_pid = Pid} when is_pid(Pid) ->
            efka_transport:send(Pid, ?METHOD_PING, Encoded);
        #state{} ->
            ok
    end,
    {noreply, State};
%% Anything else is ignored.
handle_cast(_Other, #state{} = State) ->
    {noreply, State}.
%% @private
%% @doc Handle timers, transport replies, server pushes, micro-service replies
%% and the exit of the transport process.
%%
%% Transport life cycle as implemented below:
%%   create_transport timer -> connecting -> auth -> activated | restricted.
%% Whenever the transport is stopped here, `transport_pid' is deliberately kept
%% in the state: the resulting 'EXIT' message must still match the 'EXIT'
%% clause, which is the single place that clears the pid and schedules the
%% next `create_transport' timer.
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% (Re)create the transport; only acted upon while in the denied status.
handle_info({timeout, _, create_transport}, State = #state{status = ?STATE_DENIED}) ->
    {ok, Props} = application:get_env(efka, tls_server),
    Host = proplists:get_value(host, Props),
    Port = proplists:get_value(port, Props),
    {ok, TransportPid} = efka_transport:start_link(self(), Host, Port),
    efka_transport:connect(TransportPid),
    {noreply, State#state{status = ?STATE_CONNECTING, transport_pid = TransportPid}};
%% Reply to the connect request
handle_info({connect_reply, Reply}, State = #state{status = ?STATE_CONNECTING, transport_pid = TransportPid}) when is_pid(TransportPid) ->
    case Reply of
        ok ->
            efka_transport:auth_request(TransportPid, 5000),
            {noreply, State#state{status = ?STATE_AUTH}};
        {error, Reason} ->
            efka_logger:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]),
            efka_transport:stop(TransportPid),
            %% Must be a 2-tuple: a stray extra element here would make the
            %% status atom the new gen_server state.
            {noreply, State#state{status = ?STATE_DENIED}}
    end;
%% Reply to the auth request
handle_info({auth_reply, {ok, ReplyBin}}, State = #state{status = ?STATE_AUTH, transport_pid = TransportPid}) when is_pid(TransportPid) ->
    #auth_reply{code = Code, message = Message} = message_pb:decode_msg(ReplyBin, auth_reply),
    case Code of
        0 ->
            %% Code 0 is the branch that activates the agent.
            lager:debug("[efka_agent] auth success, message: ~p", [Message]),
            %% Upload everything that was buffered in the cache, deleting
            %% each item right after it has been handed to the transport.
            CacheItems = cache_model:get_all_cache(),
            lists:foreach(fun(#micro_cache{id = Id, method = Method, data = Packet}) ->
                efka_transport:send(TransportPid, Method, Packet),
                cache_model:delete(Id)
            end, CacheItems),
            {noreply, State#state{status = ?STATE_ACTIVATED}};
        1 ->
            %% The host is not authorised in the backend: the agent must not
            %% push data to the cloud, but the cloud may still push commands
            %% to the agent, so the socket connection is kept.
            lager:debug("[efka_agent] auth denied, message: ~p", [Message]),
            {noreply, State#state{status = ?STATE_RESTRICTED}};
        2 ->
            %% Other kind of error: retry after an interval (scheduled by the
            %% 'EXIT' clause once the stopped transport has exited).
            efka_logger:debug("[efka_agent] auth failed, message: ~p", [Message]),
            efka_transport:stop(TransportPid),
            {noreply, State#state{status = ?STATE_DENIED}};
        _ ->
            %% Unknown code: treated like any other error, retry after an interval.
            lager:debug("[efka_agent] auth failed, invalid message"),
            efka_transport:stop(TransportPid),
            {noreply, State#state{status = ?STATE_DENIED}}
    end;
handle_info({auth_reply, {error, Reason}}, State = #state{transport_pid = TransportPid, status = ?STATE_AUTH}) ->
    lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]),
    efka_transport:stop(TransportPid),
    {noreply, State#state{status = ?STATE_DENIED}};
%% Messages pushed by the cloud server
%% Micro-service deployment
handle_info({server_push_message, PacketId, <<?METHOD_DEPLOY:8, DeployBin/binary>>}, State = #state{transport_pid = TransportPid}) ->
    #deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl} = message_pb:decode_msg(DeployBin, deploy),
    %% Response codes follow the convention used by the other clauses:
    %% 1 = success, 0 = failure.
    Reply = case efka_inetd:deploy(TaskId, ServiceId, TarUrl) of
        ok ->
            #efka_response{code = 1, message = <<"">>};
        {error, Reason} when is_binary(Reason) ->
            #efka_response{code = 0, message = Reason}
    end,
    efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)),
    {noreply, State};
%% config.json configuration pushed for a micro-service
handle_info({server_push_message, PacketId, <<?METHOD_CONFIG:8, ParamsBin/binary>>}, State = #state{transport_pid = TransportPid, inflight = Inflight}) ->
    #service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ParamsBin, service_config),
    %% The case expression is the clause's return value, so the inflight
    %% entry stored in the second branch is actually kept.
    case efka_micro_service:get_pid(ServiceId) of
        undefined ->
            Reply = #efka_response {
                code = 0,
                message = <<"service not run">>
            },
            efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)),
            {noreply, State};
        ServicePid when is_pid(ServicePid) ->
            Ref = make_ref(),
            efka_micro_service:push_config(ServicePid, Ref, ConfigJson),
            %% Timeout handling: answered by the request_timeout clause if
            %% no ems_reply for Ref arrives first.
            erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}),
            {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
    end;
%% Reply coming back from efka_micro_service
handle_info({ems_reply, Ref, EmsReply}, State = #state{inflight = Inflight}) ->
    case maps:take(Ref, Inflight) of
        error ->
            %% Unknown Ref (for example already answered by the timeout).
            {noreply, State};
        {PacketId, NInflight} ->
            Reply = case EmsReply of
                ok ->
                    #efka_response{code = 1, message = <<"">>};
                {error, Reason} ->
                    #efka_response{code = 0, message = Reason}
            end,
            safe_response(PacketId, message_pb:encode_msg(Reply), State),
            {noreply, State#state{inflight = NInflight}}
    end;
%% Request timeout handling
handle_info({timeout, _, {request_timeout, Ref}}, State = #state{inflight = Inflight}) ->
    case maps:take(Ref, Inflight) of
        error ->
            %% Already answered; nothing to do.
            {noreply, State};
        {PacketId, NInflight} ->
            Reply = #efka_response{code = 0, message = <<"reqeust timeout">>},
            safe_response(PacketId, message_pb:encode_msg(Reply), State),
            {noreply, State#state{inflight = NInflight}}
    end;
%% TODO
%% NOTE(review): this push is matched as a 2-tuple without PacketId, unlike the
%% other server_push_message clauses - confirm against efka_transport.
handle_info({server_push_message, <<8:8, ActivatePush/binary>>}, State = #state{transport_pid = TransportPid, status = Status}) ->
    #activate_push{auth = Auth} = message_pb:decode_msg(ActivatePush, activate_push),
    case {Auth, Status} of
        {true, ?STATE_ACTIVATED} ->
            {noreply, State};
        {true, _} when is_pid(TransportPid), Status =:= ?STATE_DENIED;
                       is_pid(TransportPid), Status =:= ?STATE_RESTRICTED ->
            %% Re-activation: the host has to authenticate again.
            efka_transport:auth_request(TransportPid, 5000),
            {noreply, State#state{status = ?STATE_AUTH}};
        {true, _} ->
            %% Connecting / auth already in progress, or no transport to
            %% authenticate with: ignore instead of crashing with case_clause.
            {noreply, State};
        {false, _} ->
            %% The host is now restricted: it must not send messages but can
            %% still accept messages pushed by the server.
            {noreply, State#state{status = ?STATE_RESTRICTED}}
    end;
%% Directive from the server; only handled while activated
handle_info({server_push_message, PacketId, <<16:8, Directive/binary>>}, State = #state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}) ->
    #topic_message{topic = Topic, content = Content} = message_pb:decode_msg(Directive, topic_message),
    efka_logger:debug("[efka_agent] get directive with packet_id: ~p, to device_uuid: ~p, content: ~p", [PacketId, Topic, Content]),
    %% Forward the message to the subscription system; a positive PacketId
    %% means a response is sent back through the callback.
    case PacketId > 0 of
        true ->
            CallbackFun = fun(Response) -> is_process_alive(TransportPid) andalso efka_transport:response(TransportPid, PacketId, Response) end,
            efka_subscription:publish(PacketId, Topic, Content, CallbackFun);
        false ->
            efka_subscription:publish(Topic, Content)
    end,
    {noreply, State};
%% The transport process exited: forget it and retry in 5 seconds
handle_info({'EXIT', TransportPid, Reason}, State = #state{transport_pid = TransportPid}) ->
    efka_logger:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
    erlang:start_timer(5000, self(), create_transport),
    {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}};
%% Anything else (including messages that arrive in an unexpected status) is ignored.
handle_info(_Info, State = #state{}) ->
    {noreply, State}.
%% @private
%% @doc Called by gen_server when the agent is about to terminate.
%% The agent performs no cleanup of its own; the return value is ignored
%% by gen_server.
-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
                State :: #state{}) -> term().
terminate(_Why, #state{}) ->
    ok.
%% @private
%% @doc Code upgrade hook: the state record is carried over unchanged.
-spec code_change(OldVsn :: term() | {down, term()}, State :: #state{}, Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}.
code_change(_FromVsn, #state{} = Current, _Opts) ->
    {ok, Current}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Safe response.
%% Sends Reply for PacketId through the transport only when the agent is
%% activated, holds a transport pid and that process is still alive.
%% Returns whatever efka_transport:response/3 returns, `false' when the
%% transport process is no longer alive, and `ok' when the state does not
%% allow responding (the reply is then dropped).
-spec safe_response(PacketId :: integer(), Reply :: binary(), State :: #state{}) -> term().
safe_response(PacketId, Reply, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid})
    when is_integer(PacketId), is_binary(Reply), is_pid(TransportPid) ->
    is_process_alive(TransportPid) andalso efka_transport:response(TransportPid, PacketId, Reply);
safe_response(_PacketId, _Reply, #state{}) ->
    ok.
%% Send the packet when the connection is usable, otherwise buffer it.
%% When the agent is activated and holds a transport pid the packet is handed
%% to efka_transport:send/3 and its result is returned; in every other state
%% the packet is stored through cache_model:insert/2, which must return `ok'
%% (any other result crashes with badmatch).
-spec safe_send(Method :: integer(), Packet :: binary(), State :: #state{}) -> term().
safe_send(Method, Packet, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid})
    when is_pid(TransportPid) ->
    efka_transport:send(TransportPid, Method, Packet);
safe_send(Method, Packet, #state{}) ->
    ok = cache_model:insert(Method, Packet).