fix transport

This commit is contained in:
anlicheng 2025-04-22 15:34:42 +08:00
parent 2b4e8e4f40
commit 21e1f1f9cf

View File

@ -14,7 +14,7 @@
-behaviour(gen_server). -behaviour(gen_server).
%% API %% API
-export([start_link/0]). -export([start_link/1]).
-export([auth_request/2, data/2, ping/2, inform/2, feedback_step/2, feedback_result/2, event/2, ai_event/2, response/2]). -export([auth_request/2, data/2, ping/2, inform/2, feedback_step/2, feedback_result/2, event/2, ai_event/2, response/2]).
%% gen_server callbacks %% gen_server callbacks
@ -23,6 +23,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-record(state, { -record(state, {
parent_pid :: pid(),
host :: string(), host :: string(),
port :: integer(), port :: integer(),
socket :: undefined | ssl:sslsocket(), socket :: undefined | ssl:sslsocket(),
@ -61,10 +62,10 @@ response(Pid, Response) when is_pid(Pid) ->
gen_server:cast(Pid, {response, Response}). gen_server:cast(Pid, {response, Response}).
%% @doc Spawns the server and registers the local name (unique) %% @doc Spawns the server and registers the local name (unique)
-spec(start_link() -> -spec(start_link(ParentPid :: pid()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}). {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() -> start_link(ParentPid) when is_pid(ParentPid) ->
gen_server:start_link(?MODULE, [], []). gen_server:start_link(?MODULE, [ParentPid], []).
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -75,7 +76,7 @@ start_link() ->
-spec(init(Args :: term()) -> -spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore). {stop, Reason :: term()} | ignore).
init([]) -> init([ParentPid]) ->
{ok, Props} = application:get_env(efka, tls_server), {ok, Props} = application:get_env(efka, tls_server),
Host = proplists:get_value(host, Props), Host = proplists:get_value(host, Props),
Port = proplists:get_value(port, Props), Port = proplists:get_value(port, Props),
@ -88,7 +89,7 @@ init([]) ->
], ],
case ssl:connect(Host, Port, SslOptions, 5000) of case ssl:connect(Host, Port, SslOptions, 5000) of
{ok, Socket} -> {ok, Socket} ->
{ok, #state{host = Host, port = Port, socket = Socket}}; {ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = Socket}};
{error, Reason} -> {error, Reason} ->
{stop, Reason} {stop, Reason}
end. end.
@ -123,8 +124,9 @@ handle_call({auth_request, Timeout}, _From, State = #state{socket = Socket, pack
receive receive
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH, ReplyBin/binary>>} -> {ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH, ReplyBin/binary>>} ->
lager:debug("[efka_agent] get a reply bin: ~p", [ReplyBin]), lager:debug("[efka_agent] get a reply bin: ~p", [ReplyBin]),
{reply, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}, State} {reply, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}, State#state{packet_id = PacketId + 1}}
after Timeout -> after
Timeout ->
{reply, {error, timeout}, State} {reply, {error, timeout}, State}
end. end.
@ -177,13 +179,22 @@ handle_cast({response, Response}, State = #state{socket = Socket}) ->
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_info({ssl, Socket, Data}, State = #state{socket = Socket}) -> %% packetId的是要求返回的0
lager:debug("[efka_agent] socket get message: ~p", [Data]), handle_info({ssl, Socket, <<?PACKET_PUBLISH, 0:32, Msg/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
{noreply, State#state{socket = undefined}}; lager:debug("[efka_agent] socket get message: ~p", [Msg]),
ParentPid ! {efka_message, Msg},
{noreply, State};
%% : <<CommandType:8, Command/binary>>, <<16:8, Directive/binary>>
handle_info({ssl, Socket, <<?PACKET_PUBLISH, PacketId:32, Msg/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
lager:debug("[efka_agent] socket get message: ~p", [Msg]),
ParentPid ! {efka_message, PacketId, Msg},
{noreply, State#state{}};
handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) -> handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) ->
lager:warning("[efka_agent] socket close with reason: ~p", [Reason]), lager:warning("[efka_agent] socket close with reason: ~p", [Reason]),
{stop, transport_error, State}; {stop, transport_error, State};
handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) -> handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) ->
lager:warning("[efka_agent] socket closed"), lager:warning("[efka_agent] socket closed"),
{stop, transport_closed, State}; {stop, transport_closed, State};