From 556b1d797c87140caf0c1db43f3cc5b49cf66658 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 17 Sep 2025 15:23:49 +0800 Subject: [PATCH] fix agent --- apps/efka/src/efka_remote_agent.erl | 30 +++-------------------------- 1 file changed, 3 insertions(+), 27 deletions(-) diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index e4e526a..98f1e2b 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -16,7 +16,7 @@ %% API -export([start_link/0]). --export([metric_data/4, event/3, ping/13, await_reply/2]). +-export([metric_data/4, event/3, ping/13]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -34,11 +34,7 @@ -record(state, { transport_pid :: undefined | pid(), - transport_ref :: undefined | reference(), - %% 服务器端推送的消息的未确认列表, 映射关系 #{Ref => PacketId} - push_inflight = #{}, - %% 发送的请求的未确认列表, 映射关系 #{Ref => ReceiverPid} - request_inflight = #{} + transport_ref :: undefined | reference() }). %%%=================================================================== @@ -57,16 +53,6 @@ event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventT ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). -%% 等待消息的回复 --spec await_reply(Ref :: reference(), Timeout :: timeout()) -> {ok, Reply :: binary()} | {error, timeout}. -await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> - receive - {request_reply, Ref, ReplyBin} -> - {ok, ReplyBin} - after Timeout -> - {error, timeout} - end. - %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. @@ -264,7 +250,7 @@ handle_event(info, {server_rpc, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) -> +handle_event(info, {server_rpc, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> #container_config{container_name = ContainerName, config = Config} = message_pb:decode_msg(ConfigBin, push_service_config), case docker_manager:config_container(ContainerName, Config) of @@ -313,16 +299,6 @@ handle_event(info, {server_pub, Topic, Content}, ?STATE_ACTIVATED, State) -> efka_subscription:publish(Topic, Content), {keep_state, State}; -%% 收到来自服务器端的回复 -handle_event(info, {server_reply, Ref, ReplyBin}, ?STATE_ACTIVATED, State = #state{request_inflight = RequestInflight}) -> - case maps:take(Ref, RequestInflight) of - error -> - {keep_state, State}; - {ReceiverPid, NRequestInflight} -> - is_process_alive(ReceiverPid) andalso erlang:send(ReceiverPid, {request_reply, Ref, ReplyBin}), - {keep_state, State#state{push_inflight = NRequestInflight}} - end; - %% transport进程退出 handle_event(info, {'DOWN', MRef, process, TransportPid, Reason}, _, State = #state{transport_ref = MRef}) -> lager:debug("[efka_remote_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),