From 2166510989196900f56f80f9f0189ea975c71e19 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 22 Apr 2025 17:55:26 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_agent.erl | 16 ++++++------- apps/efka/src/efka_transport.erl | 41 +++++++++++++++++++------------- 2 files changed, 32 insertions(+), 25 deletions(-) diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 69b263a..23f76c7 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -83,25 +83,25 @@ handle_cast(_Request, State = #state{}) -> handle_info({timeout, _, create_transport}, State = #state{}) -> case efka_transport:start_link(self()) of {ok, TransportPid} -> - case efka_transport:auth_request(TransportPid, 5000) of + Ref = efka_transport:auth_request(TransportPid, 5000), + receive %% 验证通过 - {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}} -> + {auth_reply, Ref, {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}}} -> lager:debug("[efka_agent] auth result: ~p, repository_url: ~p", [Message, RepositoryUrl]), {noreply, State#state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}}; %% 主机denied状态 - {ok, #auth_reply{code = -1, message = Message, repository_url = RepositoryUrl}} -> + {auth_reply, Ref, {ok, #auth_reply{code = -1, message = Message, repository_url = RepositoryUrl}}} -> lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), {noreply, State}; %% 验证不通过 - {ok, #auth_reply{code = -2, message = Message, repository_url = RepositoryUrl}} -> + {auth_reply, Ref, {ok, #auth_reply{code = -2, message = Message, repository_url = RepositoryUrl}}} -> lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), {noreply, State}; - {error, Reason} -> + {auth_reply, Ref, {error, Reason}} -> lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), - efka_transport:stop(TransportPid), {noreply, State} end; {error, Reason} -> @@ -109,8 +109,8 @@ handle_info({timeout, _, create_transport}, State = #state{}) -> {noreply, State} end; -handle_info({'EXIT', _Pid, Reason}, State = #state{}) -> - lager:debug("[efka_agent] transport exit with reason: ~p", [Reason]), +handle_info({'EXIT', Pid, Reason}, State = #state{}) -> + lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [Pid, Reason]), erlang:start_timer(5000, self(), create_transport), {noreply, State#state{transport_pid = undefined}}; diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 260bb5d..dc48df4 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -34,9 +34,11 @@ %%% API %%%=================================================================== --spec auth_request(Pid :: pid(), Timeout :: integer()) -> {ok, AuthReply :: #auth_reply{}} | {error, Reason :: any()}. +-spec auth_request(Pid :: pid(), Timeout :: integer()) -> Ref :: reference(). auth_request(Pid, Timeout) when is_pid(Pid), is_integer(Timeout) -> - gen_server:call(Pid, {auth_request, Timeout}). + Ref = make_ref(), + gen_server:cast(Pid, {auth_request, self(), Ref, Timeout}), + Ref. send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) -> gen_server:cast(Pid, {send, Method, Packet}). @@ -90,7 +92,16 @@ init([ParentPid]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call({auth_request, Timeout}, _From, State = #state{socket = Socket, packet_id = PacketId}) -> +handle_call(_Req, _From, State = #state{}) -> + {reply, ok, State#state{}}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({auth_request, ReceiverPid, Ref, Timeout}, State = #state{socket = Socket, packet_id = PacketId}) -> {ok, AuthInfo} = application:get_env(efka, auth), UUID = proplists:get_value(uuid, AuthInfo), Username = proplists:get_value(username, AuthInfo), @@ -109,18 +120,14 @@ handle_call({auth_request, Timeout}, _From, State = #state{socket = Socket, pack %% 需要等待auth返回的结果 receive {ssl, Socket, <>} -> - {reply, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}, State#state{packet_id = PacketId + 1}} - after - Timeout -> - {reply, {error, timeout}, State} - end. + lager:debug("call me here auth reply"), + ReceiverPid ! {auth_reply, Ref, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}}, + {noreply, State#state{packet_id = PacketId + 1}} + after Timeout -> + ReceiverPid ! {auth_reply, Ref, {error, timeout}}, + {noreply, State} + end; -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). handle_cast({send, Method, Packet}, State = #state{socket = Socket}) -> ok = ssl:send(Socket, <>), {noreply, State}; @@ -147,11 +154,11 @@ handle_info({ssl, Socket, <>}, State = ParentPid ! {efka_message, PacketId, Msg}, {noreply, State#state{}}; -handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) -> - {stop, Reason, State}; +handle_info({ssl_error, Socket, _Reason}, State = #state{socket = Socket}) -> + {stop, normal, State}; handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) -> - {stop, ssl_closed, State}; + {stop, normal, State}; handle_info(Info, State = #state{}) -> lager:debug("[efka_transport] info: ~p", [Info]),