ekfa/apps/efka/src/efka_transport.erl
2025-09-17 17:11:52 +08:00

201 lines
7.9 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
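%%% @doc
%%% gen_server that owns the TLS connection to the server. It sends
%%% auth requests, casts, RPC replies and periodic pings over the socket,
%%% and forwards packets pushed by the server to its parent process as
%%% plain Erlang messages.
%%% @end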
%%% Created : 20. 4月 2025 18:47
%%%-------------------------------------------------------------------
-module(efka_transport).
-author("anlicheng").
-include("efka.hrl").
-behaviour(gen_server).
%% API
-export([start_monitor/3]).
-export([connect/1, auth_request/2, send/3, rpc_reply/3, stop/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
%% Internal state of the transport gen_server.
-record(state, {
%% process that receives connect/auth replies and server-pushed messages
parent_pid :: pid(),
%% host of the target server, handed to ssl:connect/4
host :: string(),
%% port of the target server, handed to ssl:connect/4
port :: integer(),
%% TLS socket; undefined until a connect attempt succeeds
socket :: undefined | ssl:sslsocket()
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Asynchronously asks the transport process `Pid' to send the
%% authentication payload `AuthBin' to the server.
%% The call is a gen_server cast, so it returns `ok' immediately; the outcome
%% is delivered by the transport to its parent process as an
%% `{auth_reply, Result}' message.
-spec auth_request(Pid :: pid(), AuthBin :: binary()) -> ok.
auth_request(Pid, AuthBin) when is_pid(Pid), is_binary(AuthBin) ->
    gen_server:cast(Pid, {auth_request, AuthBin}).
%% @doc Asynchronously asks the transport process `Pid' to open its connection
%% to the server.
%% The call is a gen_server cast, so it returns `ok' immediately; the outcome
%% is delivered by the transport to its parent process as a
%% `{connect_reply, ok | {error, Reason}}' message.
-spec connect(Pid :: pid()) -> ok.
connect(Pid) when is_pid(Pid) ->
    gen_server:cast(Pid, connect).
%% @doc Asynchronously asks the transport process `Pid' to send a one-way
%% packet to the server. `Method' is encoded as a single byte in front of
%% `Packet'.
%% The call is a gen_server cast, so it returns `ok' immediately and gives no
%% indication of whether the packet was actually written to the socket.
-spec send(Pid :: pid(), Method :: integer(), Packet :: binary()) -> ok.
send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) ->
    gen_server:cast(Pid, {send, Method, Packet}).
%% @doc Asynchronously asks the transport process `Pid' to send `Response' to
%% the server as the reply to the RPC identified by `PacketId'.
%% When there is no transport (`Pid' is `undefined') the reply is dropped
%% silently. Always returns `ok'.
-spec rpc_reply(Pid :: pid() | undefined, PacketId :: integer(), Response :: binary()) -> ok.
rpc_reply(undefined, PacketId, Response) when is_integer(PacketId), is_binary(Response) ->
    %% no transport process to hand the reply to
    ok;
rpc_reply(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) ->
    gen_server:cast(Pid, {rpc_reply, PacketId, Response}).
%% @doc Stops the transport process, waiting at most 2000 ms for it to exit.
%% Stopping is best-effort: the transport may already have exited (or may not
%% stop in time), in which case gen_server:stop/3 exits. That exit is swallowed
%% on purpose so that the function always returns `ok', as its spec promises,
%% instead of leaking an `{'EXIT', Reason}' tuple to the caller.
-spec stop(Pid :: pid() | undefined) -> ok.
stop(undefined) ->
    ok;
stop(Pid) when is_pid(Pid) ->
    try gen_server:stop(Pid, normal, 2000) of
        ok -> ok
    catch
        %% noproc / timeout / abnormal termination reason: nothing left to do
        exit:_Reason -> ok
    end.
%% @doc Starts an unregistered transport gen_server and sets up a monitor on it
%% from the calling process. `ParentPid' is the process the transport reports
%% to; `Host' and `Port' identify the server it will connect to.
-spec start_monitor(ParentPid :: pid(), Host :: string(), Port :: integer()) ->
    {ok, {Pid :: pid(), MRef :: reference()}} | ignore | {error, Reason :: term()}.
start_monitor(ParentPid, Host, Port) when is_pid(ParentPid), is_list(Host), is_integer(Port) ->
    InitArgs = [ParentPid, Host, Port],
    gen_server:start_monitor(?MODULE, InitArgs, []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Builds the initial state from the start arguments. No connection is
%% opened here; the socket stays `undefined' until a `connect' cast is handled.
-spec init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore.
init([ParentPid, Host, Port]) ->
    InitialState = #state{
        parent_pid = ParentPid,
        host = Host,
        port = Port,
        socket = undefined
    },
    {ok, InitialState}.
%% @private
%% @doc Synchronous calls are not part of this server's protocol: every call is
%% answered with `ok' and the state is left untouched.
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}.
handle_call(_Request, _Caller, #state{} = State) ->
    {reply, ok, State}.
%% @private
%% @doc Handles the asynchronous requests issued through the API functions:
%% `connect', `{auth_request, Bin}', `{send, Method, Packet}' and
%% `{rpc_reply, PacketId, Response}'.
-spec handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
%% Open the connection to the target server and report the outcome to the parent.
handle_cast(connect, #state{host = Host, port = Port, parent_pid = Parent} = State) ->
    %% NOTE(review): verify_none turns off peer certificate verification.
    ConnectOpts = [binary, {packet, 4}, {verify, verify_none}],
    case ssl:connect(Host, Port, ConnectOpts, 5000) of
        {error, Why} ->
            Parent ! {connect_reply, {error, Why}},
            {noreply, State#state{socket = undefined}};
        {ok, Sock} ->
            ok = ssl:controlling_process(Sock, self()),
            Parent ! {connect_reply, ok},
            %% arm the first ping timer
            ping_ticker(),
            {noreply, State#state{socket = Sock}}
    end;
%% Authentication: send the request, then wait for the server's answer.
handle_cast({auth_request, AuthBin}, #state{parent_pid = Parent, socket = Sock} = State) ->
    RequestId = 1,
    ok = ssl:send(Sock, <<?PACKET_REQUEST, RequestId:32, AuthBin/binary>>),
    Parent ! {auth_reply, await_auth_reply(Sock, RequestId)},
    {noreply, State};
%% One-way packet to the server.
handle_cast({send, Method, Payload}, #state{socket = Sock} = State) ->
    ok = ssl:send(Sock, <<?PACKET_CAST, Method:8, Payload/binary>>),
    {noreply, State};
%% Reply to a message that was pushed by the server.
handle_cast({rpc_reply, ReplyTo, Body}, #state{socket = Sock} = State) ->
    ok = ssl:send(Sock, <<?PACKET_RPC_REPLY, ReplyTo:32, Body/binary>>),
    {noreply, State}.

%% Blocks the server for up to 5000 ms until the next packet arrives on `Sock'
%% and classifies it as the auth result. Any packet other than the response
%% carrying `RequestId' is logged and reported as `invalid_auth_reply'.
%% NOTE(review): only `{ssl, Sock, _}' messages are consumed here; everything
%% else stays in the mailbox until this receive returns.
await_auth_reply(Sock, RequestId) ->
    receive
        {ssl, Sock, <<?PACKET_RESPONSE, RequestId:32, Reply/binary>>} ->
            {ok, Reply};
        {ssl, Sock, Unexpected} ->
            lager:warning("[efka_transport] get invalid auth_reply: ~p", [Unexpected]),
            {error, invalid_auth_reply}
    after 5000 ->
        {error, timeout}
    end.
%% @private
%% @doc Handles socket traffic, socket failures and the ping timer.
%% Packets pushed by the server are forwarded to the parent process; a socket
%% error or close stops the server with reason `normal'.
-spec handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
%% Data pushed by the server: a packet carrying a packet id expects a reply,
%% an id of 0 means no return value is needed.
handle_info({ssl, Sock, <<?PACKET_COMMAND, Type:8, Payload/binary>>},
            #state{socket = Sock, parent_pid = Parent} = State) ->
    Parent ! {server_command, Type, Payload},
    {noreply, State};
handle_info({ssl, Sock, <<?PACKET_PUB, Encoded/binary>>},
            #state{socket = Sock, parent_pid = Parent} = State) ->
    #pub{topic = PubTopic, content = PubContent} = message_pb:decode_msg(Encoded, pub),
    Parent ! {server_pub, PubTopic, PubContent},
    {noreply, State};
handle_info({ssl, Sock, <<?PACKET_RPC, RequestId:32, Request/binary>>},
            #state{socket = Sock, parent_pid = Parent} = State) ->
    Parent ! {server_rpc, RequestId, Request},
    {noreply, State};
%% The socket failed or was closed by the peer: shut down quietly.
handle_info({ssl_error, Sock, Why}, #state{socket = Sock} = State) ->
    lager:debug("[efka_transport] ssl error: ~p", [Why]),
    {stop, normal, State};
handle_info({ssl_closed, Sock}, #state{socket = Sock} = State) ->
    {stop, normal, State};
%% Ping timer fired: send a ping, then re-arm the timer.
handle_info({timeout, _TimerRef, ping_ticker}, #state{socket = Sock} = State) ->
    ok = ssl:send(Sock, <<?PACKET_PING>>),
    ping_ticker(),
    {noreply, State};
%% Anything else is logged and dropped.
handle_info(Unknown, #state{} = State) ->
    lager:notice("[efka_transport] get unknown info: ~p", [Unknown]),
    {noreply, State}.
%% @private
%% @doc Called by gen_server just before the process exits. Only logs the
%% termination reason; the return value is ignored by gen_server.
-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
                State :: #state{}) -> term().
terminate(Why, #state{} = _State) ->
    lager:notice("[efka_transport] terminate with reason: ~p", [Why]),
    ok.
%% @private
%% @doc Hot code upgrade hook. The state layout is unchanged, so the state is
%% handed back as is.
-spec code_change(OldVsn :: term() | {down, term()}, State :: #state{},
                  Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}.
code_change(_FromVsn, #state{} = State, _Extra) ->
    {ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Arms a one-shot timer that delivers `{timeout, TimerRef, ping_ticker}' to the
%% calling process after 5000 ms. Returns the timer reference.
-spec ping_ticker() -> reference().
ping_ticker() ->
    IntervalMs = 5000,
    Receiver = self(),
    erlang:start_timer(IntervalMs, Receiver, ping_ticker).