Compare commits
5 Commits
f947453e59
...
8f8e813a8c
| Author | SHA1 | Date | |
|---|---|---|---|
| 8f8e813a8c | |||
| 2166510989 | |||
| 57288fd464 | |||
| 5a795fc142 | |||
| 8d89223f3c |
@ -19,10 +19,14 @@
|
|||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
|
-define(STATE_DENIED, denied).
|
||||||
|
-define(STATE_ACTIVATED, activated).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
transport_pid :: undefined | pid()
|
transport_pid :: undefined | pid(),
|
||||||
|
status = ?STATE_DENIED
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -51,8 +55,7 @@ init([]) ->
|
|||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) ->
|
||||||
State :: #state{}) ->
|
|
||||||
{reply, Reply :: term(), NewState :: #state{}} |
|
{reply, Reply :: term(), NewState :: #state{}} |
|
||||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
@ -78,12 +81,37 @@ handle_cast(_Request, State = #state{}) ->
|
|||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_info({timeout, _, create_transport}, State = #state{}) ->
|
handle_info({timeout, _, create_transport}, State = #state{}) ->
|
||||||
{ok, TransportPid} = efka_transport:start_link(self()),
|
case efka_transport:start_link(self()) of
|
||||||
{noreply, State#state{transport_pid = TransportPid}};
|
{ok, TransportPid} ->
|
||||||
|
Ref = efka_transport:auth_request(TransportPid, 5000),
|
||||||
|
receive
|
||||||
|
%% 验证通过
|
||||||
|
{auth_reply, Ref, {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}}} ->
|
||||||
|
lager:debug("[efka_agent] auth result: ~p, repository_url: ~p", [Message, RepositoryUrl]),
|
||||||
|
{noreply, State#state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}};
|
||||||
|
|
||||||
handle_info({'EXIT', _Pid, Reason}, State = #state{}) ->
|
%% 主机denied状态
|
||||||
lager:debug("[efka_agent] transport exit with reason: ~p", [Reason]),
|
{auth_reply, Ref, {ok, #auth_reply{code = -1, message = Message, repository_url = RepositoryUrl}}} ->
|
||||||
retry_connect(),
|
lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
%% 验证不通过
|
||||||
|
{auth_reply, Ref, {ok, #auth_reply{code = -2, message = Message, repository_url = RepositoryUrl}}} ->
|
||||||
|
lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
{auth_reply, Ref, {error, Reason}} ->
|
||||||
|
lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]),
|
||||||
|
{noreply, State}
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
lager:warning("[efka_agent] connect get error: ~p", [Reason]),
|
||||||
|
{noreply, State}
|
||||||
|
end;
|
||||||
|
|
||||||
|
handle_info({'EXIT', Pid, Reason}, State = #state{}) ->
|
||||||
|
lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [Pid, Reason]),
|
||||||
|
erlang:start_timer(5000, self(), create_transport),
|
||||||
{noreply, State#state{transport_pid = undefined}};
|
{noreply, State#state{transport_pid = undefined}};
|
||||||
|
|
||||||
handle_info(Info, State = #state{}) ->
|
handle_info(Info, State = #state{}) ->
|
||||||
@ -110,7 +138,4 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
retry_connect() ->
|
|
||||||
erlang:start_timer(5000, self(), create_transport).
|
|
||||||
@ -15,7 +15,7 @@
|
|||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/1]).
|
-export([start_link/1]).
|
||||||
-export([auth_request/2, send/3, response/3]).
|
-export([auth_request/2, send/3, response/3, stop/1]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
@ -34,8 +34,11 @@
|
|||||||
%%% API
|
%%% API
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
|
-spec auth_request(Pid :: pid(), Timeout :: integer()) -> Ref :: reference().
|
||||||
auth_request(Pid, Timeout) when is_pid(Pid), is_integer(Timeout) ->
|
auth_request(Pid, Timeout) when is_pid(Pid), is_integer(Timeout) ->
|
||||||
gen_server:call(Pid, {auth_request, Timeout}).
|
Ref = make_ref(),
|
||||||
|
gen_server:cast(Pid, {auth_request, self(), Ref, Timeout}),
|
||||||
|
Ref.
|
||||||
|
|
||||||
send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) ->
|
send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) ->
|
||||||
gen_server:cast(Pid, {send, Method, Packet}).
|
gen_server:cast(Pid, {send, Method, Packet}).
|
||||||
@ -43,6 +46,9 @@ send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet
|
|||||||
response(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId) ->
|
response(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId) ->
|
||||||
gen_server:cast(Pid, {response, PacketId, Response}).
|
gen_server:cast(Pid, {response, PacketId, Response}).
|
||||||
|
|
||||||
|
stop(Pid) when is_pid(Pid) ->
|
||||||
|
gen_server:stop(Pid).
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
-spec(start_link(ParentPid :: pid()) ->
|
-spec(start_link(ParentPid :: pid()) ->
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||||
@ -66,12 +72,15 @@ init([ParentPid]) ->
|
|||||||
SslOptions = [
|
SslOptions = [
|
||||||
binary,
|
binary,
|
||||||
{packet, 4},
|
{packet, 4},
|
||||||
{active, true},
|
|
||||||
{verify, verify_none}
|
{verify, verify_none}
|
||||||
],
|
],
|
||||||
{ok, Socket} = ssl:connect(Host, Port, SslOptions, 5000),
|
case ssl:connect(Host, Port, SslOptions, 5000) of
|
||||||
|
{ok, Socket} ->
|
||||||
{ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = Socket}}.
|
ssl:controlling_process(Socket, self()),
|
||||||
|
{ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = Socket}};
|
||||||
|
{error, Reason} ->
|
||||||
|
{stop, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
@ -83,7 +92,16 @@ init([ParentPid]) ->
|
|||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_call({auth_request, Timeout}, _From, State = #state{socket = Socket, packet_id = PacketId}) ->
|
handle_call(_Req, _From, State = #state{}) ->
|
||||||
|
{reply, ok, State#state{}}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling cast messages
|
||||||
|
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_cast({auth_request, ReceiverPid, Ref, Timeout}, State = #state{socket = Socket, packet_id = PacketId}) ->
|
||||||
{ok, AuthInfo} = application:get_env(efka, auth),
|
{ok, AuthInfo} = application:get_env(efka, auth),
|
||||||
UUID = proplists:get_value(uuid, AuthInfo),
|
UUID = proplists:get_value(uuid, AuthInfo),
|
||||||
Username = proplists:get_value(username, AuthInfo),
|
Username = proplists:get_value(username, AuthInfo),
|
||||||
@ -102,19 +120,13 @@ handle_call({auth_request, Timeout}, _From, State = #state{socket = Socket, pack
|
|||||||
%% 需要等待auth返回的结果
|
%% 需要等待auth返回的结果
|
||||||
receive
|
receive
|
||||||
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH, ReplyBin/binary>>} ->
|
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH, ReplyBin/binary>>} ->
|
||||||
lager:debug("[efka_agent] get a reply bin: ~p", [ReplyBin]),
|
ReceiverPid ! {auth_reply, Ref, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}},
|
||||||
{reply, {ok, message_pb:decode_msg(ReplyBin, auth_reply)}, State#state{packet_id = PacketId + 1}}
|
{noreply, State#state{packet_id = PacketId + 1}}
|
||||||
after
|
after Timeout ->
|
||||||
Timeout ->
|
ReceiverPid ! {auth_reply, Ref, {error, timeout}},
|
||||||
{reply, {error, timeout}, State}
|
{stop, normal, State}
|
||||||
end.
|
end;
|
||||||
|
|
||||||
%% @private
|
|
||||||
%% @doc Handling cast messages
|
|
||||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
|
||||||
{noreply, NewState :: #state{}} |
|
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
|
||||||
handle_cast({send, Method, Packet}, State = #state{socket = Socket}) ->
|
handle_cast({send, Method, Packet}, State = #state{socket = Socket}) ->
|
||||||
ok = ssl:send(Socket, <<?PACKET_REQUEST, 0:32, Method, Packet/binary>>),
|
ok = ssl:send(Socket, <<?PACKET_REQUEST, 0:32, Method, Packet/binary>>),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
@ -141,13 +153,14 @@ handle_info({ssl, Socket, <<?PACKET_PUBLISH, PacketId:32, Msg/binary>>}, State =
|
|||||||
ParentPid ! {efka_message, PacketId, Msg},
|
ParentPid ! {efka_message, PacketId, Msg},
|
||||||
{noreply, State#state{}};
|
{noreply, State#state{}};
|
||||||
|
|
||||||
handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) ->
|
handle_info({ssl_error, Socket, _Reason}, State = #state{socket = Socket}) ->
|
||||||
{stop, Reason, State};
|
{stop, normal, State};
|
||||||
|
|
||||||
handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) ->
|
handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) ->
|
||||||
{stop, ssl_closed, State};
|
{stop, normal, State};
|
||||||
|
|
||||||
handle_info(_Info, State = #state{}) ->
|
handle_info(Info, State = #state{}) ->
|
||||||
|
lager:debug("[efka_transport] info: ~p", [Info]),
|
||||||
{noreply, State}.
|
{noreply, State}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user