From 6259eb20e72edd1319b4b2b966e718a78df52c55 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 7 May 2025 14:24:12 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_agent.erl | 60 ++++++++++++-------------------- apps/efka/src/efka_transport.erl | 8 +++-- 2 files changed, 28 insertions(+), 40 deletions(-) diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 422611f..9c25868 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -109,55 +109,39 @@ handle_call(_Request, _From, State = #state{}) -> {stop, Reason :: term(), NewState :: #state{}}). %% 发送数据 -handle_cast({metric_data, ServiceId, LineProtocolData}, State = #state{status = Status, transport_pid = TransportPid}) -> +handle_cast({metric_data, ServiceId, LineProtocolData}, State) -> Packet = message_pb:encode_msg(#data{ service_id = ServiceId, metric = LineProtocolData }), + safe_send(?METHOD_DATA, Packet, State), - case Status =:= ?STATE_ACTIVATED andalso is_pid(TransportPid) of - true -> - efka_transport:send(TransportPid, ?METHOD_DATA, Packet); - false -> - ok = micro_cache_model:insert(?METHOD_DATA, Packet) - end, {noreply, State}; %% Event事件 -handle_cast({event, ServiceId, EventType, Params}, State = #state{status = Status, transport_pid = TransportPid}) -> +handle_cast({event, ServiceId, EventType, Params}, State) -> EventPacket = message_pb:encode_msg(#event{ service_id = ServiceId, event_type = EventType, params = Params }), - case Status =:= ?STATE_ACTIVATED of - true -> - efka_transport:send(TransportPid, ?METHOD_EVENT, EventPacket); - false -> - ok = micro_cache_model:insert(?METHOD_EVENT, EventPacket) - end, + safe_send(?METHOD_EVENT, EventPacket, State), {noreply, State}; %% AiEvent事件 -handle_cast({ai_event, ServiceId, EventType, Params}, State = #state{status = Status, transport_pid = TransportPid}) -> +handle_cast({ai_event, ServiceId, EventType, Params}, State) -> EventPacket = message_pb:encode_msg(#ai_event{ service_id = ServiceId, event_type = EventType, params = Params }), - - case Status =:= ?STATE_ACTIVATED of - true -> - efka_transport:send(TransportPid, ?METHOD_AI_EVENT, EventPacket); - false -> - ok = micro_cache_model:insert(?METHOD_AI_EVENT, EventPacket) - end, + safe_send(?METHOD_AI_EVENT, EventPacket, State), {noreply, State}; -handle_cast({feedback_phase, TaskId, Timestamp, Phase, Code, Message}, State = #state{status = Status, transport_pid = TransportPid}) -> +handle_cast({feedback_phase, TaskId, Timestamp, Phase, Code, Message}, State) -> PhasePacket = message_pb:encode_msg(#feedback_phase{ task_id = TaskId, timestamp = Timestamp, @@ -166,12 +150,7 @@ handle_cast({feedback_phase, TaskId, Timestamp, Phase, Code, Message}, State = # message = Message }), - case Status =:= ?STATE_ACTIVATED of - true -> - efka_transport:send(TransportPid, ?METHOD_PHASE, PhasePacket); - false -> - ok = micro_cache_model:insert(?METHOD_PHASE, PhasePacket) - end, + safe_send(?METHOD_PHASE, PhasePacket, State), {noreply, State}; @@ -194,7 +173,12 @@ handle_cast({ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ip memory = Memory, interfaces = Interfaces }), - Status =:= ?STATE_ACTIVATED andalso efka_transport:send(TransportPid, ?METHOD_PING, Ping), + case Status =:= ?STATE_ACTIVATED andalso is_pid(TransportPid) of + true -> + efka_transport:send(TransportPid, ?METHOD_PING, Ping); + false -> + ok + end, {noreply, State}; @@ -419,11 +403,13 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %% 安全回复 -spec safe_response(PacketId :: integer(), Reply :: binary(), State :: #state{}) -> no_return(). safe_response(PacketId, Reply, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid}) when is_integer(PacketId), is_binary(Reply), is_pid(TransportPid) -> - case is_process_alive(TransportPid) of - true -> - efka_transport:response(TransportPid, PacketId, Reply); - false -> - ok - end; + is_process_alive(TransportPid) andalso efka_transport:response(TransportPid, PacketId, Reply); safe_response(_PacketId, _Reply, #state{}) -> - ok. \ No newline at end of file + ok. + +%% 当连接正常的时候发送,否则暂存数据 +-spec safe_send(Method :: integer(), Packet :: binary(), State :: #state{}) -> no_return(). +safe_send(Method, Packet, #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid}) when is_pid(TransportPid) -> + efka_transport:send(TransportPid, Method, Packet); +safe_send(Method, Packet, #state{}) -> + ok = micro_cache_model:insert(Method, Packet). \ No newline at end of file diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 54750b1..f6aeb62 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -42,12 +42,15 @@ auth_request(Pid, Timeout) when is_pid(Pid), is_integer(Timeout) -> connect(Pid) when is_pid(Pid) -> gen_server:cast(Pid, connect). +-spec send(Pid :: pid(), Method :: integer(), Packet :: binary()) -> no_return(). send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) -> gen_server:cast(Pid, {send, Method, Packet}). -response(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId) -> +-spec response(Pid :: pid(), PacketId :: integer(), Response :: binary()) -> no_return(). +response(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) -> gen_server:cast(Pid, {response, PacketId, Response}). +-spec stop(Pid :: pid()) -> ok. stop(Pid) when is_pid(Pid) -> gen_server:stop(Pid, normal, 2000). @@ -71,8 +74,7 @@ init([ParentPid, Host, Port]) -> %% @private %% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) -> {reply, Reply :: term(), NewState :: #state{}} | {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}} |