fix transport

This commit is contained in:
anlicheng 2025-04-28 22:50:01 +08:00
parent 8f8e813a8c
commit 9d67f09577
2 changed files with 109 additions and 56 deletions

View File

@ -21,7 +21,10 @@
-define(SERVER, ?MODULE).
%% agent的状态 activated
-define(STATE_DENIED, denied).
-define(STATE_CONNECTING, connecting).
-define(STATE_AUTH, auth).
-define(STATE_ACTIVATED, activated).
-record(state, {
@ -81,38 +84,84 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, create_transport}, State = #state{}) ->
case efka_transport:start_link(self()) of
{ok, TransportPid} ->
Ref = efka_transport:auth_request(TransportPid, 5000),
receive
%%
{auth_reply, Ref, {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}}} ->
lager:debug("[efka_agent] auth result: ~p, repository_url: ~p", [Message, RepositoryUrl]),
{noreply, State#state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}};
{ok, Props} = application:get_env(efka, tls_server),
Host = proplists:get_value(host, Props),
Port = proplists:get_value(port, Props),
%% denied状态
{auth_reply, Ref, {ok, #auth_reply{code = -1, message = Message, repository_url = RepositoryUrl}}} ->
lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]),
{noreply, State};
{ok, TransportPid} = efka_transport:start_link(self(), Host, Port),
efka_transport:connect(TransportPid),
%%
{auth_reply, Ref, {ok, #auth_reply{code = -2, message = Message, repository_url = RepositoryUrl}}} ->
lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]),
{noreply, State};
{noreply, State#state{transport_pid = TransportPid, status = ?STATE_CONNECTING}};
{auth_reply, Ref, {error, Reason}} ->
lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]),
{noreply, State}
end;
%%
handle_info({connect_reply, Reply}, State = #state{transport_pid = TransportPid}) ->
case Reply of
ok ->
efka_transport:auth_request(TransportPid, 5000),
{noreply, State#state{status = ?STATE_AUTH}};
{error, Reason} ->
lager:warning("[efka_agent] connect get error: ~p", [Reason]),
{noreply, State}
lager:debug("[efka_agent] connect failed, error: ~p", [Reason]),
efka_transport:stop(TransportPid),
{noreply, State#state{status = ?STATE_DENIED}}
end;
handle_info({'EXIT', Pid, Reason}, State = #state{}) ->
lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [Pid, Reason]),
%%
handle_info({auth_reply, Reply}, State = #state{transport_pid = TransportPid}) ->
case Reply of
{ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}} ->
lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]),
{noreply, State#state{status = ?STATE_ACTIVATED}};
%% agent不能推送数据给云端服务器agent
%% socket的连接状态需要维持
{ok, #auth_reply{code = -1, message = Message}} ->
lager:debug("[efka_agent] auth denied, message: ~p", [Message]),
{noreply, State#state{status = ?STATE_DENIED}};
%%
{ok, #auth_reply{code = -2, message = Message}} ->
lager:debug("[efka_agent] auth failed, message: ~p", [Message]),
efka_transport:stop(TransportPid),
{noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}};
{error, Reason} ->
lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]),
efka_transport:stop(TransportPid),
{noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}
end;
%%
%%
handle_info({server_push_message, <<8:8, ActivatePush>>}, State = #state{transport_pid = TransportPid, status = Status}) ->
#activate_push{auth = Auth} = message_pb:decode_msg(ActivatePush, activate_push),
case {Auth, Status} of
{true, ?STATE_ACTIVATED} ->
{noreply, State};
{true, ?STATE_DENIED} ->
efka_transport:auth_request(TransportPid, 5000),
{noreply, State};
{false, _} ->
{noreply, State#state{status = ?STATE_DENIED}}
end;
%%
handle_info({server_push_message, <<16:8, Directive>>}, State = #state{transport_pid = TransportPid, status = Status}) ->
#activate_push{auth = Auth} = message_pb:decode_msg(Directive, activate_push),
case {Auth, Status} of
{true, ?STATE_ACTIVATED} ->
{noreply, State};
{true, ?STATE_DENIED} ->
efka_transport:auth_request(TransportPid, 5000),
{noreply, State};
{false, _} ->
{noreply, State#state{status = ?STATE_DENIED}}
end;
%% transport因为什么原因退出
handle_info({'EXIT', TransportPid, Reason}, State = #state{transport_pid = TransportPid}) ->
lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
erlang:start_timer(5000, self(), create_transport),
{noreply, State#state{transport_pid = undefined}};
{noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}};
handle_info(Info, State = #state{}) ->
lager:debug("[efka_agent] get unknown info: ~p", [Info]),

View File

@ -14,8 +14,8 @@
-behaviour(gen_server).
%% API
-export([start_link/1]).
-export([auth_request/2, send/3, response/3, stop/1]).
-export([start_link/3]).
-export([connect/1, auth_request/2, send/3, response/3, stop/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -34,11 +34,12 @@
%%% API
%%%===================================================================
-spec auth_request(Pid :: pid(), Timeout :: integer()) -> Ref :: reference().
-spec auth_request(Pid :: pid(), Timeout :: integer()) -> no_return().
auth_request(Pid, Timeout) when is_pid(Pid), is_integer(Timeout) ->
Ref = make_ref(),
gen_server:cast(Pid, {auth_request, self(), Ref, Timeout}),
Ref.
gen_server:cast(Pid, {auth_request, Timeout}).
connect(Pid) when is_pid(Pid) ->
gen_server:cast(Pid, connect).
send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) ->
gen_server:cast(Pid, {send, Method, Packet}).
@ -50,10 +51,10 @@ stop(Pid) when is_pid(Pid) ->
gen_server:stop(Pid).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(ParentPid :: pid()) ->
-spec(start_link(ParentPid :: pid(), Host :: string(), Port :: integer()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(ParentPid) when is_pid(ParentPid) ->
gen_server:start_link(?MODULE, [ParentPid], []).
start_link(ParentPid, Host, Port) when is_pid(ParentPid), is_list(Host), is_integer(Port) ->
gen_server:start_link(?MODULE, [ParentPid, Host, Port], []).
%%%===================================================================
%%% gen_server callbacks
@ -64,23 +65,8 @@ start_link(ParentPid) when is_pid(ParentPid) ->
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ParentPid]) ->
{ok, Props} = application:get_env(efka, tls_server),
Host = proplists:get_value(host, Props),
Port = proplists:get_value(port, Props),
SslOptions = [
binary,
{packet, 4},
{verify, verify_none}
],
case ssl:connect(Host, Port, SslOptions, 5000) of
{ok, Socket} ->
ssl:controlling_process(Socket, self()),
{ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = Socket}};
{error, Reason} ->
{stop, Reason}
end.
init([ParentPid, Host, Port]) ->
{ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = undefined}}.
%% @private
%% @doc Handling call messages
@ -101,7 +87,25 @@ handle_call(_Req, _From, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({auth_request, ReceiverPid, Ref, Timeout}, State = #state{socket = Socket, packet_id = PacketId}) ->
%%
handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = ParentPid}) ->
SslOptions = [
binary,
{packet, 4},
{verify, verify_none}
],
case ssl:connect(Host, Port, SslOptions, 5000) of
{ok, Socket} ->
ssl:controlling_process(Socket, self()),
ParentPid ! {connect_reply, ok},
{noreply, State#state{socket = Socket}};
{error, Reason} ->
ParentPid ! {connect_reply, {error, Reason}},
{noreply, Reason}
end;
%% auth校验
handle_cast({auth_request, Timeout}, State = #state{parent_pid = ParentPid, socket = Socket, packet_id = PacketId}) ->
{ok, AuthInfo} = application:get_env(efka, auth),
UUID = proplists:get_value(uuid, AuthInfo),
Username = proplists:get_value(username, AuthInfo),
@ -120,10 +124,10 @@ handle_cast({auth_request, ReceiverPid, Ref, Timeout}, State = #state{socket = S
%% auth返回的结果
receive
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH, ReplyBin/binary>>} ->
ReceiverPid ! {auth_reply, Ref, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}},
ParentPid ! {auth_reply, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}},
{noreply, State#state{packet_id = PacketId + 1}}
after Timeout ->
ReceiverPid ! {auth_reply, Ref, {error, timeout}},
ParentPid ! {auth_reply, {error, timeout}},
{stop, normal, State}
end;
@ -144,13 +148,13 @@ handle_cast({response, PacketId, Response}, State = #state{socket = Socket}) ->
%% packetId的是要求返回的0
handle_info({ssl, Socket, <<?PACKET_PUBLISH, 0:32, Msg/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
lager:debug("[efka_agent] socket get message: ~p", [Msg]),
ParentPid ! {efka_message, Msg},
ParentPid ! {server_push_message, Msg},
{noreply, State};
%% : <<CommandType:8, Command/binary>>, <<16:8, Directive/binary>>
handle_info({ssl, Socket, <<?PACKET_PUBLISH, PacketId:32, Msg/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
lager:debug("[efka_agent] socket get message: ~p", [Msg]),
ParentPid ! {efka_message, PacketId, Msg},
ParentPid ! {server_push_message, PacketId, Msg},
{noreply, State#state{}};
handle_info({ssl_error, Socket, _Reason}, State = #state{socket = Socket}) ->