fix
This commit is contained in:
parent
764a7f67c0
commit
a8962580b0
@ -35,7 +35,9 @@
|
|||||||
-record(state, {
|
-record(state, {
|
||||||
transport_pid :: undefined | pid(),
|
transport_pid :: undefined | pid(),
|
||||||
%% 服务器端推送的消息的未确认列表, 映射关系 #{Ref => PacketId}
|
%% 服务器端推送的消息的未确认列表, 映射关系 #{Ref => PacketId}
|
||||||
push_inflight = #{}
|
push_inflight = #{},
|
||||||
|
%% 发送的请求的未确认列表, 映射关系 #{Ref => ReceiverPid}
|
||||||
|
request_inflight = #{}
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -63,7 +65,7 @@ request_service_config(ReceiverPid, ServiceId) when is_binary(ServiceId) ->
|
|||||||
-spec await_reply(Ref :: reference(), Timeout :: timeout()) -> {ok, Reply :: binary()} | {error, timeout}.
|
-spec await_reply(Ref :: reference(), Timeout :: timeout()) -> {ok, Reply :: binary()} | {error, timeout}.
|
||||||
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
|
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
|
||||||
receive
|
receive
|
||||||
{transport_reply, Ref, ReplyBin} ->
|
{request_reply, Ref, ReplyBin} ->
|
||||||
{ok, ReplyBin}
|
{ok, ReplyBin}
|
||||||
after Timeout ->
|
after Timeout ->
|
||||||
{error, timeout}
|
{error, timeout}
|
||||||
@ -98,9 +100,9 @@ callback_mode() ->
|
|||||||
%% @doc If callback_mode is handle_event_function, then whenever a
|
%% @doc If callback_mode is handle_event_function, then whenever a
|
||||||
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
%% gen_statem receives an event from call/2, cast/2, or as a normal
|
||||||
%% process message, this function is called.
|
%% process message, this function is called.
|
||||||
handle_event({call, From}, {request_service_config, ReceiverPid, ServiceId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
handle_event({call, From}, {request_service_config, ReceiverPid, ServiceId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, request_inflight = RequestInflight}) ->
|
||||||
Ref = efka_transport:request(TransportPid, ReceiverPid, ?METHOD_REQUEST_SERVICE_CONFIG, ServiceId),
|
Ref = efka_transport:request(TransportPid, ?METHOD_REQUEST_SERVICE_CONFIG, ServiceId),
|
||||||
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
{keep_state, State#state{request_inflight = maps:put(Ref, ReceiverPid, RequestInflight)}, [{reply, From, {ok, Ref}}]};
|
||||||
|
|
||||||
handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _, State) ->
|
handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _, State) ->
|
||||||
{keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]};
|
{keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]};
|
||||||
@ -354,6 +356,16 @@ handle_event(info, {service_reply, Ref, EmsReply}, ?STATE_ACTIVATED, State = #st
|
|||||||
{keep_state, State#state{push_inflight = NPushInflight}}
|
{keep_state, State#state{push_inflight = NPushInflight}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
%% 收到来自服务器端的回复
|
||||||
|
handle_event(info, {server_reply, Ref, ReplyBin}, ?STATE_ACTIVATED, State = #state{request_inflight = RequestInflight}) ->
|
||||||
|
case maps:take(Ref, RequestInflight) of
|
||||||
|
error ->
|
||||||
|
{keep_state, State};
|
||||||
|
{ReceiverPid, NRequestInflight} ->
|
||||||
|
is_process_alive(ReceiverPid) andalso erlang:send(ReceiverPid, {request_reply, Ref, ReplyBin}),
|
||||||
|
{keep_state, State#state{push_inflight = NRequestInflight}}
|
||||||
|
end;
|
||||||
|
|
||||||
%% todo 请求超时逻辑处理
|
%% todo 请求超时逻辑处理
|
||||||
handle_event(info, {timeout, _, {request_timeout, Ref}}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) ->
|
handle_event(info, {timeout, _, {request_timeout, Ref}}, ?STATE_ACTIVATED, State = #state{push_inflight = PushInflight, transport_pid = TransportPid}) ->
|
||||||
case maps:take(Ref, PushInflight) of
|
case maps:take(Ref, PushInflight) of
|
||||||
|
|||||||
@ -16,7 +16,7 @@
|
|||||||
%% API
|
%% API
|
||||||
-export([start_link/3]).
|
-export([start_link/3]).
|
||||||
-export([connect/1, auth_request/2, send/3, async_call_reply/3, stop/1]).
|
-export([connect/1, auth_request/2, send/3, async_call_reply/3, stop/1]).
|
||||||
-export([request/4]).
|
-export([request/3]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
@ -41,10 +41,10 @@
|
|||||||
auth_request(Pid, AuthBin) when is_pid(Pid), is_binary(AuthBin) ->
|
auth_request(Pid, AuthBin) when is_pid(Pid), is_binary(AuthBin) ->
|
||||||
gen_server:cast(Pid, {auth_request, AuthBin}).
|
gen_server:cast(Pid, {auth_request, AuthBin}).
|
||||||
|
|
||||||
-spec request(Pid :: pid(), ReceiverPid :: pid(), Method :: integer(), ReqBin :: binary()) -> Ref :: reference().
|
-spec request(Pid :: pid(), Method :: integer(), ReqBin :: binary()) -> Ref :: reference().
|
||||||
request(Pid, ReceiverPid, Method, ReqBin) when is_pid(Pid), is_binary(ReqBin) ->
|
request(Pid, Method, ReqBin) when is_pid(Pid), is_binary(ReqBin) ->
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
gen_server:cast(Pid, {request, Ref, ReceiverPid, Method, ReqBin}),
|
gen_server:cast(Pid, {request, Ref, Method, ReqBin}),
|
||||||
Ref.
|
Ref.
|
||||||
|
|
||||||
-spec connect(Pid :: pid()) -> no_return().
|
-spec connect(Pid :: pid()) -> no_return().
|
||||||
@ -139,9 +139,9 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi
|
|||||||
end;
|
end;
|
||||||
|
|
||||||
%% 提交请求
|
%% 提交请求
|
||||||
handle_cast({request, Ref, ReceiverPid, Method, ReqBin}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
handle_cast({request, Ref, Method, ReqBin}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
||||||
ok = ssl:send(Socket, <<?PACKET_REQUEST, PacketId:32, Method:8, ReqBin/binary>>),
|
ok = ssl:send(Socket, <<?PACKET_REQUEST, PacketId:32, Method:8, ReqBin/binary>>),
|
||||||
{noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
|
{noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, Ref, Inflight)}};
|
||||||
|
|
||||||
handle_cast({send, Method, Packet}, State = #state{socket = Socket}) ->
|
handle_cast({send, Method, Packet}, State = #state{socket = Socket}) ->
|
||||||
ok = ssl:send(Socket, <<?PACKET_REQUEST, Method:8, Packet/binary>>),
|
ok = ssl:send(Socket, <<?PACKET_REQUEST, Method:8, Packet/binary>>),
|
||||||
@ -173,17 +173,12 @@ handle_info({ssl, Socket, <<?PACKET_ASYNC_CALL, PacketId:32, AsyncCallBin/binary
|
|||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% efka:request <-> iot:response
|
%% efka:request <-> iot:response
|
||||||
handle_info({ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ReplyBin/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
|
handle_info({ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ReplyBin/binary>>}, State = #state{socket = Socket, inflight = Inflight, parent_pid = ParentPid}) ->
|
||||||
case maps:take(PacketId, Inflight) of
|
case maps:take(PacketId, Inflight) of
|
||||||
error ->
|
error ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
{{Ref, ReceiverPid}, NInflight} ->
|
{Ref, NInflight} ->
|
||||||
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
ParentPid ! {server_reply, Ref, ReplyBin},
|
||||||
true ->
|
|
||||||
ReceiverPid ! {transport_reply, Ref, ReplyBin};
|
|
||||||
false ->
|
|
||||||
ok
|
|
||||||
end,
|
|
||||||
{noreply, State#state{inflight = NInflight}}
|
{noreply, State#state{inflight = NInflight}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user