diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 66d8917..35b983e 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -14,7 +14,7 @@ -behaviour(gen_server). %% API --export([start_link/0]). +-export([start_link/1]). -export([auth_request/2, data/2, ping/2, inform/2, feedback_step/2, feedback_result/2, event/2, ai_event/2, response/2]). %% gen_server callbacks @@ -23,6 +23,7 @@ -define(SERVER, ?MODULE). -record(state, { + parent_pid :: pid(), host :: string(), port :: integer(), socket :: undefined | ssl:sslsocket(), @@ -61,10 +62,10 @@ response(Pid, Response) when is_pid(Pid) -> gen_server:cast(Pid, {response, Response}). %% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> +-spec(start_link(ParentPid :: pid()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link(?MODULE, [], []). +start_link(ParentPid) when is_pid(ParentPid) -> + gen_server:start_link(?MODULE, [ParentPid], []). %%%=================================================================== %%% gen_server callbacks @@ -75,7 +76,7 @@ start_link() -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([]) -> +init([ParentPid]) -> {ok, Props} = application:get_env(efka, tls_server), Host = proplists:get_value(host, Props), Port = proplists:get_value(port, Props), @@ -88,7 +89,7 @@ init([]) -> ], case ssl:connect(Host, Port, SslOptions, 5000) of {ok, Socket} -> - {ok, #state{host = Host, port = Port, socket = Socket}}; + {ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = Socket}}; {error, Reason} -> {stop, Reason} end. @@ -123,9 +124,10 @@ handle_call({auth_request, Timeout}, _From, State = #state{socket = Socket, pack receive {ssl, Socket, <>} -> lager:debug("[efka_agent] get a reply bin: ~p", [ReplyBin]), - {reply, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}, State} - after Timeout -> - {reply, {error, timeout}, State} + {reply, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}, State#state{packet_id = PacketId + 1}} + after + Timeout -> + {reply, {error, timeout}, State} end. %% @private @@ -177,13 +179,22 @@ handle_cast({response, Response}, State = #state{socket = Socket}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({ssl, Socket, Data}, State = #state{socket = Socket}) -> - lager:debug("[efka_agent] socket get message: ~p", [Data]), - {noreply, State#state{socket = undefined}}; +%% 服务器主动推送的数据,有packetId的是要求返回的;为0的表示不需要返回值 +handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> + lager:debug("[efka_agent] socket get message: ~p", [Msg]), + ParentPid ! {efka_message, Msg}, + {noreply, State}; + +%% 目前推送的消息包括: <>, <<16:8, Directive/binary>> +handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> + lager:debug("[efka_agent] socket get message: ~p", [Msg]), + ParentPid ! {efka_message, PacketId, Msg}, + {noreply, State#state{}}; handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) -> lager:warning("[efka_agent] socket close with reason: ~p", [Reason]), {stop, transport_error, State}; + handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) -> lager:warning("[efka_agent] socket closed"), {stop, transport_closed, State};