fix jsonrpc

This commit is contained in:
anlicheng 2025-09-26 15:30:13 +08:00
parent 4f0a82c409
commit 5c63283e5a
4 changed files with 57 additions and 45 deletions

View File

@ -21,18 +21,20 @@
%%
-define(MESSAGE_AUTH_REQUEST, 16#01).
-define(MESSAGE_PUB, 16#02).
-define(MESSAGE_AUTH_REPLY, 16#02).
-define(MESSAGE_COMMAND, 16#03).
-define(MESSAGE_RPC_DEPLOY, 16#04).
-define(MESSAGE_RPC_CONTAINER, 16#05).
-define(MESSAGE_DEPLOY, 16#04).
-define(MESSAGE_PUB, 16#05).
-define(MESSAGE_DATA, 16#06).
-define(MESSAGE_EVENT, 16#07).
%% efka主动上报的event-stream流, : docker-create的实时处理逻辑上报
-define(MESSAGE_EVENT_STREAM, 16#08).
%%
-define(MESSAGE_RPC_REPLY, 16#FF).
-define(MESSAGE_JSONRPC_REQUEST, 16#F0).
-define(MESSAGE_JSONRPC_REPLY, 16#F1).
%%%% ,
%%
@ -46,6 +48,11 @@
timestamp :: integer()
}).
-record(auth_reply, {
code :: integer(),
payload :: binary()
}).
-record(pub, {
topic :: binary(),
content :: binary()
@ -56,20 +63,14 @@
command :: binary()
}).
-record(rpc_deploy, {
task_id :: integer(),
config :: binary()
}).
-record(rpc_container, {
-record(jsonrpc_request, {
method :: binary(),
container_name :: binary(),
params = <<>> :: binary()
params = <<>> :: any()
}).
-record(rpc_reply, {
code :: integer(),
payload :: binary()
-record(jsonrpc_reply, {
result :: any() | undefined,
error :: any() | undefined
}).
-record(data, {

View File

@ -91,22 +91,26 @@ attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
-spec config_container(Pid :: pid(), ContainerName :: binary(), ConfigJson :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
config_container(Pid, ContainerName, ConfigJson) when is_pid(Pid), is_binary(ContainerName), is_binary(ConfigJson) ->
EncConfigBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"config">>, container_name = ContainerName, params = ConfigJson}),
Request = #jsonrpc_request{id = 0, method = <<"config_container">>, params = #{<<"container_name">> => ContainerName, <<"config">> => ConfigJson}},
EncConfigBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
gen_statem:call(Pid, {rpc_call, self(), EncConfigBin}).
-spec deploy_container(Pid :: pid(), TaskId :: integer(), Config :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
deploy_container(Pid, TaskId, Config) when is_pid(Pid), is_integer(TaskId), is_binary(Config) ->
EncDeployBin = message_codec:encode(?MESSAGE_RPC_DEPLOY, #rpc_deploy{task_id = TaskId, config = Config}),
Request = #jsonrpc_request{id = 0, method = <<"deploy">>, params = #{<<"task_id">> => TaskId, <<"config">> => Config}},
EncDeployBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
gen_statem:call(Pid, {rpc_call, self(), EncDeployBin}).
-spec start_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
start_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
EncCallBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"start">>, container_name = ContainerName}),
Request = #jsonrpc_request{id = 0, method = <<"start_container">>, params = #{<<"container_name">> => ContainerName}},
EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
gen_statem:call(Pid, {rpc_call, self(), EncCallBin}).
-spec stop_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
stop_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
EncCallBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"stop">>, container_name = ContainerName}),
Request = #jsonrpc_request{id = 0, method = <<"stop_container">>, params = #{<<"container_name">> => ContainerName}},
EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
gen_statem:call(Pid, {rpc_call, self(), EncCallBin}).
%-spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
@ -117,9 +121,9 @@ stop_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}.
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
receive
{rpc_reply, Ref, #rpc_reply{code = 1, payload = Result}} ->
{jsonrpc_reply, Ref, #jsonrpc_reply{result = Result, error = undefined}} ->
{ok, Result};
{rpc_reply, Ref, #rpc_reply{code = 0, payload = Message}} ->
{jsonrpc_reply, Ref, #jsonrpc_reply{result = undefined, error = #{<<"message">> := Message}}} ->
{error, Message}
after Timeout ->
{error, <<"timeout">>}

View File

@ -28,11 +28,17 @@ encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Tok
marshal(?Bytes, Token),
marshal(?I32, Timestamp)
]);
encode0(#rpc_reply{code = Code, payload = Message}) ->
encode0(#auth_reply{code = Code, payload = Payload}) ->
iolist_to_binary([
marshal(?I32, Code),
marshal(?Bytes, Message)
marshal(?Bytes, Payload)
]);
encode0(#jsonrpc_reply{result = Result, error = undefined}) ->
ResultBin = jiffy:encode(#{<<"result">> => Result}, [force_utf8]),
iolist_to_binary([marshal(?Bytes, ResultBin)]);
encode0(#jsonrpc_reply{result = undefined, error = Error}) ->
ResultBin = jiffy:encode(#{<<"error">> => Error}, [force_utf8]),
iolist_to_binary([marshal(?Bytes, ResultBin)]);
encode0(#pub{topic = Topic, content = Content}) ->
iolist_to_binary([
marshal(?Bytes, Topic),
@ -43,17 +49,10 @@ encode0(#command{command_type = CommandType, command = Command}) ->
marshal(?I32, CommandType),
marshal(?Bytes, Command)
]);
encode0(#rpc_deploy{task_id = TaskId, config = Config}) ->
iolist_to_binary([
marshal(?I32, TaskId),
marshal(?Bytes, Config)
]);
encode0(#rpc_container{method = Method, container_name = ContainerName, params = Params}) ->
iolist_to_binary([
marshal(?Bytes, Method),
marshal(?Bytes, ContainerName),
marshal(?Bytes, Params)
]);
encode0(#jsonrpc_request{method = Method, params = Params}) ->
ReqBody = jiffy:encode(#{<<"method">> => Method, <<"params">> => Params}, [force_utf8]),
marshal(?Bytes, ReqBody);
encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) ->
iolist_to_binary([
marshal(?Bytes, ServiceId),
@ -84,16 +83,24 @@ decode(<<PacketType:8, Packet/binary>>) ->
end.
decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) ->
{ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}};
decode0(?MESSAGE_RPC_REPLY, [Code, Message]) ->
{ok, #rpc_reply{code = Code, payload = Message}};
decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) ->
case jiffy:decode(ReplyBin, [return_maps]) of
#{<<"result">> := Result} ->
{ok, #jsonrpc_reply{result = Result}};
#{<<"id">> := Id, <<"error">> := Error} ->
{ok, #jsonrpc_reply{error = Error}};
_ ->
error
end;
decode0(?MESSAGE_PUB, [Topic, Content]) ->
{ok, #pub{topic = Topic, content = Content}};
decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->
{ok, #command{command_type = CommandType, command = Command}};
decode0(?MESSAGE_RPC_DEPLOY, [TaskId, Config]) ->
{ok, #rpc_deploy{task_id = TaskId, config = Config}};
decode0(?MESSAGE_RPC_CONTAINER, [Method, ContainerName, Params]) ->
{ok, #rpc_container{method = Method, container_name = ContainerName, params = Params}};
decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) ->
{ok, #auth_reply{code = Code, payload = Payload}};
decode0(?MESSAGE_JSONRPC_REQUEST, [ReqBody]) ->
#{<<"method">> := Method, <<"params">> := Params} = jiffy:decode(ReqBody, [return_maps]),
{ok, #jsonrpc_request{method = Method, params = Params}};
decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) ->
{ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}};
decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->

View File

@ -107,20 +107,20 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST, PacketId:32, RequestBin/binary>>},
ok ->
%% host的monitor
erlang:monitor(process, HostPid),
AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 0, payload = <<"ok">>}),
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 0, payload = <<"ok">>}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
{denied, Reason} when is_binary(Reason) ->
erlang:monitor(process, HostPid),
AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 1, payload = Reason}),
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 1, payload = Reason}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
{error, Reason} when is_binary(Reason) ->
AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 2, payload = Reason}),
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 2, payload = Reason}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
@ -161,7 +161,7 @@ handle_info({tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, ResponseBin/binary>>}
{{ReceiverPid, Ref}, NInflight} ->
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
true ->
ReceiverPid ! {rpc_reply, Ref, RpcReply};
ReceiverPid ! {jsonrpc_reply, Ref, RpcReply};
false ->
lager:warning("[ws_channel] get async_call_reply message: ~p, packet_id: ~p, but receiver_pid is deaded", [RpcReply, PacketId])
end,