This commit is contained in:
anlicheng 2025-04-22 17:55:26 +08:00
parent 57288fd464
commit 2166510989
2 changed files with 32 additions and 25 deletions

View File

@ -83,25 +83,25 @@ handle_cast(_Request, State = #state{}) ->
handle_info({timeout, _, create_transport}, State = #state{}) -> handle_info({timeout, _, create_transport}, State = #state{}) ->
case efka_transport:start_link(self()) of case efka_transport:start_link(self()) of
{ok, TransportPid} -> {ok, TransportPid} ->
case efka_transport:auth_request(TransportPid, 5000) of Ref = efka_transport:auth_request(TransportPid, 5000),
receive
%% %%
{ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}} -> {auth_reply, Ref, {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}}} ->
lager:debug("[efka_agent] auth result: ~p, repository_url: ~p", [Message, RepositoryUrl]), lager:debug("[efka_agent] auth result: ~p, repository_url: ~p", [Message, RepositoryUrl]),
{noreply, State#state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}}; {noreply, State#state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}};
%% denied状态 %% denied状态
{ok, #auth_reply{code = -1, message = Message, repository_url = RepositoryUrl}} -> {auth_reply, Ref, {ok, #auth_reply{code = -1, message = Message, repository_url = RepositoryUrl}}} ->
lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]),
{noreply, State}; {noreply, State};
%% %%
{ok, #auth_reply{code = -2, message = Message, repository_url = RepositoryUrl}} -> {auth_reply, Ref, {ok, #auth_reply{code = -2, message = Message, repository_url = RepositoryUrl}}} ->
lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]),
{noreply, State}; {noreply, State};
{error, Reason} -> {auth_reply, Ref, {error, Reason}} ->
lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]),
efka_transport:stop(TransportPid),
{noreply, State} {noreply, State}
end; end;
{error, Reason} -> {error, Reason} ->
@ -109,8 +109,8 @@ handle_info({timeout, _, create_transport}, State = #state{}) ->
{noreply, State} {noreply, State}
end; end;
handle_info({'EXIT', _Pid, Reason}, State = #state{}) -> handle_info({'EXIT', Pid, Reason}, State = #state{}) ->
lager:debug("[efka_agent] transport exit with reason: ~p", [Reason]), lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [Pid, Reason]),
erlang:start_timer(5000, self(), create_transport), erlang:start_timer(5000, self(), create_transport),
{noreply, State#state{transport_pid = undefined}}; {noreply, State#state{transport_pid = undefined}};

View File

@ -34,9 +34,11 @@
%%% API %%% API
%%%=================================================================== %%%===================================================================
-spec auth_request(Pid :: pid(), Timeout :: integer()) -> {ok, AuthReply :: #auth_reply{}} | {error, Reason :: any()}. -spec auth_request(Pid :: pid(), Timeout :: integer()) -> Ref :: reference().
auth_request(Pid, Timeout) when is_pid(Pid), is_integer(Timeout) -> auth_request(Pid, Timeout) when is_pid(Pid), is_integer(Timeout) ->
gen_server:call(Pid, {auth_request, Timeout}). Ref = make_ref(),
gen_server:cast(Pid, {auth_request, self(), Ref, Timeout}),
Ref.
send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) -> send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) ->
gen_server:cast(Pid, {send, Method, Packet}). gen_server:cast(Pid, {send, Method, Packet}).
@ -90,7 +92,16 @@ init([ParentPid]) ->
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_call({auth_request, Timeout}, _From, State = #state{socket = Socket, packet_id = PacketId}) -> handle_call(_Req, _From, State = #state{}) ->
{reply, ok, State#state{}}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({auth_request, ReceiverPid, Ref, Timeout}, State = #state{socket = Socket, packet_id = PacketId}) ->
{ok, AuthInfo} = application:get_env(efka, auth), {ok, AuthInfo} = application:get_env(efka, auth),
UUID = proplists:get_value(uuid, AuthInfo), UUID = proplists:get_value(uuid, AuthInfo),
Username = proplists:get_value(username, AuthInfo), Username = proplists:get_value(username, AuthInfo),
@ -109,18 +120,14 @@ handle_call({auth_request, Timeout}, _From, State = #state{socket = Socket, pack
%% auth返回的结果 %% auth返回的结果
receive receive
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH, ReplyBin/binary>>} -> {ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH, ReplyBin/binary>>} ->
{reply, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}, State#state{packet_id = PacketId + 1}} lager:debug("call me here auth reply"),
after ReceiverPid ! {auth_reply, Ref, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}},
Timeout -> {noreply, State#state{packet_id = PacketId + 1}}
{reply, {error, timeout}, State} after Timeout ->
end. ReceiverPid ! {auth_reply, Ref, {error, timeout}},
{noreply, State}
end;
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({send, Method, Packet}, State = #state{socket = Socket}) -> handle_cast({send, Method, Packet}, State = #state{socket = Socket}) ->
ok = ssl:send(Socket, <<?PACKET_REQUEST, 0:32, Method, Packet/binary>>), ok = ssl:send(Socket, <<?PACKET_REQUEST, 0:32, Method, Packet/binary>>),
{noreply, State}; {noreply, State};
@ -147,11 +154,11 @@ handle_info({ssl, Socket, <<?PACKET_PUBLISH, PacketId:32, Msg/binary>>}, State =
ParentPid ! {efka_message, PacketId, Msg}, ParentPid ! {efka_message, PacketId, Msg},
{noreply, State#state{}}; {noreply, State#state{}};
handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) -> handle_info({ssl_error, Socket, _Reason}, State = #state{socket = Socket}) ->
{stop, Reason, State}; {stop, normal, State};
handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) -> handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) ->
{stop, ssl_closed, State}; {stop, normal, State};
handle_info(Info, State = #state{}) -> handle_info(Info, State = #state{}) ->
lager:debug("[efka_transport] info: ~p", [Info]), lager:debug("[efka_transport] info: ~p", [Info]),