%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025,
%%% @doc
%%% gen_server that owns a single SSL connection to Host:Port on behalf
%%% of a parent process.
%%%
%%% All requests are asynchronous casts; outcomes and server-originated
%%% traffic are reported to the parent pid as plain messages:
%%%   {connect_reply, ok | {error, Reason}}
%%%   {auth_reply, {ok, ReplyBin} | {error, timeout}}
%%%   {server_command, CommandType, Command}
%%%   {server_pub, Topic, Content}
%%%   {server_push, PacketId, PushBin}
%%%
%%% FIXME(review): every binary expression/pattern in this file appears
%%% as `<>`, which is not valid Erlang. The original `<<...>>` contents
%%% seem to have been lost and must be restored from the original source
%%% before this module can compile. They are deliberately left untouched.
%%% @end
%%% Created : 20 Apr 2025 18:47
%%%-------------------------------------------------------------------
-module(efka_transport).
-author("anlicheng").
-include("message_pb.hrl").
-include("efka.hrl").

-behaviour(gen_server).

%% API
-export([start_link/3]).
-export([connect/1, auth_request/2, send/3, push_response/3, stop/1]).

%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).

-define(SERVER, ?MODULE).

-record(state, {
    %% process that receives every *_reply / server_* notification
    parent_pid :: pid(),
    host :: string(),
    port :: integer(),
    %% undefined until a connect request succeeds
    socket :: undefined | ssl:sslsocket(),
    %% incremented after every auth attempt (reply or timeout)
    packet_id = 1
}).

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Asynchronously starts authentication. The parent process later
%% receives {auth_reply, {ok, ReplyBin}} or {auth_reply, {error, timeout}}.
%% Timeout is the number of milliseconds to wait for the server's answer.
-spec auth_request(Pid :: pid(), Timeout :: integer()) -> ok.
auth_request(Pid, Timeout) when is_pid(Pid), is_integer(Timeout) ->
    gen_server:cast(Pid, {auth_request, Timeout}).

%% @doc Asynchronously opens the SSL connection. The parent process later
%% receives {connect_reply, ok} or {connect_reply, {error, Reason}}.
-spec connect(Pid :: pid()) -> ok.
connect(Pid) when is_pid(Pid) ->
    gen_server:cast(Pid, connect).

%% @doc Asynchronously sends Packet for the given Method over the socket.
-spec send(Pid :: pid(), Method :: integer(), Packet :: binary()) -> ok.
send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) ->
    gen_server:cast(Pid, {send, Method, Packet}).

%% @doc Asynchronously sends Response as the answer to the server push
%% identified by PacketId.
-spec push_response(Pid :: pid(), PacketId :: integer(), Response :: binary()) -> ok.
push_response(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) ->
    gen_server:cast(Pid, {push_response, PacketId, Response}).

%% @doc Stops the server with reason `normal', waiting at most 2000 ms.
-spec stop(Pid :: pid()) -> ok.
stop(Pid) when is_pid(Pid) ->
    gen_server:stop(Pid, normal, 2000).

%% @doc Starts the server linked to the caller. No name is registered, so
%% several transports can run side by side. The connection itself is only
%% opened by a later connect/1.
-spec(start_link(ParentPid :: pid(), Host :: string(), Port :: integer()) ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(ParentPid, Host, Port) when is_pid(ParentPid), is_list(Host), is_integer(Port) ->
    gen_server:start_link(?MODULE, [ParentPid, Host, Port], []).

%%%===================================================================
%%% gen_server callbacks
%%%===================================================================

%% @private
%% @doc Initializes the server; only records the arguments, no I/O here.
-spec(init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore).
init([ParentPid, Host, Port]) ->
    {ok, #state{parent_pid = ParentPid, host = Host, port = Port, socket = undefined}}.

%% @private
%% @doc Handling call messages. No synchronous API exists; any call is
%% answered with `ok'.
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Req, _From, State = #state{}) ->
    {reply, ok, State}.

