diff --git a/apps/efka/src/efka_agent2.erl b/apps/efka/src/efka_agent2.erl index 902015f..55e931b 100644 --- a/apps/efka/src/efka_agent2.erl +++ b/apps/efka/src/efka_agent2.erl @@ -35,7 +35,9 @@ -record(state, { transport_pid :: undefined | pid(), %% 服务器端推送的消息的未确认列表, 映射关系 #{Ref => PacketId} - push_inflight = #{} + push_inflight = #{}, + %% 发送的请求的未确认列表, 映射关系 #{Ref => ReceiverPid} + request_inflight = #{} }). %%%=================================================================== @@ -63,7 +65,7 @@ request_service_config(ReceiverPid, ServiceId) when is_binary(ServiceId) -> -spec await_reply(Ref :: reference(), Timeout :: timeout()) -> {ok, Reply :: binary()} | {error, timeout}. await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> receive - {transport_reply, Ref, ReplyBin} -> + {request_reply, Ref, ReplyBin} -> {ok, ReplyBin} after Timeout -> {error, timeout} @@ -98,9 +100,9 @@ callback_mode() -> %% @doc If callback_mode is handle_event_function, then whenever a %% gen_statem receives an event from call/2, cast/2, or as a normal %% process message, this function is called. -handle_event({call, From}, {request_service_config, ReceiverPid, ServiceId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> - Ref = efka_transport:request(TransportPid, ReceiverPid, ?METHOD_REQUEST_SERVICE_CONFIG, ServiceId), - {keep_state, State, [{reply, From, {ok, Ref}}]}; +handle_event({call, From}, {request_service_config, ReceiverPid, ServiceId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, request_inflight = RequestInflight}) -> + Ref = efka_transport:request(TransportPid, ?METHOD_REQUEST_SERVICE_CONFIG, ServiceId), + {keep_state, State#state{request_inflight = maps:put(Ref, ReceiverPid, RequestInflight)}, [{reply, From, {ok, Ref}}]}; handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _, State) -> {keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]}; @@ -354,6 +356,16 @@ handle_event(info, {service_reply, Ref, EmsReply}, ?STATE_ACTIVATED, State = #st {keep_state, State#state{push_inflight = NPushInflight}} end; +%% 收到来自服务器端的回复 +handle_event(info, {server_reply, Ref, ReplyBin}, ?STATE_ACTIVATED, State = #state{request_inflight = RequestInflight}) -> + case maps:take(Ref, RequestInflight) of + error -> + {keep_state, State}; + {ReceiverPid, NRequestInflight} -> + is_process_alive(ReceiverPid) andalso erlang:send(ReceiverPid, {request_reply, Ref, ReplyBin}), + {keep_state, State#state{push_inflight = NRequestInflight}} + end; + %% todo 请求超时逻辑处理 handle_event(info, {timeout, _, {request_timeout, Ref}}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) -> case maps:take(Ref, PushInflight) of diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index f3cb40a..19b03cc 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -16,7 +16,7 @@ %% API -export([start_link/3]). -export([connect/1, auth_request/2, send/3, async_call_reply/3, stop/1]). --export([request/4]). +-export([request/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,10 +41,10 @@ auth_request(Pid, AuthBin) when is_pid(Pid), is_binary(AuthBin) -> gen_server:cast(Pid, {auth_request, AuthBin}). --spec request(Pid :: pid(), ReceiverPid :: pid(), Method :: integer(), ReqBin :: binary()) -> Ref :: reference(). -request(Pid, ReceiverPid, Method, ReqBin) when is_pid(Pid), is_binary(ReqBin) -> +-spec request(Pid :: pid(), Method :: integer(), ReqBin :: binary()) -> Ref :: reference(). +request(Pid, Method, ReqBin) when is_pid(Pid), is_binary(ReqBin) -> Ref = make_ref(), - gen_server:cast(Pid, {request, Ref, ReceiverPid, Method, ReqBin}), + gen_server:cast(Pid, {request, Ref, Method, ReqBin}), Ref. -spec connect(Pid :: pid()) -> no_return(). @@ -139,9 +139,9 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi end; %% 提交请求 -handle_cast({request, Ref, ReceiverPid, Method, ReqBin}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> +handle_cast({request, Ref, Method, ReqBin}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> ok = ssl:send(Socket, <>), - {noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; + {noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, Ref, Inflight)}}; handle_cast({send, Method, Packet}, State = #state{socket = Socket}) -> ok = ssl:send(Socket, <>), @@ -173,17 +173,12 @@ handle_info({ssl, Socket, < iot:response -handle_info({ssl, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> +handle_info({ssl, Socket, <>}, State = #state{socket = Socket, inflight = Inflight, parent_pid = ParentPid}) -> case maps:take(PacketId, Inflight) of error -> {noreply, State}; - {{Ref, ReceiverPid}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true -> - ReceiverPid ! {transport_reply, Ref, ReplyBin}; - false -> - ok - end, + {Ref, NInflight} -> + ParentPid ! {server_reply, Ref, ReplyBin}, {noreply, State#state{inflight = NInflight}} end;