From 580714b300676d9532e5c6ba032e97d0fe653d40 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 26 Sep 2025 15:07:57 +0800 Subject: [PATCH] fix message --- apps/efka/include/message.hrl | 33 ++++++++------- apps/efka/src/efka_remote_agent.erl | 56 +++++++++++++------------ apps/efka/src/efka_transport.erl | 6 +-- apps/efka/src/message/message_codec.erl | 45 +++++++++++--------- 4 files changed, 77 insertions(+), 63 deletions(-) diff --git a/apps/efka/include/message.hrl b/apps/efka/include/message.hrl index 2f1b985..a08c533 100644 --- a/apps/efka/include/message.hrl +++ b/apps/efka/include/message.hrl @@ -21,18 +21,20 @@ %% 主机端上报数据类型标识 -define(MESSAGE_AUTH_REQUEST, 16#01). --define(MESSAGE_PUB, 16#02). +-define(MESSAGE_AUTH_REPLY, 16#02). + -define(MESSAGE_COMMAND, 16#03). --define(MESSAGE_RPC_DEPLOY, 16#04). --define(MESSAGE_RPC_CONTAINER, 16#05). +-define(MESSAGE_DEPLOY, 16#04). +-define(MESSAGE_PUB, 16#05). + -define(MESSAGE_DATA, 16#06). -define(MESSAGE_EVENT, 16#07). %% efka主动上报的event-stream流, 单向消息,主要是: docker-create的实时处理逻辑上报 -define(MESSAGE_EVENT_STREAM, 16#08). -%% 响应数据 --define(MESSAGE_RPC_REPLY, 16#FF). +-define(MESSAGE_JSONRPC_REQUEST, 16#F0). +-define(MESSAGE_JSONRPC_REPLY, 16#F1). %%%% 命令类型子分类, 不需要返回值 %% 授权 @@ -46,6 +48,11 @@ timestamp :: integer() }). +-record(auth_reply, { + code :: integer(), + payload :: binary() +}). + -record(pub, { topic :: binary(), content :: binary() @@ -56,20 +63,16 @@ command :: binary() }). --record(rpc_deploy, { - task_id :: integer(), - config :: binary() -}). - --record(rpc_container, { +-record(jsonrpc_request, { + id = 0 :: integer(), method :: binary(), - container_name :: binary(), params = <<>> :: binary() }). --record(rpc_reply, { - code :: integer(), - payload :: binary() +-record(jsonrpc_reply, { + id :: integer(), + result :: any() | undefined, + error :: any() | undefined }). -record(data, { diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index b79b1c5..f55a684 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -196,7 +196,7 @@ handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{tra handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) -> case Reply of - {ok, #rpc_reply{code = Code, payload = Message}} -> + {ok, #auth_reply{code = Code, payload = Message}} -> case Code of 0 -> lager:debug("[efka_remote_agent] auth success, message: ~p", [Message]), @@ -240,70 +240,70 @@ handle_event(info, flush_cache, _, State) -> %% 激活消息 %% 微服务部署 -handle_event(info, {server_rpc, PacketId, #rpc_deploy{task_id = TaskId, config = Config0}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"deploy">>, params = #{<<"task_id">> := TaskId, <<"config">> := Config0}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> case catch jiffy:decode(Config0, [return_maps]) of Config when is_map(Config) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 case docker_manager:deploy(TaskId, Config) of ok -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>)); + efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); {error, Reason} when is_binary(Reason) -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason)) + efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) end; _Error -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_error(<<"invalid config json">>)) + efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, <<"invalid config json">>)) end, {keep_state, State}; %% 启动微服务 -handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"start">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"start_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 case docker_manager:start_container(ContainerName) of ok -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>)); + efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); {error, Reason} when is_binary(Reason) -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason)) + efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) end, {keep_state, State}; %% 停止微服务 -handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"stop">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"stop_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 case docker_manager:stop_container(ContainerName) of ok -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>)); + efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); {error, Reason} when is_binary(Reason) -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason)) + efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) end, {keep_state, State}; -handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"kill">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"kill_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 case docker_manager:kill_container(ContainerName) of ok -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>)); + efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); {error, Reason} when is_binary(Reason) -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason)) + efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) end, {keep_state, State}; -handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"remove">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"remove_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 case docker_manager:remove_container(ContainerName) of ok -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>)); + efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); {error, Reason} when is_binary(Reason) -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason)) + efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) end, {keep_state, State}; %% config.json配置信息 -handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"config">>, container_name = ContainerName, params = Config}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"config_container">>, params = #{<<"container_name">> := ContainerName, <<"config">> := Config}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> case docker_manager:config_container(ContainerName, Config) of ok -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>)); + efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); {error, Reason} -> - efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason)) + efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) end, {keep_state, State}; @@ -391,10 +391,14 @@ auth_request() -> timestamp = efka_util:timestamp() }). --spec reply_success(Result :: binary()) -> binary(). -reply_success(Result) when is_binary(Result) -> - message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 1, payload = Result}). +-spec reply_success(Id :: integer(), Result :: any()) -> binary(). +reply_success(Id, Result) when is_binary(Result) -> + message_codec:encode(?MESSAGE_JSONRPC_REPLY, #jsonrpc_reply{id = Id, result = Result}). --spec reply_error(Message :: binary()) -> binary(). -reply_error(Message) when is_binary(Message) -> - message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 0, payload = Message}). \ No newline at end of file +-spec reply_error(Id :: integer(), Code :: integer(), Message :: binary()) -> binary(). +reply_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(Message) -> + Error = #{ + <<"code">> => Code, + <<"message">> => Message + }, + message_codec:encode(?MESSAGE_JSONRPC_REPLY, #jsonrpc_reply{id = Id, error = Error}). \ No newline at end of file diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 78ea797..f3c2dfb 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -118,7 +118,7 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi %% 需要等待auth返回的结果 receive {ssl, Socket, <>} -> - {ok, #rpc_reply{} = Reply} = message_codec:decode(ReplyBin), + {ok, #auth_reply{} = Reply} = message_codec:decode(ReplyBin), ParentPid ! {auth_reply, {ok, Reply}}, {noreply, State}; {ssl, Socket, Info} -> @@ -146,8 +146,8 @@ handle_cast({rpc_reply, PacketId, Reply}, State = #state{socket = Socket}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 服务器主动推送的数据,有packetId的是要求返回的;为0的表示不需要返回值 -handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> - {ok, CastRequest} = message_codec:decode(CommandBin), +handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> + {ok, CastRequest} = message_codec:decode(CastBin), ParentPid ! {server_cast, CastRequest}, {noreply, State}; diff --git a/apps/efka/src/message/message_codec.erl b/apps/efka/src/message/message_codec.erl index eb34af5..a7decc0 100644 --- a/apps/efka/src/message/message_codec.erl +++ b/apps/efka/src/message/message_codec.erl @@ -28,11 +28,17 @@ encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Tok marshal(?Bytes, Token), marshal(?I32, Timestamp) ]); -encode0(#rpc_reply{code = Code, payload = Message}) -> +encode0(#auth_reply{code = Code, payload = Payload}) -> iolist_to_binary([ marshal(?I32, Code), - marshal(?Bytes, Message) + marshal(?Bytes, Payload) ]); +encode0(#jsonrpc_reply{id = Id, result = Result, error = undefined}) -> + ResultBin = jiffy:encode(#{<<"id">> => Id, <<"result">> => Result}, [return_maps]), + iolist_to_binary([marshal(?Bytes, ResultBin)]); +encode0(#jsonrpc_reply{id = Id, result = undefined, error = Error}) -> + ResultBin = jiffy:encode(#{<<"id">> => Id, <<"error">> => Error}, [return_maps]), + iolist_to_binary([marshal(?Bytes, ResultBin)]); encode0(#pub{topic = Topic, content = Content}) -> iolist_to_binary([ marshal(?Bytes, Topic), @@ -43,17 +49,10 @@ encode0(#command{command_type = CommandType, command = Command}) -> marshal(?I32, CommandType), marshal(?Bytes, Command) ]); -encode0(#rpc_deploy{task_id = TaskId, config = Config}) -> - iolist_to_binary([ - marshal(?I32, TaskId), - marshal(?Bytes, Config) - ]); -encode0(#rpc_container{method = Method, container_name = ContainerName, params = Params}) -> - iolist_to_binary([ - marshal(?Bytes, Method), - marshal(?Bytes, ContainerName), - marshal(?Bytes, Params) - ]); + +encode0(#jsonrpc_request{id = Id, method = Method, params = Params}) -> + ReqBody = jiffy:decode(#{<<"id">> => Id, <<"method">> => Method, <<"params">> => Params}, [force_utf8]), + iolist_to_binary([marshal(?Bytes, ReqBody)]); encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) -> iolist_to_binary([ marshal(?Bytes, ServiceId), @@ -84,16 +83,24 @@ decode(<>) -> end. decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) -> {ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}}; -decode0(?MESSAGE_RPC_REPLY, [Code, Message]) -> - {ok, #rpc_reply{code = Code, payload = Message}}; +decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) -> + case jiffy:decode(ReplyBin, [return_maps]) of + #{<<"id">> := Id, <<"result">> := Result} -> + {ok, #jsonrpc_reply{id = Id, result = Result}}; + #{<<"id">> := Id, <<"error">> := Error} -> + {ok, #jsonrpc_reply{id = Id, error = Error}}; + _ -> + error + end; decode0(?MESSAGE_PUB, [Topic, Content]) -> {ok, #pub{topic = Topic, content = Content}}; decode0(?MESSAGE_COMMAND, [CommandType, Command]) -> {ok, #command{command_type = CommandType, command = Command}}; -decode0(?MESSAGE_RPC_DEPLOY, [TaskId, Config]) -> - {ok, #rpc_deploy{task_id = TaskId, config = Config}}; -decode0(?MESSAGE_RPC_CONTAINER, [Method, ContainerName, Params]) -> - {ok, #rpc_container{method = Method, container_name = ContainerName, params = Params}}; +decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) -> + {ok, #auth_reply{code = Code, payload = Payload}}; +decode0(?MESSAGE_JSONRPC_REQUEST, [ReqBody]) -> + #{<<"id">> := Id, <<"method">> := Method, <<"params">> := Params} = jiffy:decode(ReqBody, [return_maps]), + {ok, #jsonrpc_request{id = Id, method = Method, params = Params}}; decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) -> {ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}}; decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->