%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Establish the connection to the target server.
handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = ParentPid}) ->
    %% `binary' delivers payloads as binaries; `{packet, 4}' frames every
    %% message with a 4-byte length header.
    %% NOTE(review): `verify_none' disables peer certificate verification;
    %% confirm this is intended outside of development.
    SslOptions = [
        binary,
        {packet, 4},
        {verify, verify_none}
    ],
    %% 5000 ms connect timeout.
    case ssl:connect(Host, Port, SslOptions, 5000) of
        {ok, Socket} ->
            ok = ssl:controlling_process(Socket, self()),
            ParentPid ! {connect_reply, ok},
            {noreply, State#state{socket = Socket}};
        {error, Reason} ->
            ParentPid ! {connect_reply, {error, Reason}},
            {noreply, State#state{socket = undefined}}
    end;
%% Authentication.
handle_cast({auth_request, Timeout}, State = #state{parent_pid = ParentPid, socket = Socket, packet_id = PacketId}) ->
    %% Credentials come from the `auth' key of the `efka' application env;
    %% a missing key crashes here on the {ok, _} match.
    {ok, AuthInfo} = application:get_env(efka, auth),
    UUID = proplists:get_value(uuid, AuthInfo),
    Username = proplists:get_value(username, AuthInfo),
    Salt = proplists:get_value(salt, AuthInfo),
    Token = proplists:get_value(token, AuthInfo),
    RequestBin = message_pb:encode_msg(#auth_request{
        uuid = unicode:characters_to_binary(UUID),
        username = unicode:characters_to_binary(Username),
        salt = unicode:characters_to_binary(Salt),
        token = unicode:characters_to_binary(Token),
        timestamp = efka_util:timestamp()
    }),
    %% FIXME(review): `<>' is not valid Erlang; the binary built here
    %% (presumably from PacketId and RequestBin) has been lost - restore it.
    ok = ssl:send(Socket, <>),
    %% Wait for the auth result. This selective receive blocks the server
    %% for up to Timeout ms; other casts and socket messages stay queued
    %% in the mailbox until it returns.
    receive
        %% FIXME(review): `<>' is not valid Erlang; the lost pattern is
        %% expected to bind ReplyBin, which is used below - restore it.
        {ssl, Socket, <>} ->
            ParentPid ! {auth_reply, {ok, ReplyBin}},
            {noreply, State#state{packet_id = PacketId + 1}}
    after Timeout ->
        ParentPid ! {auth_reply, {error, timeout}},
        {noreply, State#state{packet_id = PacketId + 1}}
    end;
handle_cast({send, Method, Packet}, State = #state{socket = Socket}) ->
    %% FIXME(review): `<>' is not valid Erlang; the binary built here
    %% (presumably from Method and Packet) has been lost - restore it.
    ok = ssl:send(Socket, <>),
    {noreply, State};
%% Reply to a message pushed by the server.
handle_cast({push_response, PacketId, Response}, State = #state{socket = Socket}) ->
    %% FIXME(review): `<>' is not valid Erlang; the binary built here
    %% (presumably from PacketId and Response) has been lost - restore it.
    ok = ssl:send(Socket, <>),
    {noreply, State}.

%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Data pushed proactively by the server. A push that carries a packetId
%% requires a response; a packetId of 0 means no response is expected.
%%
%% FIXME(review): the three `<>' patterns below are not valid Erlang; the
%% lost patterns are what told these clauses apart and are expected to bind
%% CommandType/Command, PubBin and PacketId/PushBin respectively - restore.
handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
    ParentPid ! {server_command, CommandType, Command},
    {noreply, State};
handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
    #pub{topic = Topic, content = Content} = message_pb:decode_msg(PubBin, pub),
    ParentPid ! {server_pub, Topic, Content},
    {noreply, State};
handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
    ParentPid ! {server_push, PacketId, PushBin},
    {noreply, State};
%% A socket error or a close by the peer ends the process with reason
%% `normal'; no reconnect is attempted here.
%% NOTE(review): this callback logs through `lager' while terminate/2 uses
%% `efka_logger' - confirm which logger is intended.
handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) ->
    lager:debug("[efka_transport] ssl error: ~p", [Reason]),
    {stop, normal, State};
handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) ->
    {stop, normal, State};
handle_info(Info, State = #state{}) ->
    lager:notice("[efka_transport] get unknown info: ~p", [Info]),
    {noreply, State}.

%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
%% Here it only logs the termination reason.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(Reason, #state{}) ->
    efka_logger:notice("[efka_transport] terminate with reason: ~p", [Reason]),
    ok.

%% @private
%% @doc Convert process state when code is changed. The state is kept as is.
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
    {ok, State}.

%%%===================================================================
%%% Internal functions
%%%===================================================================