From 57288fd4646d0b8c71f5e76db24f18d3a3e9bbe6 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 22 Apr 2025 17:35:21 +0800 Subject: [PATCH] fix transport --- apps/efka/src/efka_agent.erl | 46 +++++++++++++++++--------------- apps/efka/src/efka_transport.erl | 14 ++++++---- 2 files changed, 33 insertions(+), 27 deletions(-) diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 5560dab..69b263a 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -81,32 +81,37 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). handle_info({timeout, _, create_transport}, State = #state{}) -> - {ok, TransportPid} = efka_transport:start_link(self()), - case efka_transport:auth_request(TransportPid, 5000) of - %% 验证通过 - {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}} -> - lager:debug("[efka_agent] auth result: ~p, repository_url: ~p", [Message, RepositoryUrl]), - {noreply, State#state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}}; + case efka_transport:start_link(self()) of + {ok, TransportPid} -> + case efka_transport:auth_request(TransportPid, 5000) of + %% 验证通过 + {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}} -> + lager:debug("[efka_agent] auth result: ~p, repository_url: ~p", [Message, RepositoryUrl]), + {noreply, State#state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}}; - %% 主机denied状态 - {ok, #auth_reply{code = -1, message = Message, repository_url = RepositoryUrl}} -> - lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), - {noreply, State#state{transport_pid = TransportPid, status = ?STATE_DENIED}}; + %% 主机denied状态 + {ok, #auth_reply{code = -1, message = Message, repository_url = RepositoryUrl}} -> + lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), + {noreply, State}; - %% 验证不通过 - {ok, #auth_reply{code = -2, message = Message, repository_url = RepositoryUrl}} -> - lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), - {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; + %% 验证不通过 + {ok, #auth_reply{code = -2, message = Message, repository_url = RepositoryUrl}} -> + lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), + {noreply, State}; + {error, Reason} -> + lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), + efka_transport:stop(TransportPid), + {noreply, State} + end; {error, Reason} -> - lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), - efka_transport:stop(TransportPid), - {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}} + lager:warning("[efka_agent] connect get error: ~p", [Reason]), + {noreply, State} end; handle_info({'EXIT', _Pid, Reason}, State = #state{}) -> lager:debug("[efka_agent] transport exit with reason: ~p", [Reason]), - retry_connect(), + erlang:start_timer(5000, self(), create_transport), {noreply, State#state{transport_pid = undefined}}; handle_info(Info, State = #state{}) -> @@ -133,7 +138,4 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions -%%%=================================================================== - -retry_connect() -> - erlang:start_timer(5000, self(), create_transport). \ No newline at end of file +%%%=================================================================== \ No newline at end of file diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 32027fa..260bb5d 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -70,12 +70,15 @@ init([ParentPid]) -> SslOptions = [ binary, {packet, 4}, - {active, true}, {verify, verify_none} ], - {ok, Socket} = ssl:connect(Host, Port, SslOptions, 5000), - - {ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = Socket}}. + case ssl:connect(Host, Port, SslOptions, 5000) of + {ok, Socket} -> + ssl:controlling_process(Socket, self()), + {ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = Socket}}; + {error, Reason} -> + {stop, Reason} + end. %% @private %% @doc Handling call messages @@ -150,7 +153,8 @@ handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) -> handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) -> {stop, ssl_closed, State}; -handle_info(_Info, State = #state{}) -> +handle_info(Info, State = #state{}) -> + lager:debug("[efka_transport] info: ~p", [Info]), {noreply, State}. %% @private