fix message

This commit is contained in:
anlicheng 2025-09-26 15:07:57 +08:00
parent 43a263e109
commit 580714b300
4 changed files with 77 additions and 63 deletions

View File

@ -21,18 +21,20 @@
%%
-define(MESSAGE_AUTH_REQUEST, 16#01).
-define(MESSAGE_PUB, 16#02).
-define(MESSAGE_AUTH_REPLY, 16#02).
-define(MESSAGE_COMMAND, 16#03).
-define(MESSAGE_RPC_DEPLOY, 16#04).
-define(MESSAGE_RPC_CONTAINER, 16#05).
-define(MESSAGE_DEPLOY, 16#04).
-define(MESSAGE_PUB, 16#05).
-define(MESSAGE_DATA, 16#06).
-define(MESSAGE_EVENT, 16#07).
%% efka主动上报的event-stream流, : docker-create的实时处理逻辑上报
-define(MESSAGE_EVENT_STREAM, 16#08).
%%
-define(MESSAGE_RPC_REPLY, 16#FF).
-define(MESSAGE_JSONRPC_REQUEST, 16#F0).
-define(MESSAGE_JSONRPC_REPLY, 16#F1).
%%%% ,
%%
@ -46,6 +48,11 @@
timestamp :: integer()
}).
-record(auth_reply, {
code :: integer(),
payload :: binary()
}).
-record(pub, {
topic :: binary(),
content :: binary()
@ -56,20 +63,16 @@
command :: binary()
}).
-record(rpc_deploy, {
task_id :: integer(),
config :: binary()
}).
-record(rpc_container, {
-record(jsonrpc_request, {
id = 0 :: integer(),
method :: binary(),
container_name :: binary(),
params = <<>> :: binary()
}).
-record(rpc_reply, {
code :: integer(),
payload :: binary()
-record(jsonrpc_reply, {
id :: integer(),
result :: any() | undefined,
error :: any() | undefined
}).
-record(data, {

View File

@ -196,7 +196,7 @@ handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{tra
handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) ->
case Reply of
{ok, #rpc_reply{code = Code, payload = Message}} ->
{ok, #auth_reply{code = Code, payload = Message}} ->
case Code of
0 ->
lager:debug("[efka_remote_agent] auth success, message: ~p", [Message]),
@ -240,70 +240,70 @@ handle_event(info, flush_cache, _, State) ->
%%
%%
handle_event(info, {server_rpc, PacketId, #rpc_deploy{task_id = TaskId, config = Config0}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"deploy">>, params = #{<<"task_id">> := TaskId, <<"config">> := Config0}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
case catch jiffy:decode(Config0, [return_maps]) of
Config when is_map(Config) ->
%% efka_inetd收到消息后就立即返回了
case docker_manager:deploy(TaskId, Config) of
ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>));
{error, Reason} when is_binary(Reason) ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason))
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason))
end;
_Error ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(<<"invalid config json">>))
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, <<"invalid config json">>))
end,
{keep_state, State};
%%
handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"start">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"start_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了
case docker_manager:start_container(ContainerName) of
ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>));
{error, Reason} when is_binary(Reason) ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason))
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason))
end,
{keep_state, State};
%%
handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"stop">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"stop_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了
case docker_manager:stop_container(ContainerName) of
ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>));
{error, Reason} when is_binary(Reason) ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason))
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason))
end,
{keep_state, State};
handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"kill">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"kill_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了
case docker_manager:kill_container(ContainerName) of
ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>));
{error, Reason} when is_binary(Reason) ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason))
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason))
end,
{keep_state, State};
handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"remove">>, container_name = ContainerName}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"remove_container">>, params = #{<<"container_name">> := ContainerName}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了
case docker_manager:remove_container(ContainerName) of
ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>));
{error, Reason} when is_binary(Reason) ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason))
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason))
end,
{keep_state, State};
%% config.json配置信息
handle_event(info, {server_rpc, PacketId, #rpc_container{method = <<"config">>, container_name = ContainerName, params = Config}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"config_container">>, params = #{<<"container_name">> := ContainerName, <<"config">> := Config}}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
case docker_manager:config_container(ContainerName, Config) of
ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>));
{error, Reason} ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Reason))
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason))
end,
{keep_state, State};
@ -391,10 +391,14 @@ auth_request() ->
timestamp = efka_util:timestamp()
}).
-spec reply_success(Result :: binary()) -> binary().
reply_success(Result) when is_binary(Result) ->
message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 1, payload = Result}).
-spec reply_success(Id :: integer(), Result :: any()) -> binary().
reply_success(Id, Result) when is_binary(Result) ->
message_codec:encode(?MESSAGE_JSONRPC_REPLY, #jsonrpc_reply{id = Id, result = Result}).
-spec reply_error(Message :: binary()) -> binary().
reply_error(Message) when is_binary(Message) ->
message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 0, payload = Message}).
-spec reply_error(Id :: integer(), Code :: integer(), Message :: binary()) -> binary().
reply_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(Message) ->
Error = #{
<<"code">> => Code,
<<"message">> => Message
},
message_codec:encode(?MESSAGE_JSONRPC_REPLY, #jsonrpc_reply{id = Id, error = Error}).

View File

@ -118,7 +118,7 @@ handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPi
%% auth返回的结果
receive
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ReplyBin/binary>>} ->
{ok, #rpc_reply{} = Reply} = message_codec:decode(ReplyBin),
{ok, #auth_reply{} = Reply} = message_codec:decode(ReplyBin),
ParentPid ! {auth_reply, {ok, Reply}},
{noreply, State};
{ssl, Socket, Info} ->
@ -146,8 +146,8 @@ handle_cast({rpc_reply, PacketId, Reply}, State = #state{socket = Socket}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%% packetId的是要求返回的0
handle_info({ssl, Socket, <<?PACKET_CAST, CommandBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
{ok, CastRequest} = message_codec:decode(CommandBin),
handle_info({ssl, Socket, <<?PACKET_CAST, CastBin/binary>>}, State = #state{socket = Socket, parent_pid = ParentPid}) ->
{ok, CastRequest} = message_codec:decode(CastBin),
ParentPid ! {server_cast, CastRequest},
{noreply, State};

View File

@ -28,11 +28,17 @@ encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Tok
marshal(?Bytes, Token),
marshal(?I32, Timestamp)
]);
encode0(#rpc_reply{code = Code, payload = Message}) ->
encode0(#auth_reply{code = Code, payload = Payload}) ->
iolist_to_binary([
marshal(?I32, Code),
marshal(?Bytes, Message)
marshal(?Bytes, Payload)
]);
encode0(#jsonrpc_reply{id = Id, result = Result, error = undefined}) ->
ResultBin = jiffy:encode(#{<<"id">> => Id, <<"result">> => Result}, [return_maps]),
iolist_to_binary([marshal(?Bytes, ResultBin)]);
encode0(#jsonrpc_reply{id = Id, result = undefined, error = Error}) ->
ResultBin = jiffy:encode(#{<<"id">> => Id, <<"error">> => Error}, [return_maps]),
iolist_to_binary([marshal(?Bytes, ResultBin)]);
encode0(#pub{topic = Topic, content = Content}) ->
iolist_to_binary([
marshal(?Bytes, Topic),
@ -43,17 +49,10 @@ encode0(#command{command_type = CommandType, command = Command}) ->
marshal(?I32, CommandType),
marshal(?Bytes, Command)
]);
encode0(#rpc_deploy{task_id = TaskId, config = Config}) ->
iolist_to_binary([
marshal(?I32, TaskId),
marshal(?Bytes, Config)
]);
encode0(#rpc_container{method = Method, container_name = ContainerName, params = Params}) ->
iolist_to_binary([
marshal(?Bytes, Method),
marshal(?Bytes, ContainerName),
marshal(?Bytes, Params)
]);
encode0(#jsonrpc_request{id = Id, method = Method, params = Params}) ->
ReqBody = jiffy:decode(#{<<"id">> => Id, <<"method">> => Method, <<"params">> => Params}, [force_utf8]),
iolist_to_binary([marshal(?Bytes, ReqBody)]);
encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) ->
iolist_to_binary([
marshal(?Bytes, ServiceId),
@ -84,16 +83,24 @@ decode(<<PacketType:8, Packet/binary>>) ->
end.
decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) ->
{ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}};
decode0(?MESSAGE_RPC_REPLY, [Code, Message]) ->
{ok, #rpc_reply{code = Code, payload = Message}};
decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) ->
case jiffy:decode(ReplyBin, [return_maps]) of
#{<<"id">> := Id, <<"result">> := Result} ->
{ok, #jsonrpc_reply{id = Id, result = Result}};
#{<<"id">> := Id, <<"error">> := Error} ->
{ok, #jsonrpc_reply{id = Id, error = Error}};
_ ->
error
end;
decode0(?MESSAGE_PUB, [Topic, Content]) ->
{ok, #pub{topic = Topic, content = Content}};
decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->
{ok, #command{command_type = CommandType, command = Command}};
decode0(?MESSAGE_RPC_DEPLOY, [TaskId, Config]) ->
{ok, #rpc_deploy{task_id = TaskId, config = Config}};
decode0(?MESSAGE_RPC_CONTAINER, [Method, ContainerName, Params]) ->
{ok, #rpc_container{method = Method, container_name = ContainerName, params = Params}};
decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) ->
{ok, #auth_reply{code = Code, payload = Payload}};
decode0(?MESSAGE_JSONRPC_REQUEST, [ReqBody]) ->
#{<<"id">> := Id, <<"method">> := Method, <<"params">> := Params} = jiffy:decode(ReqBody, [return_maps]),
{ok, #jsonrpc_request{id = Id, method = Method, params = Params}};
decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) ->
{ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}};
decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->