From 38b46874d33f180223fa103ff27ed945451c2626 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 18 Sep 2025 15:16:07 +0800 Subject: [PATCH] fix efka --- apps/efka/include/message.hrl | 30 ++++++++++--------- apps/efka/src/efka_remote_agent.erl | 6 ++-- apps/efka/src/efka_sup.erl | 16 +++++----- apps/efka/src/efka_transport.erl | 16 +++++----- apps/efka/src/{ => message}/message_codec.erl | 14 ++++----- 5 files changed, 42 insertions(+), 40 deletions(-) rename apps/efka/src/{ => message}/message_codec.erl (92%) diff --git a/apps/efka/include/message.hrl b/apps/efka/include/message.hrl index c4895fc..f5f13c9 100644 --- a/apps/efka/include/message.hrl +++ b/apps/efka/include/message.hrl @@ -9,8 +9,8 @@ -author("anlicheng"). %% efka主动发起的消息体类型, 消息大类 --define(PACKET_RPC, 16#01). --define(PACKET_RPC_REPLY, 16#02). +-define(PACKET_REQUEST, 16#01). +-define(PACKET_RESPONSE, 16#02). %% efka主动发起不需要返回的数据 -define(PACKET_CAST, 16#03). @@ -21,13 +21,15 @@ %% 主机端上报数据类型标识 -define(MESSAGE_AUTH_REQUEST, 16#01). --define(MESSAGE_AUTH_REPLY, 16#02). --define(MESSAGE_PUB, 16#03). --define(MESSAGE_COMMAND, 16#04). --define(MESSAGE_RPC_DEPLOY, 16#05). --define(MESSAGE_RPC_CONTAINER, 16#06). --define(MESSAGE_DATA, 16#07). --define(MESSAGE_EVENT, 16#08). +-define(MESSAGE_PUB, 16#02). +-define(MESSAGE_COMMAND, 16#03). +-define(MESSAGE_RPC_DEPLOY, 16#04). +-define(MESSAGE_RPC_CONTAINER, 16#05). +-define(MESSAGE_DATA, 16#06). +-define(MESSAGE_EVENT, 16#07). + +%% 响应数据 +-define(MESSAGE_RPC_REPLY, 16#FF). %%%% 命令类型子分类, 不需要返回值 %% 授权 @@ -41,11 +43,6 @@ timestamp :: integer() }). --record(auth_reply, { - code :: integer(), - message :: binary() -}). - -record(pub, { topic :: binary(), content :: binary() @@ -67,6 +64,11 @@ params = <<>> :: binary() }). +-record(rpc_reply, { + code :: integer(), + payload :: binary() +}). + -record(data, { service_id :: binary(), device_uuid :: binary(), diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index 7924c3e..d857b7c 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -165,7 +165,7 @@ handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{tra handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) -> case Reply of - {ok, #auth_reply{code = Code, message = Message}} -> + {ok, #rpc_reply{code = Code, payload = Message}} -> case Code of 0 -> lager:debug("[efka_remote_agent] auth success, message: ~p", [Message]), @@ -342,8 +342,8 @@ auth_request() -> -spec reply_success(Result :: binary()) -> binary(). reply_success(Result) when is_binary(Result) -> - <<1:8, Result/binary>>. + message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 1, payload = Result}). -spec reply_error(Message :: binary()) -> binary(). reply_error(Message) when is_binary(Message) -> - <<0:8, Message/binary>>. \ No newline at end of file + message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 0, payload = Message}). \ No newline at end of file diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index 574d285..c613e07 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -73,14 +73,14 @@ init([]) -> modules => ['docker_manager'] } - %#{ - % id => 'efka_remote_agent', - % start => {'efka_remote_agent', start_link, []}, - % restart => permanent, - % shutdown => 2000, - % type => worker, - % modules => ['efka_remote_agent'] - %}, + #{ + id => 'efka_remote_agent', + start => {'efka_remote_agent', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['efka_remote_agent'] + } ], diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index fac16a6..7949a26 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -47,8 +47,8 @@ send(Pid, Packet) when is_pid(Pid), is_binary(Packet) -> -spec rpc_reply(Pid :: pid() | undefined, PacketId :: integer(), Response :: binary()) -> no_return(). rpc_reply(undefined, PacketId, Response) when is_integer(PacketId), is_binary(Response) -> ok; -rpc_reply(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) -> - gen_server:cast(Pid, {rpc_reply, PacketId, Response}). +rpc_reply(Pid, PacketId, Reply) when is_pid(Pid), is_integer(PacketId), is_binary(Reply) -> + gen_server:cast(Pid, {rpc_reply, PacketId, Reply}). %% 关闭的时候不一定能成功,可能关闭的时候;transport进程已经退出了 -spec stop(Pid :: pid() | undefined) -> ok. @@ -114,11 +114,11 @@ handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = Paren %% auth校验 handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPid, socket = Socket}) -> PacketId = 1, - ok = ssl:send(Socket, <>), + ok = ssl:send(Socket, <>), %% 需要等待auth返回的结果 receive - {ssl, Socket, <>} -> - {ok, #auth_reply{} = Reply} = message_codec:decode(ReplyBin), + {ssl, Socket, <>} -> + {ok, #rpc_reply{} = Reply} = message_codec:decode(ReplyBin), ParentPid ! {auth_reply, {ok, Reply}}, {noreply, State}; {ssl, Socket, Info} -> @@ -135,8 +135,8 @@ handle_cast({send, Packet}, State = #state{socket = Socket}) -> {noreply, State}; %% 服务push的消息的回复 -handle_cast({rpc_reply, PacketId, Response}, State = #state{socket = Socket}) -> - ok = ssl:send(Socket, <>), +handle_cast({rpc_reply, PacketId, Reply}, State = #state{socket = Socket}) -> + ok = ssl:send(Socket, <>), {noreply, State}. %% @private @@ -151,7 +151,7 @@ handle_info({ssl, Socket, <>}, State = #state{s ParentPid ! {server_cast, CastRequest}, {noreply, State}; -handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> +handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> {ok, RPCRequest} = message_codec:decode(RPCRequestBin), ParentPid ! {server_rpc, PacketId, RPCRequest}, {noreply, State}; diff --git a/apps/efka/src/message_codec.erl b/apps/efka/src/message/message_codec.erl similarity index 92% rename from apps/efka/src/message_codec.erl rename to apps/efka/src/message/message_codec.erl index e52442b..6e2927b 100644 --- a/apps/efka/src/message_codec.erl +++ b/apps/efka/src/message/message_codec.erl @@ -16,10 +16,10 @@ %% API -export([encode/2, decode/1]). --spec encode0(Message :: any()) -> binary(). -encode(PacketType, Packet) when is_integer(PacketType) -> - Bin = encode0(Packet), - <>. +-spec encode(MessageType :: integer(), Message :: any()) -> binary(). +encode(MessageType, Message) when is_integer(MessageType) -> + Bin = encode0(Message), + <>. encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}) -> iolist_to_binary([ marshal(?Bytes, UUID), @@ -28,7 +28,7 @@ encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Tok marshal(?Bytes, Token), marshal(?I32, Timestamp) ]); -encode0(#auth_reply{code = Code, message = Message}) -> +encode0(#rpc_reply{code = Code, payload = Message}) -> iolist_to_binary([ marshal(?I32, Code), marshal(?Bytes, Message) @@ -78,8 +78,8 @@ decode(<>) -> end. decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) -> {ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}}; -decode0(?MESSAGE_AUTH_REPLY, [Code, Message]) -> - {ok, #auth_reply{code = Code, message = Message}}; +decode0(?MESSAGE_RPC_REPLY, [Code, Message]) -> + {ok, #rpc_reply{code = Code, payload = Message}}; decode0(?MESSAGE_PUB, [Topic, Content]) -> {ok, #pub{topic = Topic, content = Content}}; decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->