%% ekfa/apps/efka/src/efka_agent.erl
%% 2025-05-20 16:09:00 +08:00
%%
%% 425 lines
%% 18 KiB
%% Erlang
%% Raw Blame History
%%
%% This file contains ambiguous Unicode characters
%%
%% This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%% 需要支持 云服务离线时候的数据暂存
%%% @end
%%% Created : 06. 5月 2025 00:01
%%%-------------------------------------------------------------------
-module(efka_agent).
-author("anlicheng").
-include("message_pb.hrl").
-include("efka_tables.hrl").
-include("efka.hrl").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([metric_data/3, event/3, ping/13]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
%% Agent status values. Data can only be sent normally while the agent is
%% in the activated status.
%% denied: initial status, and the status set after a connect/auth failure
%% or after the transport process exits.
-define(STATE_DENIED, denied).
%% connecting: a transport has been started and a connect was requested.
-define(STATE_CONNECTING, connecting).
%% auth: an auth request has been sent and its reply is pending.
-define(STATE_AUTH, auth).
%% restricted: the agent cannot push messages to the service, but it can
%% still accept some of the server's commands.
-define(STATE_RESTRICTED, restricted).
%% activated: the agent is authenticated and may send data.
-define(STATE_ACTIVATED, activated).
%% gen_server state of the agent process.
-record(state, {
%% Pid of the efka_transport process, or undefined when none is attached.
transport_pid :: undefined | pid(),
%% Current agent status (one of the ?STATE_* macros); starts as denied.
status = ?STATE_DENIED,
%% Server requests still waiting for a service reply: #{Ref => PacketId}
inflight = #{}
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Hands one metric sample to the agent process.
%% The request is delivered with gen_server:cast/2, so the call returns `ok'
%% immediately and does not wait for the agent to process the sample.
%% All three arguments must be binaries; anything else raises function_clause.
-spec metric_data(ServiceId :: binary(), DeviceUUID :: binary(), LineProtocolData :: binary()) -> ok.
metric_data(ServiceId, DeviceUUID, LineProtocolData)
    when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(LineProtocolData) ->
    gen_server:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, LineProtocolData}).
%% @doc Hands one event to the agent process.
%% The request is delivered with gen_server:cast/2, so the call returns `ok'
%% immediately and does not wait for the agent to process the event.
%% ServiceId and Params must be binaries and EventType an integer; anything
%% else raises function_clause.
-spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> ok.
event(ServiceId, EventType, Params)
    when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
    gen_server:cast(?SERVER, {event, ServiceId, EventType, Params}).
%% @doc Forwards a host status report to the agent process as an
%% asynchronous cast. The thirteen values are passed through unchanged in a
%% single `ping' tuple.
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad,
     CpuTemperature, Disk, Memory, Interfaces) ->
    PingMsg = {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips,
               CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces},
    gen_server:cast(?SERVER, PingMsg).
%% @doc Starts the agent as a linked gen_server registered locally under
%% the ?SERVER name.
-spec start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server.
%% Exit signals are trapped so that exits of linked processes arrive as
%% 'EXIT' messages, and a zero-delay timer asks handle_info/2 to create the
%% transport right after init returns.
-spec init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore.
init([]) ->
    erlang:process_flag(trap_exit, true),
    erlang:start_timer(0, self(), create_transport),
    InitialState = #state{status = ?STATE_DENIED},
    {ok, InitialState}.
%% @private
%% @doc Handles synchronous calls. No call is part of this module's
%% protocol; every request is answered with `ok' and the state is unchanged.
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}.
handle_call(_Request, _From, #state{} = State) ->
    {reply, ok, State}.
%% @private
%% @doc Handles the asynchronous requests issued by the API functions.
-spec handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
%% Metric data: encode it and let safe_send/3 either send or cache it.
handle_cast({metric_data, ServiceId, DeviceUUID, LineProtocolData}, State) ->
    DataMsg = #data{
        service_id = ServiceId,
        device_uuid = DeviceUUID,
        metric = LineProtocolData
    },
    safe_send(?METHOD_DATA, message_pb:encode_msg(DataMsg), State),
    {noreply, State};
%% Event: encode it and let safe_send/3 either send or cache it.
handle_cast({event, ServiceId, EventType, Params}, State) ->
    EventMsg = #event{
        service_id = ServiceId,
        event_type = EventType,
        params = Params
    },
    safe_send(?METHOD_EVENT, message_pb:encode_msg(EventMsg), State),
    {noreply, State};
%% Ping: always encoded, but only sent while activated with a transport pid.
%% Unlike data and events, a ping that cannot be sent is dropped, not cached.
handle_cast({ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore,
             CpuLoad, CpuTemperature, Disk, Memory, Interfaces}, #state{} = State) ->
    PingBin = message_pb:encode_msg(#ping{
        adcode = AdCode,
        boot_time = BootTime,
        province = Province,
        city = City,
        efka_version = EfkaVersion,
        kernel_arch = KernelArch,
        ips = Ips,
        cpu_core = CpuCore,
        cpu_load = CpuLoad,
        cpu_temperature = CpuTemperature,
        disk = Disk,
        memory = Memory,
        interfaces = Interfaces
    }),
    case State of
        #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid} when is_pid(TransportPid) ->
            efka_transport:send(TransportPid, ?METHOD_PING, PingBin);
        #state{} ->
            ok
    end,
    {noreply, State};
%% Any other cast is ignored.
handle_cast(_Request, #state{} = State) ->
    {noreply, State}.
%% @private
%% @doc Handles all non call/cast messages: timers, replies from the
%% transport, pushes and commands from the server, replies from local
%% services, and the exit of the transport process.
-spec handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
%% Create the transport and request a connection. Only acted upon in the
%% denied status; a create_transport timer firing in any other status falls
%% through to the final catch-all clause and is ignored.
handle_info({timeout, _, create_transport}, State = #state{status = ?STATE_DENIED}) ->
    {ok, Props} = application:get_env(efka, tls_server),
    Host = proplists:get_value(host, Props),
    Port = proplists:get_value(port, Props),
    {ok, TransportPid} = efka_transport:start_link(self(), Host, Port),
    efka_transport:connect(TransportPid),
    {noreply, State#state{status = ?STATE_CONNECTING, transport_pid = TransportPid}};
%% Reply to the connect request.
handle_info({connect_reply, Reply}, State = #state{status = ?STATE_CONNECTING, transport_pid = TransportPid}) when is_pid(TransportPid) ->
    case Reply of
        ok ->
            AuthBin = auth_request(),
            efka_transport:auth_request(TransportPid, AuthBin),
            {noreply, State#state{status = ?STATE_AUTH}};
        {error, Reason} ->
            lager:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]),
            efka_transport:stop(TransportPid),
            %% transport_pid is kept on purpose: the 'EXIT' clause below matches
            %% it and schedules the reconnect.
            {noreply, State#state{status = ?STATE_DENIED}}
    end;
%% Reply to the auth request.
handle_info({auth_reply, Reply}, State = #state{status = ?STATE_AUTH, transport_pid = TransportPid}) when is_pid(TransportPid) ->
    case Reply of
        {ok, ReplyBin} ->
            #auth_reply{code = Code, message = Message} = message_pb:decode_msg(ReplyBin, auth_reply),
            handle_auth_code(Code, Message, State);
        {error, Reason} ->
            lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]),
            retry_after_auth_failure(State)
    end;
%% Server pushes.
%% Deploy a micro service. efka_inetd returns as soon as it has accepted the
%% request, so the wait here is short.
handle_info({server_push, PacketId, <<?PUSH_DEPLOY:8, DeployBin/binary>>}, State = #state{transport_pid = TransportPid}) ->
    #deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl} = message_pb:decode_msg(DeployBin, deploy),
    Reply = inetd_reply(efka_inetd:deploy(TaskId, ServiceId, TarUrl)),
    efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
    {noreply, State};
%% Start a micro service.
handle_info({server_push, PacketId, <<?PUSH_START_SERVICE:8, ServiceId/binary>>}, State = #state{transport_pid = TransportPid}) ->
    Reply = inetd_reply(efka_inetd:start_service(ServiceId)),
    efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
    {noreply, State};
%% Stop a micro service.
handle_info({server_push, PacketId, <<?PUSH_STOP_SERVICE:8, ServiceId/binary>>}, State = #state{transport_pid = TransportPid}) ->
    Reply = inetd_reply(efka_inetd:stop_service(ServiceId)),
    efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
    {noreply, State};
%% Push config.json content to a running micro service.
handle_info({server_push, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>}, State = #state{transport_pid = TransportPid, inflight = Inflight}) ->
    #push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ConfigBin, push_service_config),
    case efka_service:get_pid(ServiceId) of
        undefined ->
            Reply = #async_call_reply{code = 0, message = <<"service not run">>},
            efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
            {noreply, State};
        ServicePid when is_pid(ServicePid) ->
            Ref = make_ref(),
            %% Forward the configuration to the micro service.
            efka_service:push_config(ServicePid, Ref, ConfigJson),
            %% Arm the request timeout; Ref ties the timer, the service reply
            %% and the server's PacketId together.
            erlang:start_timer(Timeout, self(), {request_timeout, Ref}),
            {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
    end;
%% Invoke request that expects a reply (only handled while activated).
handle_info({server_push, PacketId, <<?PUSH_INVOKE:8, InvokeBin/binary>>}, State = #state{status = ?STATE_ACTIVATED, inflight = Inflight}) ->
    #invoke{service_id = ServiceId, payload = Payload, timeout = Timeout} = message_pb:decode_msg(InvokeBin, invoke),
    case efka_service:get_pid(ServiceId) of
        undefined ->
            Reply = #async_call_reply{code = 0, message = <<"micro_service not run">>, result = <<>>},
            safe_async_call_reply(PacketId, message_pb:encode_msg(Reply), State),
            {noreply, State};
        ServicePid when is_pid(ServicePid) ->
            Ref = make_ref(),
            efka_service:invoke(ServicePid, Ref, Payload),
            %% Arm the request timeout for this invocation.
            erlang:start_timer(Timeout, self(), {request_timeout, Ref}),
            {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
    end;
%% Fetch the log of a task (only handled while activated).
handle_info({server_push, PacketId, <<?PUSH_TASK_LOG:8, TaskLogBin/binary>>}, State = #state{status = ?STATE_ACTIVATED}) ->
    #fetch_task_log{task_id = TaskId} = message_pb:decode_msg(TaskLogBin, fetch_task_log),
    lager:debug("[efka_agent] get task_log request: ~p", [TaskId]),
    {ok, Logs} = efka_inetd_task_log:get_logs(TaskId),
    Reply = case Logs of
        [_ | _] ->
            Result = iolist_to_binary(jiffy:encode(Logs, [force_utf8])),
            #async_call_reply{code = 1, result = Result};
        [] ->
            #async_call_reply{code = 1, result = <<"[]">>}
    end,
    safe_async_call_reply(PacketId, message_pb:encode_msg(Reply), State),
    {noreply, State};
%% Authorization command from the server: 1 grants, 0 revokes.
handle_info({server_command, ?COMMAND_AUTH, <<Auth:8>>}, State = #state{transport_pid = TransportPid, status = Status}) ->
    case {Auth, Status} of
        {1, ?STATE_ACTIVATED} ->
            {noreply, State};
        {1, NeedsAuth} when NeedsAuth =:= ?STATE_DENIED; NeedsAuth =:= ?STATE_RESTRICTED ->
            %% Re-activation: the agent has to authenticate again. The
            %% restricted status is included because both an earlier "0"
            %% command and an auth reply with code 1 leave the agent there.
            AuthRequestBin = auth_request(),
            efka_transport:auth_request(TransportPid, AuthRequestBin),
            {noreply, State#state{status = ?STATE_AUTH}};
        {1, _} ->
            %% connecting / auth: a handshake is already in progress, so there
            %% is nothing to do (previously this crashed with case_clause).
            {noreply, State};
        {0, _} ->
            %% The host becomes restricted: it may not send messages but can
            %% still receive what the server pushes.
            {noreply, State#state{status = ?STATE_RESTRICTED}}
    end;
%% Message published by the server (only handled while activated).
handle_info({server_pub, Topic, Content}, State = #state{status = ?STATE_ACTIVATED}) ->
    lager:debug("[efka_agent] get pub topic: ~p, content: ~p", [Topic, Content]),
    %% Hand the message to the subscription system.
    efka_subscription:publish(Topic, Content),
    {noreply, State};
%% Reply from efka_service for a request tracked in inflight.
handle_info({service_reply, Ref, EmsReply}, State = #state{inflight = Inflight}) ->
    case maps:take(Ref, Inflight) of
        error ->
            %% Unknown Ref, e.g. the request already timed out.
            {noreply, State};
        {PacketId, NInflight} ->
            Reply = case EmsReply of
                {ok, Result} ->
                    #async_call_reply{code = 1, result = Result};
                {error, Reason} ->
                    #async_call_reply{code = 0, message = Reason}
            end,
            safe_async_call_reply(PacketId, message_pb:encode_msg(Reply), State),
            {noreply, State#state{inflight = NInflight}}
    end;
%% Request timeout for a request tracked in inflight.
handle_info({timeout, _, {request_timeout, Ref}}, State = #state{inflight = Inflight}) ->
    case maps:take(Ref, Inflight) of
        error ->
            %% The request was already answered; the timer is stale.
            {noreply, State};
        {PacketId, NInflight} ->
            Reply = #async_call_reply{code = 0, message = <<"reqeust timeout">>, result = <<>>},
            safe_async_call_reply(PacketId, message_pb:encode_msg(Reply), State),
            {noreply, State#state{inflight = NInflight}}
    end;
%% The current transport process exited: reconnect after 5 seconds.
handle_info({'EXIT', TransportPid, Reason}, State = #state{transport_pid = TransportPid}) ->
    lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
    erlang:start_timer(5000, self(), create_transport),
    {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}};
%% Everything else is ignored. This includes 'EXIT' messages for pids that
%% are no longer the current transport.
handle_info(_Info, State = #state{}) ->
    {noreply, State}.

%% Acts on the result code of a decoded auth reply.
-spec handle_auth_code(Code :: term(), Message :: term(), State :: #state{}) -> {noreply, #state{}}.
handle_auth_code(0, Message, State = #state{transport_pid = TransportPid}) ->
    lager:debug("[efka_agent] auth success, message: ~p", [Message]),
    %% Upload everything held in the cache, deleting each item once it has
    %% been handed to the transport.
    CacheItems = cache_model:get_all_cache(),
    lists:foreach(fun(#cache{id = Id, method = Method, data = Packet}) ->
        efka_transport:send(TransportPid, Method, Packet),
        cache_model:delete(Id)
    end, CacheItems),
    {noreply, State#state{status = ?STATE_ACTIVATED}};
handle_auth_code(1, Message, State) ->
    %% The host is not authorized in the backend: the agent may not push data
    %% to the cloud, but the cloud may still push commands to the agent, so
    %% the socket connection is kept.
    lager:debug("[efka_agent] auth denied, message: ~p", [Message]),
    {noreply, State#state{status = ?STATE_RESTRICTED}};
handle_auth_code(2, Message, State) ->
    %% Other kind of error: retry after an interval.
    lager:debug("[efka_agent] auth failed, message: ~p", [Message]),
    retry_after_auth_failure(State);
handle_auth_code(_Code, _Message, State) ->
    %% Unknown result code: retry after an interval.
    lager:debug("[efka_agent] auth failed, invalid message"),
    retry_after_auth_failure(State).

%% Stops the transport after a failed authentication and schedules a new
%% connection attempt. transport_pid is cleared here, so the 'EXIT' clause of
%% handle_info/2 cannot match the stopped transport; the create_transport
%% timer therefore has to be started explicitly or no retry would happen.
-spec retry_after_auth_failure(State :: #state{}) -> {noreply, #state{}}.
retry_after_auth_failure(State = #state{transport_pid = TransportPid}) ->
    efka_transport:stop(TransportPid),
    erlang:start_timer(5000, self(), create_transport),
    {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}.

%% Converts the result of an efka_inetd call into the reply record sent back
%% to the server. Only `ok' and `{error, Binary}' are expected.
-spec inetd_reply(ok | {error, Reason :: binary()}) -> #async_call_reply{}.
inetd_reply(ok) ->
    #async_call_reply{code = 1, result = <<"ok">>};
inetd_reply({error, Reason}) when is_binary(Reason) ->
    #async_call_reply{code = 0, message = Reason}.
%% @private
%% @doc Called by gen_server just before the process terminates. This
%% module has nothing to clean up; the return value is ignored.
-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) ->
    term().
terminate(_Reason, #state{}) ->
    ok.
%% @private
%% @doc Converts the process state on a code change. The state layout is
%% kept as is, so the state is returned untouched.
-spec code_change(OldVsn :: term() | {down, term()}, State :: #state{}, Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}.
code_change(_OldVsn, #state{} = State, _Extra) ->
    {ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Sends the reply for a server-initiated call, but only while the agent is
%% activated, holds a transport pid, and that process is still alive.
%% Returns whatever efka_transport:async_call_reply/3 returns, `false' when
%% the transport process is no longer alive, and `ok' when the reply is
%% skipped because the state or the arguments do not qualify.
-spec safe_async_call_reply(PacketId :: integer(), Reply :: binary(), State :: #state{}) -> term().
safe_async_call_reply(PacketId, Reply, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid})
    when is_integer(PacketId), is_binary(Reply), is_pid(TransportPid) ->
    is_process_alive(TransportPid) andalso efka_transport:async_call_reply(TransportPid, PacketId, Reply);
safe_async_call_reply(_PacketId, _Reply, #state{}) ->
    ok.
%% Sends the packet through the transport when the agent is activated and
%% holds a transport pid; otherwise stores it with cache_model:insert/2.
%% The cache insert is asserted to return `ok', so a failed insert crashes
%% the caller instead of silently losing the packet.
%% Returns the result of efka_transport:send/3, or `ok' after caching.
-spec safe_send(Method :: integer(), Packet :: binary(), State :: #state{}) -> term().
safe_send(Method, Packet, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid})
    when is_pid(TransportPid) ->
    efka_transport:send(TransportPid, Method, Packet);
safe_send(Method, Packet, #state{}) ->
    ok = cache_model:insert(Method, Packet).
%% Builds the encoded auth request from the `auth' entry of the efka
%% application environment. The uuid, username, salt and token values are
%% converted to binaries and a current timestamp is attached.
-spec auth_request() -> binary().
auth_request() ->
    {ok, AuthInfo} = application:get_env(efka, auth),
    [UUID, Username, Salt, Token] =
        [proplists:get_value(Key, AuthInfo) || Key <- [uuid, username, salt, token]],
    ToBinary = fun unicode:characters_to_binary/1,
    Request = #auth_request{
        uuid = ToBinary(UUID),
        username = ToBinary(Username),
        salt = ToBinary(Salt),
        token = ToBinary(Token),
        timestamp = efka_util:timestamp()
    },
    message_pb:encode_msg(Request).