From 5b3e39c31be4da086eadfdd53174ce23e41d532a Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 18 Sep 2025 11:42:30 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8C=96=E6=B6=88=E6=81=AF=E7=B1=BB?= =?UTF-8?q?=E5=9E=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/efka/include/message.hrl | 17 ++--------------- apps/efka/src/efka_remote_agent.erl | 4 ++-- apps/efka/src/efka_transport.erl | 23 +++++++++-------------- 3 files changed, 13 insertions(+), 31 deletions(-) diff --git a/apps/efka/include/message.hrl b/apps/efka/include/message.hrl index f76b53f..c4895fc 100644 --- a/apps/efka/include/message.hrl +++ b/apps/efka/include/message.hrl @@ -9,25 +9,12 @@ -author("anlicheng"). %% efka主动发起的消息体类型, 消息大类 --define(PACKET_REQUEST, 16#01). --define(PACKET_RESPONSE, 16#02). +-define(PACKET_RPC, 16#01). +-define(PACKET_RPC_REPLY, 16#02). %% efka主动发起不需要返回的数据 -define(PACKET_CAST, 16#03). -%% 服务器基于pub/sub的消息, 消息大类 --define(PACKET_PUB, 16#04). -%% push调用不需要返回, 消息大类 --define(PACKET_COMMAND, 16#05). - -%% 服务器主动发起的RPC调用 -%% 服务端不能直接和Service通讯,链路太麻烦了,并且服务端也不知道如何和服务通讯 --define(PACKET_RPC, 16#10). --define(PACKET_RPC_REPLY, 16#11). - -%% ping包,客户端主动发起 --define(PACKET_PING, 16#FF). - %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%% 二级分类定义 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index 92723cc..7924c3e 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -273,7 +273,7 @@ handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"config">>, % {keep_state, State}; %% 处理命令 -handle_event(info, {server_command, #command{command_type = ?COMMAND_AUTH, command = Auth0}}, StateName, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_cast, #command{command_type = ?COMMAND_AUTH, command = Auth0}}, StateName, State = #state{transport_pid = TransportPid}) -> Auth = binary_to_integer(Auth0), case {Auth, StateName} of {1, ?STATE_ACTIVATED} -> @@ -289,7 +289,7 @@ handle_event(info, {server_command, #command{command_type = ?COMMAND_AUTH, comma end; %% 处理Pub/Sub机制 -handle_event(info, {server_pub, Topic, Content}, ?STATE_ACTIVATED, State) -> +handle_event(info, {server_cast, #pub{topic = Topic, content = Content}}, ?STATE_ACTIVATED, State) -> lager:debug("[efka_remote_agent] get pub topic: ~p, content: ~p", [Topic, Content]), %% 消息发送到订阅系统 efka_subscription:publish(Topic, Content), diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index cb73deb..fac16a6 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -114,10 +114,10 @@ handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = Paren %% auth校验 handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPid, socket = Socket}) -> PacketId = 1, - ok = ssl:send(Socket, <>), + ok = ssl:send(Socket, <>), %% 需要等待auth返回的结果 receive - {ssl, Socket, <>} -> + {ssl, Socket, <>} -> {ok, #auth_reply{} = Reply} = message_codec:decode(ReplyBin), ParentPid ! {auth_reply, {ok, Reply}}, {noreply, State}; @@ -146,14 +146,9 @@ handle_cast({rpc_reply, PacketId, Response}, State = #state{socket = Socket}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 服务器主动推送的数据,有packetId的是要求返回的;为0的表示不需要返回值 -handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> - {ok, #command{} = Command} = message_codec:decode(CommandBin), - ParentPid ! {server_command, Command}, - {noreply, State}; - -handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> - {ok, #pub{} = Pub} = message_codec:decode(PubBin), - ParentPid ! {server_pub, Pub}, +handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> + {ok, CastRequest} = message_codec:decode(CommandBin), + ParentPid ! {server_cast, CastRequest}, {noreply, State}; handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> @@ -168,10 +163,10 @@ handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) -> handle_info({ssl_closed, Socket}, State = #state{socket = Socket}) -> {stop, normal, State}; -handle_info({timeout, _, ping_ticker}, State = #state{socket = Socket}) -> - ok = ssl:send(Socket, <>), - ping_ticker(), - {noreply, State}; +%handle_info({timeout, _, ping_ticker}, State = #state{socket = Socket}) -> +% ok = ssl:send(Socket, <>), +% ping_ticker(), +% {noreply, State}; handle_info(Info, State = #state{}) -> lager:notice("[efka_transport] get unknown info: ~p", [Info]),