ekfa/apps/efka/src/efka_agent.erl
2025-05-06 22:54:30 +08:00

412 lines
17 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%% 需要支持 云服务离线时候的数据暂存
%%% @end
%%% Created : 06. 5月 2025 00:01
%%%-------------------------------------------------------------------
-module(efka_agent).
-author("anlicheng").
-include("message_pb.hrl").
-include("efka_tables.hrl").
-include("efka.hrl").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([metric_data/2, event/3, ai_event/3, ping/13]).
-export([feedback_phase/4, feedback_phase/5]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
%% Status values of the agent. Data can be sent normally only while the
%% agent is in the activated status.
-define(STATE_DENIED, denied).
-define(STATE_CONNECTING, connecting).
-define(STATE_AUTH, auth).
%% Cannot push messages to the service, but can still accept some of the
%% commands sent by the server.
-define(STATE_RESTRICTED, restricted).
%% Activated status.
-define(STATE_ACTIVATED, activated).
%% gen_server state of the agent.
-record(state, {
%% Pid of the transport process, or undefined when there is none.
transport_pid :: undefined | pid(),
%% One of the ?STATE_* values above; a fresh state starts as denied.
status = ?STATE_DENIED
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Hands one metric sample (line-protocol payload) of a service to the
%% agent process. The call is asynchronous: it only casts the request to the
%% locally registered server.
-spec metric_data(ServiceId :: binary(), LineProtocolData :: binary()) -> ok.
metric_data(ServiceId, LineProtocolData)
    when is_binary(ServiceId) andalso is_binary(LineProtocolData) ->
    Request = {metric_data, ServiceId, LineProtocolData},
    gen_server:cast(?SERVER, Request).
%% @doc Hands a host status report (ping) to the agent process. The thirteen
%% arguments are forwarded unchanged, in the same order, as a single cast.
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
    PingRequest = {
        ping,
        AdCode, BootTime, Province, City,
        EfkaVersion, KernelArch, Ips,
        CpuCore, CpuLoad, CpuTemperature,
        Disk, Memory, Interfaces
    },
    gen_server:cast(?SERVER, PingRequest).
%% @doc Reports the phase of a task without an accompanying message; the
%% message sent along is the empty binary.
feedback_phase(TaskId, Timestamp, Phase, Code)
    when is_integer(TaskId) andalso is_integer(Timestamp)
         andalso is_binary(Phase) andalso is_integer(Code) ->
    %% Same cast as feedback_phase/5 with an empty message.
    feedback_phase(TaskId, Timestamp, Phase, Code, <<"">>).
%% @doc Reports the phase of a task together with a message. The report is
%% cast to the locally registered agent process.
feedback_phase(TaskId, Timestamp, Phase, Code, Message)
    when is_integer(TaskId) andalso is_integer(Timestamp)
         andalso is_binary(Phase) andalso is_integer(Code)
         andalso is_binary(Message) ->
    Request = {feedback_phase, TaskId, Timestamp, Phase, Code, Message},
    gen_server:cast(?SERVER, Request).
%% @doc Hands an event of a service to the agent process.
%% The request is cast to the locally registered server, so the function
%% returns `ok' immediately (the previous `no_return()' spec was wrong:
%% gen_server:cast/2 always returns `ok').
-spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> ok.
event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
    gen_server:cast(?SERVER, {event, ServiceId, EventType, Params}).
%% @doc Hands an AI event of a service to the agent process.
%% The request is cast to the locally registered server, so the function
%% returns `ok' immediately (the previous `no_return()' spec was wrong:
%% gen_server:cast/2 always returns `ok').
-spec ai_event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> ok.
ai_event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
    gen_server:cast(?SERVER, {ai_event, ServiceId, EventType, Params}).
%% @doc Starts the agent as a gen_server linked to the caller and registers
%% it locally under the module name, so only one instance can run per node.
-spec start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server: traps exits and arms a zero-delay timer
%% whose `create_transport' message is handled in handle_info/2, so init
%% itself returns right away.
-spec init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore.
init([]) ->
    _ = erlang:process_flag(trap_exit, true),
    _TimerRef = erlang:start_timer(0, self(), create_transport),
    InitialState = #state{},
    {ok, InitialState}.
%% @private
%% @doc Handles synchronous calls. No call is part of the API; every request
%% is answered with `ok' and the state is left untouched.
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}.
handle_call(_Request, _From, #state{} = State) ->
    Reply = ok,
    {reply, Reply, State}.
%% @private
%% @doc Handles the asynchronous requests of the API.
%% Metric data, events, AI events and phase feedback are encoded with
%% message_pb and then either sent through the transport (activated status)
%% or stored in the micro cache (any other status). Ping messages are sent
%% only when activated and are never cached.
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Metric data
handle_cast({metric_data, ServiceId, LineProtocolData}, State = #state{status = Status, transport_pid = TransportPid}) ->
    Packet = message_pb:encode_msg(#data{
        service_id = ServiceId,
        metric = LineProtocolData
    }),
    send_or_cache(Status, TransportPid, ?METHOD_DATA, Packet),
    {noreply, State};
%% Event
handle_cast({event, ServiceId, EventType, Params}, State = #state{status = Status, transport_pid = TransportPid}) ->
    EventPacket = message_pb:encode_msg(#event{
        service_id = ServiceId,
        event_type = EventType,
        params = Params
    }),
    send_or_cache(Status, TransportPid, ?METHOD_EVENT, EventPacket),
    {noreply, State};
%% AI event
handle_cast({ai_event, ServiceId, EventType, Params}, State = #state{status = Status, transport_pid = TransportPid}) ->
    EventPacket = message_pb:encode_msg(#ai_event{
        service_id = ServiceId,
        event_type = EventType,
        params = Params
    }),
    send_or_cache(Status, TransportPid, ?METHOD_AI_EVENT, EventPacket),
    {noreply, State};
%% Task phase feedback
handle_cast({feedback_phase, TaskId, Timestamp, Phase, Code, Message}, State = #state{status = Status, transport_pid = TransportPid}) ->
    PhasePacket = message_pb:encode_msg(#feedback_phase{
        task_id = TaskId,
        timestamp = Timestamp,
        phase = Phase,
        code = Code,
        message = Message
    }),
    send_or_cache(Status, TransportPid, ?METHOD_PHASE, PhasePacket),
    {noreply, State};
%% Ping message
handle_cast({ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces},
    State = #state{status = Status, transport_pid = TransportPid}) ->
    Ping = message_pb:encode_msg(#ping{
        adcode = AdCode,
        boot_time = BootTime,
        province = Province,
        city = City,
        efka_version = EfkaVersion,
        kernel_arch = KernelArch,
        ips = Ips,
        cpu_core = CpuCore,
        cpu_load = CpuLoad,
        cpu_temperature = CpuTemperature,
        disk = Disk,
        memory = Memory,
        interfaces = Interfaces
    }),
    %% A ping is sent only while activated; otherwise it is dropped, not cached.
    Status =:= ?STATE_ACTIVATED andalso efka_transport:send(TransportPid, ?METHOD_PING, Ping),
    {noreply, State};
handle_cast(_Request, State = #state{}) ->
    {noreply, State}.

%% Sends an encoded packet through the transport when the agent is activated;
%% in every other status the packet is stored in the micro cache under a
%% freshly allocated id, together with its method. A failed insert crashes
%% the server through the `ok =' match.
send_or_cache(?STATE_ACTIVATED, TransportPid, Method, Packet) ->
    efka_transport:send(TransportPid, Method, Packet);
send_or_cache(_Status, _TransportPid, Method, Packet) ->
    ok = micro_cache_model:insert(#micro_cache{
        id = micro_cache_model:next_id(),
        method = Method,
        data = Packet
    }).
%% @private
%% @doc Handles all non call/cast messages:
%%   * the `create_transport' timer, which starts a transport and connects it;
%%   * `connect_reply' and `auth_reply' sent back by the transport;
%%   * `server_push_message' packets pushed by the cloud server;
%%   * the 'EXIT' of the linked transport, which schedules a new
%%     `create_transport' timer.
%% Whenever the transport is stopped after a failure, transport_pid is kept in
%% the state until its 'EXIT' message arrives; the 'EXIT' clause matches on
%% that pid and is the only place that schedules the reconnect.
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Timer fired while denied: start a transport and ask it to connect.
handle_info({timeout, _, create_transport}, State = #state{status = ?STATE_DENIED}) ->
    {ok, Props} = application:get_env(efka, tls_server),
    Host = proplists:get_value(host, Props),
    Port = proplists:get_value(port, Props),
    {ok, TransportPid} = efka_transport:start_link(self(), Host, Port),
    efka_transport:connect(TransportPid),
    {noreply, State#state{status = ?STATE_CONNECTING, transport_pid = TransportPid}};
%% Result of the connect attempt.
handle_info({connect_reply, Reply}, State = #state{status = ?STATE_CONNECTING, transport_pid = TransportPid}) ->
    case Reply of
        ok ->
            efka_transport:auth_request(TransportPid, 5000),
            {noreply, State#state{status = ?STATE_AUTH}};
        {error, Reason} ->
            efka_logger:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]),
            efka_transport:stop(TransportPid),
            %% Must be a 2-tuple: {noreply, Atom, State} would make the atom
            %% the new state and the record the timeout.
            {noreply, State#state{status = ?STATE_DENIED}}
    end;
%% Result of the auth request.
handle_info({auth_reply, {ok, ReplyBin}}, State = #state{status = ?STATE_AUTH, transport_pid = TransportPid}) ->
    #auth_reply{code = Code, message = Message, repository_url = RepositoryUrl} = message_pb:decode_msg(ReplyBin, auth_reply),
    case Code of
        0 ->
            lager:debug("[efka_agent] auth success, message: ~p, repository_url: ~p", [Message, RepositoryUrl]),
            %% Upload everything that was buffered in the cache.
            CacheItems = micro_cache_model:get_all_cache(),
            lists:foreach(fun(#micro_cache{id = Id, method = Method, data = Packet}) ->
                efka_transport:send(TransportPid, Method, Packet),
                micro_cache_model:delete(Id)
            end, CacheItems),
            {noreply, State#state{status = ?STATE_ACTIVATED}};
        1 ->
            %% The host has not been authorized in the backend: the agent must
            %% not push data to the cloud server, but the cloud server may still
            %% push commands to the agent, so the socket connection is kept.
            lager:debug("[efka_agent] auth denied, message: ~p", [Message]),
            {noreply, State#state{status = ?STATE_RESTRICTED}};
        2 ->
            %% Other kind of error: retry after an interval (scheduled by the
            %% 'EXIT' clause once the stopped transport has exited).
            efka_logger:debug("[efka_agent] auth failed, message: ~p", [Message]),
            efka_transport:stop(TransportPid),
            {noreply, State#state{status = ?STATE_DENIED}};
        _ ->
            %% Unknown code: retry after an interval, same as above.
            lager:debug("[efka_agent] auth failed, invalid message"),
            efka_transport:stop(TransportPid),
            {noreply, State#state{status = ?STATE_DENIED}}
    end;
handle_info({auth_reply, {error, Reason}}, State = #state{transport_pid = TransportPid, status = ?STATE_AUTH}) ->
    lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]),
    efka_transport:stop(TransportPid),
    {noreply, State#state{status = ?STATE_DENIED}};
%% Messages pushed by the cloud server.
%% Micro service deployment.
handle_info({server_push_message, PacketId, <<?METHOD_DEPLOY:8, DeployBin/binary>>}, State = #state{transport_pid = TransportPid}) ->
    #deploy{task_id = TaskId, from = From, service_id = ServiceId} = message_pb:decode_msg(DeployBin, deploy),
    Reply = case efka_inetd:deploy(TaskId, ServiceId, From) of
        ok ->
            #efka_response{code = 1, message = <<"">>};
        {error, Reason} when is_binary(Reason) ->
            %% 0 is the failure code used by the other push handlers.
            #efka_response{code = 0, message = Reason}
    end,
    efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)),
    {noreply, State};
%% Service parameters.
handle_info({server_push_message, PacketId, <<?METHOD_PRAMAS:8, ParamsBin/binary>>}, State = #state{transport_pid = TransportPid}) ->
    #service_params{service_id = ServiceId, params = Params} = message_pb:decode_msg(ParamsBin, service_params),
    Reply = case efka_micro_service:get_pid(ServiceId) of
        undefined ->
            #efka_response{code = 0, message = <<"service not run">>};
        ServicePid when is_pid(ServicePid) ->
            push_result_response(efka_micro_service:push_params(ServicePid, Params))
    end,
    efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)),
    {noreply, State};
%% Service metrics (collection items).
handle_info({server_push_message, PacketId, <<?METHOD_METRICS:8, MetricsBin/binary>>}, State = #state{transport_pid = TransportPid}) ->
    #service_metrics{service_id = ServiceId, metrics = Metrics} = message_pb:decode_msg(MetricsBin, service_metrics),
    Reply = case efka_micro_service:get_pid(ServiceId) of
        undefined ->
            #efka_response{code = 0, message = <<"service not run">>};
        ServicePid when is_pid(ServicePid) ->
            push_result_response(efka_micro_service:push_metrics(ServicePid, Metrics))
    end,
    efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)),
    {noreply, State};
%% TODO
%% Activation push.
%% NOTE(review): this clause matches a 2-tuple without PacketId, unlike every
%% other server_push_message clause - confirm the shape the transport sends.
handle_info({server_push_message, <<8:8, ActivatePush/binary>>}, State = #state{transport_pid = TransportPid, status = Status}) ->
    #activate_push{auth = Auth} = message_pb:decode_msg(ActivatePush, activate_push),
    case Auth of
        false ->
            %% The host is now restricted: it may not send messages but can
            %% still receive the messages pushed by the server.
            {noreply, State#state{status = ?STATE_RESTRICTED}};
        true when Status =:= ?STATE_DENIED; Status =:= ?STATE_RESTRICTED ->
            %% Re-activation: the agent has to authenticate again.
            efka_transport:auth_request(TransportPid, 5000),
            {noreply, State#state{status = ?STATE_AUTH}};
        true ->
            %% Already activated, or connect/auth still in progress.
            {noreply, State}
    end;
%% Directive that may require a reply.
handle_info({server_push_message, PacketId, <<16:8, Directive/binary>>}, State = #state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}) ->
    #topic_message{topic = Topic, content = Content} = message_pb:decode_msg(Directive, topic_message),
    efka_logger:debug("[efka_agent] get directive with packet_id: ~p, to device_uuid: ~p, content: ~p", [PacketId, Topic, Content]),
    %% Forward the message to the subscription system.
    case PacketId > 0 of
        true ->
            %% The response is sent only if the transport is still alive when
            %% the callback runs.
            CallbackFun = fun(Response) -> is_process_alive(TransportPid) andalso efka_transport:response(TransportPid, PacketId, Response) end,
            efka_subscription:publish(PacketId, Topic, Content, CallbackFun);
        false ->
            efka_subscription:publish(Topic, Content)
    end,
    {noreply, State};
%% The transport process exited: forget it and schedule a new one.
handle_info({'EXIT', TransportPid, Reason}, State = #state{transport_pid = TransportPid}) ->
    efka_logger:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
    %% NOTE(review): 500000 ms is roughly 8 minutes - confirm this retry delay is intended.
    erlang:start_timer(500000, self(), create_transport),
    {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}};
handle_info(_Info, State = #state{}) ->
    {noreply, State}.

%% Maps the result of pushing params/metrics to a micro service onto the
%% #efka_response{} sent back to the server: code 1 on success, code 0 with
%% the error reason as message on failure.
push_result_response(ok) ->
    #efka_response{code = 1, message = <<"">>};
push_result_response({error, Reason}) ->
    #efka_response{code = 0, message = Reason}.
%% @private
%% @doc Called by the gen_server when it is about to terminate. Nothing is
%% cleaned up here; the function only returns `ok'.
-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term().
terminate(_Reason, #state{}) ->
    ok.
%% @private
%% @doc Converts the process state on a code change. The state record needs
%% no conversion, so it is returned as it is.
-spec code_change(OldVsn :: term() | {down, term()}, State :: #state{}, Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}.
code_change(_OldVsn, #state{} = State, _Extra) ->
    {ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================