From 9d67f095774361fe7c6ab0863d40763b6a2b01f7 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 28 Apr 2025 22:50:01 +0800 Subject: [PATCH] fix transport --- apps/efka/src/efka_agent.erl | 99 ++++++++++++++++++++++++-------- apps/efka/src/efka_transport.erl | 66 +++++++++++---------- 2 files changed, 109 insertions(+), 56 deletions(-) diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 0ef4df8..e23b771 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -21,7 +21,10 @@ -define(SERVER, ?MODULE). +%% 标记当前agent的状态,只有在 activated 状态下才可以正常的发送数据 -define(STATE_DENIED, denied). +-define(STATE_CONNECTING, connecting). +-define(STATE_AUTH, auth). -define(STATE_ACTIVATED, activated). -record(state, { @@ -81,38 +84,84 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). handle_info({timeout, _, create_transport}, State = #state{}) -> - case efka_transport:start_link(self()) of - {ok, TransportPid} -> - Ref = efka_transport:auth_request(TransportPid, 5000), - receive - %% 验证通过 - {auth_reply, Ref, {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}}} -> - lager:debug("[efka_agent] auth result: ~p, repository_url: ~p", [Message, RepositoryUrl]), - {noreply, State#state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}}; + {ok, Props} = application:get_env(efka, tls_server), + Host = proplists:get_value(host, Props), + Port = proplists:get_value(port, Props), - %% 主机denied状态 - {auth_reply, Ref, {ok, #auth_reply{code = -1, message = Message, repository_url = RepositoryUrl}}} -> - lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), - {noreply, State}; + {ok, TransportPid} = efka_transport:start_link(self(), Host, Port), + efka_transport:connect(TransportPid), - %% 验证不通过 - {auth_reply, Ref, {ok, #auth_reply{code = -2, message = Message, repository_url = RepositoryUrl}}} -> - lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), - {noreply, State}; + {noreply, State#state{transport_pid = TransportPid, status = ?STATE_CONNECTING}}; - {auth_reply, Ref, {error, Reason}} -> - lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), - {noreply, State} - end; +%% 连接的消息确认 +handle_info({connect_reply, Reply}, State = #state{transport_pid = TransportPid}) -> + case Reply of + ok -> + efka_transport:auth_request(TransportPid, 5000), + {noreply, State#state{status = ?STATE_AUTH}}; {error, Reason} -> - lager:warning("[efka_agent] connect get error: ~p", [Reason]), - {noreply, State} + lager:debug("[efka_agent] connect failed, error: ~p", [Reason]), + efka_transport:stop(TransportPid), + {noreply, State#state{status = ?STATE_DENIED}} end; -handle_info({'EXIT', Pid, Reason}, State = #state{}) -> - lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [Pid, Reason]), +%% 验证的消息确认 +handle_info({auth_reply, Reply}, State = #state{transport_pid = TransportPid}) -> + case Reply of + {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}} -> + lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), + {noreply, State#state{status = ?STATE_ACTIVATED}}; + %% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent + %% socket的连接状态需要维持 + {ok, #auth_reply{code = -1, message = Message}} -> + lager:debug("[efka_agent] auth denied, message: ~p", [Message]), + {noreply, State#state{status = ?STATE_DENIED}}; + + %% 其他类型的错误,需要间隔时间重试 + {ok, #auth_reply{code = -2, message = Message}} -> + lager:debug("[efka_agent] auth failed, message: ~p", [Message]), + efka_transport:stop(TransportPid), + {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; + + {error, Reason} -> + lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), + efka_transport:stop(TransportPid), + {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}} + end; + +%% 云端服务器推送了消息 + +%% 激活消息 +handle_info({server_push_message, <<8:8, ActivatePush>>}, State = #state{transport_pid = TransportPid, status = Status}) -> + #activate_push{auth = Auth} = message_pb:decode_msg(ActivatePush, activate_push), + case {Auth, Status} of + {true, ?STATE_ACTIVATED} -> + {noreply, State}; + {true, ?STATE_DENIED} -> + efka_transport:auth_request(TransportPid, 5000), + {noreply, State}; + {false, _} -> + {noreply, State#state{status = ?STATE_DENIED}} + end; + +%% 收到指令 +handle_info({server_push_message, <<16:8, Directive>>}, State = #state{transport_pid = TransportPid, status = Status}) -> + #activate_push{auth = Auth} = message_pb:decode_msg(Directive, activate_push), + case {Auth, Status} of + {true, ?STATE_ACTIVATED} -> + {noreply, State}; + {true, ?STATE_DENIED} -> + efka_transport:auth_request(TransportPid, 5000), + {noreply, State}; + {false, _} -> + {noreply, State#state{status = ?STATE_DENIED}} + end; + +%% 不管transport因为什么原因退出,间隔一定时间后重试 +handle_info({'EXIT', TransportPid, Reason}, State = #state{transport_pid = TransportPid}) -> + lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]), erlang:start_timer(5000, self(), create_transport), - {noreply, State#state{transport_pid = undefined}}; + {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; handle_info(Info, State = #state{}) -> lager:debug("[efka_agent] get unknown info: ~p", [Info]), diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 2456dc5..cdd7137 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -14,8 +14,8 @@ -behaviour(gen_server). %% API --export([start_link/1]). --export([auth_request/2, send/3, response/3, stop/1]). +-export([start_link/3]). +-export([connect/1, auth_request/2, send/3, response/3, stop/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -34,11 +34,12 @@ %%% API %%%=================================================================== --spec auth_request(Pid :: pid(), Timeout :: integer()) -> Ref :: reference(). +-spec auth_request(Pid :: pid(), Timeout :: integer()) -> no_return(). auth_request(Pid, Timeout) when is_pid(Pid), is_integer(Timeout) -> - Ref = make_ref(), - gen_server:cast(Pid, {auth_request, self(), Ref, Timeout}), - Ref. + gen_server:cast(Pid, {auth_request, Timeout}). + +connect(Pid) when is_pid(Pid) -> + gen_server:cast(Pid, connect). send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) -> gen_server:cast(Pid, {send, Method, Packet}). @@ -50,10 +51,10 @@ stop(Pid) when is_pid(Pid) -> gen_server:stop(Pid). %% @doc Spawns the server and registers the local name (unique) --spec(start_link(ParentPid :: pid()) -> +-spec(start_link(ParentPid :: pid(), Host :: string(), Port :: integer()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(ParentPid) when is_pid(ParentPid) -> - gen_server:start_link(?MODULE, [ParentPid], []). +start_link(ParentPid, Host, Port) when is_pid(ParentPid), is_list(Host), is_integer(Port) -> + gen_server:start_link(?MODULE, [ParentPid, Host, Port], []). %%%=================================================================== %%% gen_server callbacks @@ -64,23 +65,8 @@ start_link(ParentPid) when is_pid(ParentPid) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([ParentPid]) -> - {ok, Props} = application:get_env(efka, tls_server), - Host = proplists:get_value(host, Props), - Port = proplists:get_value(port, Props), - - SslOptions = [ - binary, - {packet, 4}, - {verify, verify_none} - ], - case ssl:connect(Host, Port, SslOptions, 5000) of - {ok, Socket} -> - ssl:controlling_process(Socket, self()), - {ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = Socket}}; - {error, Reason} -> - {stop, Reason} - end. +init([ParentPid, Host, Port]) -> + {ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = undefined}}. %% @private %% @doc Handling call messages @@ -101,7 +87,25 @@ handle_call(_Req, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({auth_request, ReceiverPid, Ref, Timeout}, State = #state{socket = Socket, packet_id = PacketId}) -> +%% 建立到目标服务器的连接 +handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = ParentPid}) -> + SslOptions = [ + binary, + {packet, 4}, + {verify, verify_none} + ], + case ssl:connect(Host, Port, SslOptions, 5000) of + {ok, Socket} -> + ssl:controlling_process(Socket, self()), + ParentPid ! {connect_reply, ok}, + {noreply, State#state{socket = Socket}}; + {error, Reason} -> + ParentPid ! {connect_reply, {error, Reason}}, + {noreply, Reason} + end; + +%% auth校验 +handle_cast({auth_request, Timeout}, State = #state{parent_pid = ParentPid, socket = Socket, packet_id = PacketId}) -> {ok, AuthInfo} = application:get_env(efka, auth), UUID = proplists:get_value(uuid, AuthInfo), Username = proplists:get_value(username, AuthInfo), @@ -120,10 +124,10 @@ handle_cast({auth_request, ReceiverPid, Ref, Timeout}, State = #state{socket = S %% 需要等待auth返回的结果 receive {ssl, Socket, <>} -> - ReceiverPid ! {auth_reply, Ref, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}}, + ParentPid ! {auth_reply, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}}, {noreply, State#state{packet_id = PacketId + 1}} after Timeout -> - ReceiverPid ! {auth_reply, Ref, {error, timeout}}, + ParentPid ! {auth_reply, {error, timeout}}, {stop, normal, State} end; @@ -144,13 +148,13 @@ handle_cast({response, PacketId, Response}, State = #state{socket = Socket}) -> %% 服务器主动推送的数据,有packetId的是要求返回的;为0的表示不需要返回值 handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> lager:debug("[efka_agent] socket get message: ~p", [Msg]), - ParentPid ! {efka_message, Msg}, + ParentPid ! {server_push_message, Msg}, {noreply, State}; %% 目前推送的消息包括: <>, <<16:8, Directive/binary>> handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> lager:debug("[efka_agent] socket get message: ~p", [Msg]), - ParentPid ! {efka_message, PacketId, Msg}, + ParentPid ! {server_push_message, PacketId, Msg}, {noreply, State#state{}}; handle_info({ssl_error, Socket, _Reason}, State = #state{socket = Socket}) ->