重新处理agent和transport之间的关系
This commit is contained in:
parent
3c8a232054
commit
5dc3ecd671
@ -34,6 +34,7 @@
|
|||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
transport_pid :: undefined | pid(),
|
transport_pid :: undefined | pid(),
|
||||||
|
transport_ref :: undefined | reference(),
|
||||||
%% 服务器端推送的消息的未确认列表, 映射关系 #{Ref => PacketId}
|
%% 服务器端推送的消息的未确认列表, 映射关系 #{Ref => PacketId}
|
||||||
push_inflight = #{},
|
push_inflight = #{},
|
||||||
%% 发送的请求的未确认列表, 映射关系 #{Ref => ReceiverPid}
|
%% 发送的请求的未确认列表, 映射关系 #{Ref => ReceiverPid}
|
||||||
@ -86,7 +87,6 @@ start_link() ->
|
|||||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||||
%% process to initialize.
|
%% process to initialize.
|
||||||
init([]) ->
|
init([]) ->
|
||||||
erlang:process_flag(trap_exit, true),
|
|
||||||
erlang:start_timer(0, self(), create_transport),
|
erlang:start_timer(0, self(), create_transport),
|
||||||
{ok, ?STATE_DENIED, #state{}}.
|
{ok, ?STATE_DENIED, #state{}}.
|
||||||
|
|
||||||
@ -170,10 +170,10 @@ handle_event(info, {timeout, _, create_transport}, ?STATE_DENIED, State) ->
|
|||||||
{ok, Props} = application:get_env(efka, tls_server),
|
{ok, Props} = application:get_env(efka, tls_server),
|
||||||
Host = proplists:get_value(host, Props),
|
Host = proplists:get_value(host, Props),
|
||||||
Port = proplists:get_value(port, Props),
|
Port = proplists:get_value(port, Props),
|
||||||
{ok, TransportPid} = efka_transport:start_link(self(), Host, Port),
|
{ok, {TransportPid, TransportRef}} = efka_transport:start_monitor(self(), Host, Port),
|
||||||
efka_transport:connect(TransportPid),
|
efka_transport:connect(TransportPid),
|
||||||
|
|
||||||
{next_state, ?STATE_CONNECTING, State#state{transport_pid = TransportPid}};
|
{next_state, ?STATE_CONNECTING, State#state{transport_pid = TransportPid, transport_ref = TransportRef}};
|
||||||
|
|
||||||
handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{transport_pid = TransportPid}) ->
|
handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{transport_pid = TransportPid}) ->
|
||||||
case Reply of
|
case Reply of
|
||||||
@ -390,10 +390,10 @@ handle_event(info, {timeout, _, {request_timeout, Ref}}, ?STATE_ACTIVATED, State
|
|||||||
end;
|
end;
|
||||||
|
|
||||||
%% transport进程退出
|
%% transport进程退出
|
||||||
handle_event(info, {'EXIT', TransportPid, Reason}, _, State = #state{transport_pid = TransportPid}) ->
|
handle_event(info, {'DOWN', MRef, process, TransportPid, Reason}, _, State = #state{transport_ref = MRef}) ->
|
||||||
lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
|
lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
|
||||||
erlang:start_timer(5000, self(), create_transport),
|
erlang:start_timer(5000, self(), create_transport),
|
||||||
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}.
|
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined, transport_ref = undefined}}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc This function is called by a gen_statem when it is about to
|
%% @doc This function is called by a gen_statem when it is about to
|
||||||
|
|||||||
@ -14,7 +14,7 @@
|
|||||||
-behaviour(gen_server).
|
-behaviour(gen_server).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/3]).
|
-export([start_monitor/3]).
|
||||||
-export([connect/1, auth_request/2, send/3, async_call_reply/3, stop/1]).
|
-export([connect/1, auth_request/2, send/3, async_call_reply/3, stop/1]).
|
||||||
-export([request/3]).
|
-export([request/3]).
|
||||||
|
|
||||||
@ -68,10 +68,10 @@ stop(Pid) when is_pid(Pid) ->
|
|||||||
gen_server:stop(Pid, normal, 2000).
|
gen_server:stop(Pid, normal, 2000).
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
-spec(start_link(ParentPid :: pid(), Host :: string(), Port :: integer()) ->
|
-spec(start_monitor(ParentPid :: pid(), Host :: string(), Port :: integer()) ->
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
{ok, {Pid :: pid(), MRef :: reference()}} | ignore | {error, Reason :: term()}).
|
||||||
start_link(ParentPid, Host, Port) when is_pid(ParentPid), is_list(Host), is_integer(Port) ->
|
start_monitor(ParentPid, Host, Port) when is_pid(ParentPid), is_list(Host), is_integer(Port) ->
|
||||||
gen_server:start_link(?MODULE, [ParentPid, Host, Port], []).
|
gen_server:start_monitor(?MODULE, [ParentPid, Host, Port], []).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% gen_server callbacks
|
%%% gen_server callbacks
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user