diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 47a0157..4aeb520 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -93,36 +93,36 @@ attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> get_containers(Pid) when is_pid(Pid) -> Request = #jsonrpc_request{method = <<"get_containers">>, params = #{}}, EncConfigBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request), - gen_statem:call(Pid, {rpc_call, self(), EncConfigBin}). + gen_statem:call(Pid, {jsonrpc_call, self(), EncConfigBin}). -spec config_container(Pid :: pid(), ContainerName :: binary(), ConfigJson :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. config_container(Pid, ContainerName, ConfigJson) when is_pid(Pid), is_binary(ContainerName), is_binary(ConfigJson) -> Request = #jsonrpc_request{method = <<"config_container">>, params = #{<<"container_name">> => ContainerName, <<"config">> => ConfigJson}}, EncConfigBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request), - gen_statem:call(Pid, {rpc_call, self(), EncConfigBin}). + gen_statem:call(Pid, {jsonrpc_call, self(), EncConfigBin}). -spec deploy_container(Pid :: pid(), TaskId :: integer(), Config :: map()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. deploy_container(Pid, TaskId, Config) when is_pid(Pid), is_integer(TaskId), is_map(Config) -> Request = #jsonrpc_request{method = <<"deploy">>, params = #{<<"task_id">> => TaskId, <<"config">> => Config}}, EncDeployBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request), - gen_statem:call(Pid, {rpc_call, self(), EncDeployBin}). + gen_statem:call(Pid, {jsonrpc_call, self(), EncDeployBin}). -spec start_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. start_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) -> Request = #jsonrpc_request{method = <<"start_container">>, params = #{<<"container_name">> => ContainerName}}, EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request), - gen_statem:call(Pid, {rpc_call, self(), EncCallBin}). + gen_statem:call(Pid, {jsonrpc_call, self(), EncCallBin}). -spec stop_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. stop_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) -> Request = #jsonrpc_request{method = <<"stop_container">>, params = #{<<"container_name">> => ContainerName}}, EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request), - gen_statem:call(Pid, {rpc_call, self(), EncCallBin}). + gen_statem:call(Pid, {jsonrpc_call, self(), EncCallBin}). %-spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. %task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) -> % TaskLogBin = message_pb:encode_msg(#fetch_task_log{task_id = TaskId}), -% gen_statem:call(Pid, {rpc_call, self(), ?PUSH_TASK_LOG, TaskLogBin}). +% gen_statem:call(Pid, {jsonrpc_call, self(), ?PUSH_TASK_LOG, TaskLogBin}). -spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}. await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> @@ -234,11 +234,11 @@ handle_event({call, From}, get_status, _, State = #state{channel_pid = ChannelPi {keep_state, State, [{reply, From, {ok, Reply}}]}; %% 只要channel存在,就负责将消息推送到边缘端主机 -handle_event({call, From}, {rpc_call, ReceiverPid, RpcCall}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> +handle_event({call, From}, {jsonrpc_call, ReceiverPid, RpcCall}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> case HasSession andalso is_pid(ChannelPid) of true -> %% 通过websocket发送请求 - Ref = tcp_channel:rpc_call(ChannelPid, ReceiverPid, RpcCall), + Ref = tcp_channel:jsonrpc_call(ChannelPid, ReceiverPid, RpcCall), {keep_state, State, [{reply, From, {ok, Ref}}]}; false -> lager:debug("[iot_host] uuid: ~p, invalid state: ~p", [UUID, state_map(State)]), diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index 84b234b..25e93e9 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -12,7 +12,7 @@ -behaviour(ranch_protocol). %% API --export([pub/3, rpc_call/3, command/3]). +-export([pub/3, jsonrpc_call/3, command/3]). -export([start_link/3, stop/2]). %% gen_server callbacks @@ -43,10 +43,10 @@ command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is gen_server:cast(Pid, {command, CommandType, Command}). %% 向通道中写入消息 --spec rpc_call(Pid :: pid(), ReceiverPid :: pid(), CallBin :: binary()) -> Ref :: reference(). -rpc_call(Pid, ReceiverPid, CallBin) when is_pid(Pid), is_pid(ReceiverPid), is_binary(CallBin) -> +-spec jsonrpc_call(Pid :: pid(), ReceiverPid :: pid(), CallBin :: binary()) -> Ref :: reference(). +jsonrpc_call(Pid, ReceiverPid, CallBin) when is_pid(Pid), is_pid(ReceiverPid), is_binary(CallBin) -> Ref = make_ref(), - gen_server:cast(Pid, {rpc_call, ReceiverPid, Ref, CallBin}), + gen_server:cast(Pid, {jsonrpc_call, ReceiverPid, Ref, CallBin}), Ref. %% 关闭方法 @@ -86,7 +86,7 @@ handle_cast({command, CommandType, Command}, State = #state{transport = Transpor {noreply, State}; %% 推送消息 -handle_cast({rpc_call, ReceiverPid, Ref, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) -> +handle_cast({jsonrpc_call, ReceiverPid, Ref, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) -> Transport:send(Socket, <>), {noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}.