简化消息类型

This commit is contained in:
anlicheng 2025-09-18 11:42:30 +08:00
parent 900314a6b6
commit 5b3e39c31b
3 changed files with 13 additions and 31 deletions

View File

@ -9,25 +9,12 @@
-author("anlicheng").
%% efka主动发起的消息体类型,
-define(PACKET_REQUEST, 16#01).
-define(PACKET_RESPONSE, 16#02).
-define(PACKET_RPC, 16#01).
-define(PACKET_RPC_REPLY, 16#02).
%% efka主动发起不需要返回的数据
-define(PACKET_CAST, 16#03).
%% pub/sub的消息,
-define(PACKET_PUB, 16#04).
%% push调用不需要返回,
-define(PACKET_COMMAND, 16#05).
%% RPC调用
%% Service通讯,
-define(PACKET_RPC, 16#10).
-define(PACKET_RPC_REPLY, 16#11).
%% ping包
-define(PACKET_PING, 16#FF).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View File

@ -273,7 +273,7 @@ handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"config">>,
% {keep_state, State};
%%
handle_event(info, {server_command, #command{command_type = ?COMMAND_AUTH, command = Auth0}}, StateName, State = #state{transport_pid = TransportPid}) ->
handle_event(info, {server_cast, #command{command_type = ?COMMAND_AUTH, command = Auth0}}, StateName, State = #state{transport_pid = TransportPid}) ->
Auth = binary_to_integer(Auth0),
case {Auth, StateName} of
{1, ?STATE_ACTIVATED} ->
@ -289,7 +289,7 @@ handle_event(info, {server_command, #command{command_type = ?COMMAND_AUTH, comma
end;
%% Pub/Sub机制
handle_event(info, {server_pub, Topic, Content}, ?STATE_ACTIVATED, State) ->
handle_event(info, {server_cast, #pub{topic = Topic, content = Content}}, ?STATE_ACTIVATED, State) ->
lager:debug("[efka_remote_agent] get pub topic: ~p, content: ~p", [Topic, Content]),
%%
efka_subscription:publish(Topic, Content),

View File

@ -114,10 +114,10 @@ handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = Paren
%% auth校验
handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPid, socket = Socket}) ->
PacketId = 1,
ok = ssl:send(Socket, <<?PACKET_REQUEST, PacketId:32, AuthRequestBin/binary>>),
ok = ssl:send(Socket, <<?PACKET_RPC, PacketId:32, AuthRequestBin/binary>>),
%% auth返回的结果
receive
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ReplyBin/binary>>} ->
{ssl, Socket, <<?PACKET_RPC_REPLY, PacketId:32, ReplyBin/binary>>} ->
{ok, #auth_reply{} = Reply} = message_codec:decode(ReplyBin),
ParentPid ! {auth_reply, {ok, Reply}},
{noreply, State};
@ -146,14 +146,9 @@ handle_cast({rpc_reply, PacketId, Response}, State = #state{socket = Socket}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%% packetId的是要求返回的0
handle_info({ssl, Socket, <<?PACKET_COMMAND, CommandBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
{ok, #command{} = Command} = message_codec:decode(CommandBin),
ParentPid ! {server_command, Command},
{noreply, State};
handle_info({ssl, Socket, <<?PACKET_PUB, PubBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
{ok, #pub{} = Pub} = message_codec:decode(PubBin),
ParentPid ! {server_pub, Pub},
handle_info({ssl, Socket, <<?PACKET_CAST, CommandBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
{ok, CastRequest} = message_codec:decode(CommandBin),
ParentPid ! {server_cast, CastRequest},
{noreply, State};
handle_info({ssl, Socket, <<?PACKET_RPC, PacketId:32, RPCRequestBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
@ -168,10 +163,10 @@ handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) ->
handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) ->
{stop, normal, State};
handle_info({timeout, _, ping_ticker}, State = #state{socket = Socket}) ->
ok = ssl:send(Socket, <<?PACKET_PING>>),
ping_ticker(),
{noreply, State};
%handle_info({timeout, _, ping_ticker}, State = #state{socket = Socket}) ->
% ok = ssl:send(Socket, <<?PACKET_PING>>),
% ping_ticker(),
% {noreply, State};
handle_info(Info, State = #state{}) ->
lager:notice("[efka_transport] get unknown info: ~p", [Info]),