From 5c63283e5a3bd2b731a406c0fa3a5d911c4f6ec6 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 26 Sep 2025 15:30:13 +0800 Subject: [PATCH] fix jsonrpc --- apps/iot/include/message.hrl | 33 ++++++++++--------- apps/iot/src/iot_host.erl | 16 +++++---- apps/iot/src/message/message_codec.erl | 45 +++++++++++++++----------- apps/iot/src/tcp/tcp_channel.erl | 8 ++--- 4 files changed, 57 insertions(+), 45 deletions(-) diff --git a/apps/iot/include/message.hrl b/apps/iot/include/message.hrl index 2f1b985..804148c 100644 --- a/apps/iot/include/message.hrl +++ b/apps/iot/include/message.hrl @@ -21,18 +21,20 @@ %% 主机端上报数据类型标识 -define(MESSAGE_AUTH_REQUEST, 16#01). --define(MESSAGE_PUB, 16#02). +-define(MESSAGE_AUTH_REPLY, 16#02). + -define(MESSAGE_COMMAND, 16#03). --define(MESSAGE_RPC_DEPLOY, 16#04). --define(MESSAGE_RPC_CONTAINER, 16#05). +-define(MESSAGE_DEPLOY, 16#04). +-define(MESSAGE_PUB, 16#05). + -define(MESSAGE_DATA, 16#06). -define(MESSAGE_EVENT, 16#07). %% efka主动上报的event-stream流, 单向消息,主要是: docker-create的实时处理逻辑上报 -define(MESSAGE_EVENT_STREAM, 16#08). -%% 响应数据 --define(MESSAGE_RPC_REPLY, 16#FF). +-define(MESSAGE_JSONRPC_REQUEST, 16#F0). +-define(MESSAGE_JSONRPC_REPLY, 16#F1). %%%% 命令类型子分类, 不需要返回值 %% 授权 @@ -46,6 +48,11 @@ timestamp :: integer() }). +-record(auth_reply, { + code :: integer(), + payload :: binary() +}). + -record(pub, { topic :: binary(), content :: binary() @@ -56,20 +63,14 @@ command :: binary() }). --record(rpc_deploy, { - task_id :: integer(), - config :: binary() -}). - --record(rpc_container, { +-record(jsonrpc_request, { method :: binary(), - container_name :: binary(), - params = <<>> :: binary() + params = <<>> :: any() }). --record(rpc_reply, { - code :: integer(), - payload :: binary() +-record(jsonrpc_reply, { + result :: any() | undefined, + error :: any() | undefined }). -record(data, { diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index b683927..cd4374c 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -91,22 +91,26 @@ attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> -spec config_container(Pid :: pid(), ContainerName :: binary(), ConfigJson :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. config_container(Pid, ContainerName, ConfigJson) when is_pid(Pid), is_binary(ContainerName), is_binary(ConfigJson) -> - EncConfigBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"config">>, container_name = ContainerName, params = ConfigJson}), + Request = #jsonrpc_request{id = 0, method = <<"config_container">>, params = #{<<"container_name">> => ContainerName, <<"config">> => ConfigJson}}, + EncConfigBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request), gen_statem:call(Pid, {rpc_call, self(), EncConfigBin}). -spec deploy_container(Pid :: pid(), TaskId :: integer(), Config :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. deploy_container(Pid, TaskId, Config) when is_pid(Pid), is_integer(TaskId), is_binary(Config) -> - EncDeployBin = message_codec:encode(?MESSAGE_RPC_DEPLOY, #rpc_deploy{task_id = TaskId, config = Config}), + Request = #jsonrpc_request{id = 0, method = <<"deploy">>, params = #{<<"task_id">> => TaskId, <<"config">> => Config}}, + EncDeployBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request), gen_statem:call(Pid, {rpc_call, self(), EncDeployBin}). -spec start_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. start_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) -> - EncCallBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"start">>, container_name = ContainerName}), + Request = #jsonrpc_request{id = 0, method = <<"start_container">>, params = #{<<"container_name">> => ContainerName}}, + EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request), gen_statem:call(Pid, {rpc_call, self(), EncCallBin}). -spec stop_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. stop_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) -> - EncCallBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"stop">>, container_name = ContainerName}), + Request = #jsonrpc_request{id = 0, method = <<"stop_container">>, params = #{<<"container_name">> => ContainerName}}, + EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request), gen_statem:call(Pid, {rpc_call, self(), EncCallBin}). %-spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. @@ -117,9 +121,9 @@ stop_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) -> -spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}. await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> receive - {rpc_reply, Ref, #rpc_reply{code = 1, payload = Result}} -> + {jsonrpc_reply, Ref, #jsonrpc_reply{result = Result, error = undefined}} -> {ok, Result}; - {rpc_reply, Ref, #rpc_reply{code = 0, payload = Message}} -> + {jsonrpc_reply, Ref, #jsonrpc_reply{result = undefined, error = #{<<"message">> := Message}}} -> {error, Message} after Timeout -> {error, <<"timeout">>} diff --git a/apps/iot/src/message/message_codec.erl b/apps/iot/src/message/message_codec.erl index eb34af5..e5bd740 100644 --- a/apps/iot/src/message/message_codec.erl +++ b/apps/iot/src/message/message_codec.erl @@ -28,11 +28,17 @@ encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Tok marshal(?Bytes, Token), marshal(?I32, Timestamp) ]); -encode0(#rpc_reply{code = Code, payload = Message}) -> +encode0(#auth_reply{code = Code, payload = Payload}) -> iolist_to_binary([ marshal(?I32, Code), - marshal(?Bytes, Message) + marshal(?Bytes, Payload) ]); +encode0(#jsonrpc_reply{result = Result, error = undefined}) -> + ResultBin = jiffy:encode(#{<<"result">> => Result}, [force_utf8]), + iolist_to_binary([marshal(?Bytes, ResultBin)]); +encode0(#jsonrpc_reply{result = undefined, error = Error}) -> + ResultBin = jiffy:encode(#{<<"error">> => Error}, [force_utf8]), + iolist_to_binary([marshal(?Bytes, ResultBin)]); encode0(#pub{topic = Topic, content = Content}) -> iolist_to_binary([ marshal(?Bytes, Topic), @@ -43,17 +49,10 @@ encode0(#command{command_type = CommandType, command = Command}) -> marshal(?I32, CommandType), marshal(?Bytes, Command) ]); -encode0(#rpc_deploy{task_id = TaskId, config = Config}) -> - iolist_to_binary([ - marshal(?I32, TaskId), - marshal(?Bytes, Config) - ]); -encode0(#rpc_container{method = Method, container_name = ContainerName, params = Params}) -> - iolist_to_binary([ - marshal(?Bytes, Method), - marshal(?Bytes, ContainerName), - marshal(?Bytes, Params) - ]); + +encode0(#jsonrpc_request{method = Method, params = Params}) -> + ReqBody = jiffy:encode(#{<<"method">> => Method, <<"params">> => Params}, [force_utf8]), + marshal(?Bytes, ReqBody); encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) -> iolist_to_binary([ marshal(?Bytes, ServiceId), @@ -84,16 +83,24 @@ decode(<>) -> end. decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) -> {ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}}; -decode0(?MESSAGE_RPC_REPLY, [Code, Message]) -> - {ok, #rpc_reply{code = Code, payload = Message}}; +decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) -> + case jiffy:decode(ReplyBin, [return_maps]) of + #{<<"result">> := Result} -> + {ok, #jsonrpc_reply{result = Result}}; + #{<<"id">> := Id, <<"error">> := Error} -> + {ok, #jsonrpc_reply{error = Error}}; + _ -> + error + end; decode0(?MESSAGE_PUB, [Topic, Content]) -> {ok, #pub{topic = Topic, content = Content}}; decode0(?MESSAGE_COMMAND, [CommandType, Command]) -> {ok, #command{command_type = CommandType, command = Command}}; -decode0(?MESSAGE_RPC_DEPLOY, [TaskId, Config]) -> - {ok, #rpc_deploy{task_id = TaskId, config = Config}}; -decode0(?MESSAGE_RPC_CONTAINER, [Method, ContainerName, Params]) -> - {ok, #rpc_container{method = Method, container_name = ContainerName, params = Params}}; +decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) -> + {ok, #auth_reply{code = Code, payload = Payload}}; +decode0(?MESSAGE_JSONRPC_REQUEST, [ReqBody]) -> + #{<<"method">> := Method, <<"params">> := Params} = jiffy:decode(ReqBody, [return_maps]), + {ok, #jsonrpc_request{method = Method, params = Params}}; decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) -> {ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}}; decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) -> diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index be26552..84b234b 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -107,20 +107,20 @@ handle_info({tcp, Socket, <>}, ok -> %% 建立到host的monitor erlang:monitor(process, HostPid), - AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 0, payload = <<"ok">>}), + AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 0, payload = <<"ok">>}), Transport:send(Socket, <>), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {denied, Reason} when is_binary(Reason) -> erlang:monitor(process, HostPid), - AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 1, payload = Reason}), + AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 1, payload = Reason}), Transport:send(Socket, <>), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {error, Reason} when is_binary(Reason) -> - AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 2, payload = Reason}), + AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 2, payload = Reason}), Transport:send(Socket, <>), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), @@ -161,7 +161,7 @@ handle_info({tcp, Socket, <>} {{ReceiverPid, Ref}, NInflight} -> case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of true -> - ReceiverPid ! {rpc_reply, Ref, RpcReply}; + ReceiverPid ! {jsonrpc_reply, Ref, RpcReply}; false -> lager:warning("[ws_channel] get async_call_reply message: ~p, packet_id: ~p, but receiver_pid is deaded", [RpcReply, PacketId]) end,