ekfa/apps/efka/src/efka_remote_agent.erl
2025-09-24 17:19:35 +08:00

386 lines
17 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
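%%% @doc
%%% Agent that maintains the connection to the remote server: it opens an
%%% efka_transport to the `tls_server' host/port from the efka application
%%% env, authenticates, uploads metric/event data (caching it through
%%% cache_model while not activated) and executes RPCs, commands and pub
%%% messages pushed by the server.
%%% @end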
%%% Created : 21. 5月 2025 18:38
%%%-------------------------------------------------------------------
-module(efka_remote_agent).
-author("anlicheng").
-include("message.hrl").
-include("efka_tables.hrl").
-behaviour(gen_statem).
%% API
-export([start_link/0]).
-export([metric_data/4, event/3, ping/13, task_event_stream/2]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
-define(SERVER, ?MODULE).
%% Agent state names. Only in the `activated' state is data pushed to the
%% server directly; in the other states metric/event casts are cached and
%% task event streams are dropped.
%% No transport is in use; the `create_transport' timer opens a new one.
-define(STATE_DENIED, denied).
%% Transport started, waiting for `{connect_reply, _}'.
-define(STATE_CONNECTING, connecting).
%% Auth request sent, waiting for `{auth_reply, _}'.
-define(STATE_AUTH, auth).
%% Cannot push messages to the server, but can still accept some of the server's commands.
-define(STATE_RESTRICTED, restricted).
%% Activated: normal operation.
-define(STATE_ACTIVATED, activated).
%% transport_pid: the current efka_transport process (undefined when none).
%% transport_ref: monitor reference returned by efka_transport:start_monitor/3;
%%                its 'DOWN' message schedules the reconnect.
-record(state, {
transport_pid :: undefined | pid(),
transport_ref :: undefined | reference()
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Queue a metric sample for upload to the server.
%% Fire-and-forget: gen_statem:cast/2 always returns `ok'. The agent sends
%% the packet directly while activated and caches it otherwise.
%% All arguments must be binaries; anything else fails the guard.
-spec metric_data(ServiceId :: binary(), DeviceUUID :: binary(), RouteKey :: binary(), Metric :: binary()) -> ok.
metric_data(ServiceId, DeviceUUID, RouteKey, Metric) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) ->
    gen_statem:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}).
%% @doc Queue a service event for upload to the server.
%% Fire-and-forget: gen_statem:cast/2 always returns `ok'. The agent sends
%% the packet directly while activated and caches it otherwise.
-spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> ok.
event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
    gen_statem:cast(?SERVER, {event, ServiceId, EventType, Params}).
%% @doc Forward a chunk of a task's event stream to the server.
%% Fire-and-forget: gen_statem:cast/2 always returns `ok'. Unlike metrics and
%% events, stream chunks are only sent while the agent is activated and are
%% dropped (not cached) in every other state.
-spec task_event_stream(TaskId :: integer(), Stream :: binary()) -> ok.
task_event_stream(TaskId, Stream) when is_integer(TaskId), is_binary(Stream) ->
    gen_statem:cast(?SERVER, {task_event_stream, TaskId, Stream}).
%% @doc Cast a host status snapshot (the fields named by the arguments) to
%% the agent. Fire-and-forget: gen_statem:cast/2 always returns `ok'.
%% NOTE(review): handle_event/4 does not currently forward this snapshot to
%% the server. Confirm before relying on it. The argument types are not
%% constrained anywhere in this module, hence term().
-spec ping(AdCode :: term(), BootTime :: term(), Province :: term(), City :: term(),
           EfkaVersion :: term(), KernelArch :: term(), Ips :: term(), CpuCore :: term(),
           CpuLoad :: term(), CpuTemperature :: term(), Disk :: term(), Memory :: term(),
           Interfaces :: term()) -> ok.
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
    gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips,
                              CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
%% @doc Start the agent linked to the caller and register it locally under
%% ?SERVER (the module name). gen_statem:start_link/4 is synchronous: it
%% returns only after init/1 has finished.
-spec start_link() -> {ok, pid()} | ignore | {error, term()}.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_statem:start_link(ServerName, ?MODULE, [], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc gen_statem init callback. The agent starts in `denied' with an empty
%% #state{} and arms a zero-delay `create_transport' timer. The connection
%% is therefore opened from handle_event/4 instead of inside init/1.
init([]) ->
    Self = self(),
    _TimerRef = erlang:start_timer(0, Self, create_transport),
    InitialData = #state{},
    {ok, ?STATE_DENIED, InitialData}.
%% @private
%% @doc All states (plain atoms) are handled by the single handle_event/4
%% callback rather than one function per state.
-spec callback_mode() -> handle_event_function.
callback_mode() -> handle_event_function.
%% @private
%% @doc Single event handler for every state (callback mode is
%% `handle_event_function'). State lifecycle:
%%   denied     -> (create_transport timer)  -> connecting
%%   connecting -> (connect_reply ok)        -> auth
%%   auth       -> (auth_reply code 0)       -> activated, then drain the cache
%%   auth       -> (auth_reply code 1)       -> restricted (socket kept open)
%%   connect/auth failure or transport 'DOWN' -> denied; the 'DOWN' handler
%%                                              re-arms the timer (5 s)
%% Outgoing metric/event data is sent directly while activated and cached
%% through cache_model in every other state.

%% Metric data: send when activated, otherwise cache until the next flush.
handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, StateName,
             #state{transport_pid = TransportPid}) ->
    Packet = message_codec:encode(?MESSAGE_DATA, #data{
        service_id = ServiceId,
        device_uuid = DeviceUUID,
        route_key = RouteKey,
        metric = Metric
    }),
    send_or_cache(StateName, TransportPid, Packet),
    keep_state_and_data;
%% Service events: same send-or-cache policy as metric data.
handle_event(cast, {event, ServiceId, EventType, Params}, StateName,
             #state{transport_pid = TransportPid}) ->
    Packet = message_codec:encode(?MESSAGE_EVENT, #event{
        service_id = ServiceId,
        event_type = EventType,
        params = Params
    }),
    send_or_cache(StateName, TransportPid, Packet),
    keep_state_and_data;
%% Task event streams are only forwarded while activated ...
handle_event(cast, {task_event_stream, TaskId, Stream}, ?STATE_ACTIVATED,
             #state{transport_pid = TransportPid}) ->
    lager:debug("[efka_remote_agent] event_stream task_id: ~p, stream: ~ts", [TaskId, Stream]),
    EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #task_event_stream{
        task_id = TaskId,
        stream = Stream
    }),
    efka_transport:send(TransportPid, EventPacket),
    keep_state_and_data;
%% ... and silently dropped (not cached) in every other state.
handle_event(cast, {task_event_stream, _TaskId, _Stream}, _StateName, #state{}) ->
    keep_state_and_data;
%% Host status ping (cast by the exported ping/13). The previous upload code
%% (message_pb + ?METHOD_PING) was commented out and this module uses no
%% message_codec encoding for ping. Pings are therefore dropped here instead
%% of crashing the agent with function_clause.
%% TODO: encode and send the ping once its message_codec format exists.
handle_event(cast, {ping, _AdCode, _BootTime, _Province, _City, _EfkaVersion, _KernelArch, _Ips,
                    _CpuCore, _CpuLoad, _CpuTemperature, _Disk, _Memory, _Interfaces},
             _StateName, _State) ->
    keep_state_and_data;
%% Open the transport to the server asynchronously.
handle_event(info, {timeout, _TimerRef, create_transport}, ?STATE_DENIED, State) ->
    {ok, Props} = application:get_env(efka, tls_server),
    Host = proplists:get_value(host, Props),
    Port = proplists:get_value(port, Props),
    {ok, {TransportPid, TransportRef}} = efka_transport:start_monitor(self(), Host, Port),
    efka_transport:connect(TransportPid),
    {next_state, ?STATE_CONNECTING, State#state{transport_pid = TransportPid, transport_ref = TransportRef}};
handle_event(info, {connect_reply, ok}, ?STATE_CONNECTING, State = #state{transport_pid = TransportPid}) ->
    AuthBin = auth_request(),
    efka_transport:auth_request(TransportPid, AuthBin),
    {next_state, ?STATE_AUTH, State};
handle_event(info, {connect_reply, {error, Reason}}, ?STATE_CONNECTING, State = #state{transport_pid = TransportPid}) ->
    lager:debug("[efka_remote_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]),
    drop_transport(State);
handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State) ->
    case Reply of
        {ok, #rpc_reply{code = 0, payload = Message}} ->
            lager:debug("[efka_remote_agent] auth success, message: ~p", [Message]),
            {next_state, ?STATE_ACTIVATED, State, [{next_event, info, flush_cache}]};
        {ok, #rpc_reply{code = 1, payload = Message}} ->
            %% The host is not authorized in the backend: the agent must not push
            %% data, but the server may still push commands, so the socket stays up.
            lager:debug("[efka_remote_agent] auth denied, message: ~p", [Message]),
            {next_state, ?STATE_RESTRICTED, State};
        {ok, #rpc_reply{code = 2, payload = Message}} ->
            %% Other failure: drop the transport and retry later.
            lager:debug("[efka_remote_agent] auth failed, message: ~p", [Message]),
            drop_transport(State);
        {ok, #rpc_reply{}} ->
            lager:debug("[efka_remote_agent] auth failed, invalid message"),
            drop_transport(State);
        {error, Reason} ->
            lager:debug("[efka_remote_agent] auth_request failed, error: ~p", [Reason]),
            drop_transport(State)
    end;
%% Push cached packets to the server, one per event. Re-queuing via
%% next_event drains the cache before any newly queued cast is handled.
handle_event(info, flush_cache, ?STATE_ACTIVATED, #state{transport_pid = TransportPid}) ->
    case cache_model:fetch_next() of
        {ok, #cache{id = Id, data = Packet}} ->
            efka_transport:send(TransportPid, Packet),
            cache_model:delete(Id),
            {keep_state_and_data, [{next_event, info, flush_cache}]};
        error ->
            keep_state_and_data
    end;
handle_event(info, flush_cache, _StateName, _State) ->
    keep_state_and_data;
%% Server RPC: deploy a microservice. Config arrives as JSON from the network.
handle_event(info, {server_rpc, PacketId, #rpc_deploy{task_id = TaskId, config = Config0}}, ?STATE_ACTIVATED,
             #state{transport_pid = TransportPid}) ->
    case decode_deploy_config(Config0) of
        {ok, Config} ->
            %% Original note: only waits briefly, returning once efka_inetd has
            %% received the request (docker_manager is not visible here).
            rpc_reply_result(TransportPid, PacketId, docker_manager:deploy(TaskId, Config));
        error ->
            efka_transport:rpc_reply(TransportPid, PacketId, reply_error(<<"invalid config json">>))
    end,
    keep_state_and_data;
%% Server RPC: start / stop / kill / remove a container.
handle_event(info, {server_rpc, PacketId, #rpc_container{method = Method, container_name = ContainerName}}, ?STATE_ACTIVATED,
             #state{transport_pid = TransportPid})
        when Method =:= <<"start">>; Method =:= <<"stop">>; Method =:= <<"kill">>; Method =:= <<"remove">> ->
    rpc_reply_result(TransportPid, PacketId, container_action(Method, ContainerName)),
    keep_state_and_data;
%% Server RPC: write a container's config.json.
handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"config">>, container_name = ContainerName, params = Config}}, ?STATE_ACTIVATED,
             #state{transport_pid = TransportPid}) ->
    rpc_reply_result(TransportPid, PacketId, docker_manager:config_container(ContainerName, Config)),
    keep_state_and_data;
%% Any other RPC (unknown method, or received outside `activated') gets an
%% explicit error reply instead of crashing the agent and leaving the caller
%% without an answer.
%% TODO: task_log fetching (formerly server_async_call / ?PUSH_TASK_LOG via
%% efka_inetd_task_log) has not been ported to server_rpc yet.
handle_event(info, {server_rpc, PacketId, _Request}, StateName, #state{transport_pid = TransportPid})
        when is_pid(TransportPid) ->
    lager:warning("[efka_remote_agent] rejected server_rpc, packet_id: ~p, state: ~p", [PacketId, StateName]),
    Reason = case StateName of
        ?STATE_ACTIVATED -> <<"unsupported rpc request">>;
        _ -> <<"agent not activated">>
    end,
    efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason)),
    keep_state_and_data;
%% Server command: (de)authorize this host. The command payload comes from the
%% network, so anything other than an integer 0/1 is logged and ignored.
handle_event(info, {server_cast, #command{command_type = ?COMMAND_AUTH, command = Auth0}}, StateName,
             State = #state{transport_pid = TransportPid}) ->
    case {parse_auth_command(Auth0), StateName} of
        {1, ?STATE_ACTIVATED} ->
            keep_state_and_data;
        {1, S} when is_pid(TransportPid) andalso (S =:= ?STATE_RESTRICTED orelse S =:= ?STATE_DENIED) ->
            %% Re-activated by the server: authenticate again over the live socket.
            efka_transport:auth_request(TransportPid, auth_request()),
            {next_state, ?STATE_AUTH, State};
        {1, _} ->
            %% Connect/auth already in progress, or no transport to authenticate on.
            keep_state_and_data;
        {0, _} when is_pid(TransportPid) ->
            %% The host is now restricted: it must not push data but can still
            %% receive messages pushed by the server.
            {next_state, ?STATE_RESTRICTED, State};
        {0, _} ->
            %% Stale command from a transport that has already been dropped.
            keep_state_and_data;
        {invalid, _} ->
            lager:warning("[efka_remote_agent] invalid auth command: ~p", [Auth0]),
            keep_state_and_data
    end;
%% Pub/Sub: hand the published content to the local subscription system.
handle_event(info, {server_cast, #pub{topic = Topic, content = Content}}, ?STATE_ACTIVATED, _State) ->
    lager:debug("[efka_remote_agent] get pub topic: ~p, content: ~p", [Topic, Content]),
    efka_subscription:publish(Topic, Content),
    keep_state_and_data;
%% The monitored transport exited: forget it and reconnect in 5 seconds.
handle_event(info, {'DOWN', MRef, process, TransportPid, Reason}, _StateName, State = #state{transport_ref = MRef}) ->
    lager:debug("[efka_remote_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
    erlang:start_timer(5000, self(), create_transport),
    {next_state, ?STATE_DENIED, State#state{transport_pid = undefined, transport_ref = undefined}};
%% This agent serves no synchronous calls; answer instead of leaving the
%% caller blocked until its timeout.
handle_event({call, From}, _Request, _StateName, _State) ->
    {keep_state_and_data, [{reply, From, {error, unsupported}}]};
%% Anything else is stale or unsolicited (a late connect/auth reply, a server
%% push in a state that does not accept it, an unknown message). Log only
%% its tag, because payloads may be large or sensitive, and drop it. Crashing
%% here would tear down the server connection.
handle_event(EventType, Event, StateName, _State) ->
    lager:warning("[efka_remote_agent] unhandled ~p event: ~p in state: ~p",
                  [EventType, event_tag(Event), StateName]),
    keep_state_and_data.

%% Send Packet when activated; otherwise persist it with cache_model so
%% flush_cache can deliver it after (re)activation.
send_or_cache(?STATE_ACTIVATED, TransportPid, Packet) ->
    efka_transport:send(TransportPid, Packet);
send_or_cache(_StateName, _TransportPid, Packet) ->
    ok = cache_model:insert(Packet).

%% Stop the current transport and fall back to `denied'. transport_ref is
%% deliberately kept: the resulting 'DOWN' message schedules the reconnect.
drop_transport(State = #state{transport_pid = TransportPid}) ->
    efka_transport:stop(TransportPid),
    {next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}.

%% Decode an untrusted deploy config; only a JSON object is accepted.
decode_deploy_config(Json) ->
    try jiffy:decode(Json, [return_maps]) of
        Config when is_map(Config) -> {ok, Config};
        _NotAnObject -> error
    catch
        _Class:_Reason -> error
    end.

%% Map a container RPC method to its docker_manager call.
container_action(<<"start">>, ContainerName) -> docker_manager:start_container(ContainerName);
container_action(<<"stop">>, ContainerName) -> docker_manager:stop_container(ContainerName);
container_action(<<"kill">>, ContainerName) -> docker_manager:kill_container(ContainerName);
container_action(<<"remove">>, ContainerName) -> docker_manager:remove_container(ContainerName).

%% Reply to a server RPC from an `ok | {error, Reason}' result.
rpc_reply_result(TransportPid, PacketId, ok) ->
    efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
rpc_reply_result(TransportPid, PacketId, {error, Reason}) ->
    efka_transport:rpc_reply(TransportPid, PacketId, reply_error(reason_to_binary(Reason))).

%% reply_error/1 only accepts binaries; render any other reason term as text.
reason_to_binary(Reason) when is_binary(Reason) ->
    Reason;
reason_to_binary(Reason) ->
    unicode:characters_to_binary(io_lib:format("~tp", [Reason])).

%% Parse the COMMAND_AUTH payload: 0 or 1, anything else is `invalid'.
parse_auth_command(Bin) when is_binary(Bin) ->
    try binary_to_integer(Bin) of
        N when N =:= 0; N =:= 1 -> N;
        _Other -> invalid
    catch
        error:badarg -> invalid
    end;
parse_auth_command(_Other) ->
    invalid.

%% Short, payload-free description of an event for logging.
event_tag(Event) when is_tuple(Event), tuple_size(Event) > 0 -> element(1, Event);
event_tag(Event) when is_atom(Event) -> Event;
event_tag(_Event) -> other.
%% @private
%% @doc Shutdown hook: stop the transport process if this agent still holds
%% a live one. gen_statem ignores the return value.
terminate(_Reason, _StateName, #state{transport_pid = Pid}) when is_pid(Pid) ->
    case is_process_alive(Pid) of
        true -> efka_transport:stop(Pid);
        false -> ok
    end,
    ok;
terminate(_Reason, _StateName, #state{}) ->
    ok.
%% @private
%% @doc Hot code upgrade hook. No migration is performed: the state name and
%% the #state{} record are returned as they are.
code_change(_OldVsn, StateName, #state{} = State, _Extra) ->
    {ok, StateName, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Build the encoded ?MESSAGE_AUTH_REQUEST packet from the `auth' proplist in
%% the efka application env, stamped with efka_util:timestamp().
%% uuid/username/salt/token go through unicode:characters_to_binary/1, so
%% strings or binaries are accepted. A missing env entry crashes with
%% badmatch; a missing key crashes with badarg.
-spec auth_request() -> binary().
auth_request() ->
    {ok, AuthInfo} = application:get_env(efka, auth),
    FieldBin = fun(Key) -> unicode:characters_to_binary(proplists:get_value(Key, AuthInfo)) end,
    Request = #auth_request{
        uuid = FieldBin(uuid),
        username = FieldBin(username),
        salt = FieldBin(salt),
        token = FieldBin(token),
        timestamp = efka_util:timestamp()
    },
    message_codec:encode(?MESSAGE_AUTH_REQUEST, Request).
%% Encode a successful RPC reply: ?MESSAGE_RPC_REPLY with code 1 and Result as
%% payload. Note that auth replies use a different mapping (code 0 = success).
-spec reply_success(Result :: binary()) -> binary().
reply_success(Result) when is_binary(Result) ->
    Reply = #rpc_reply{code = 1, payload = Result},
    message_codec:encode(?MESSAGE_RPC_REPLY, Reply).
%% Encode a failed RPC reply: ?MESSAGE_RPC_REPLY with code 0 and the error
%% Message as payload. Non-binary messages fail the guard.
-spec reply_error(Message :: binary()) -> binary().
reply_error(Message) when is_binary(Message) ->
    Reply = #rpc_reply{code = 0, payload = Message},
    message_codec:encode(?MESSAGE_RPC_REPLY, Reply).