fix jsonrpc
This commit is contained in:
parent
634b57dc13
commit
47c513ba4d
@ -93,36 +93,36 @@ attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
|
|||||||
get_containers(Pid) when is_pid(Pid) ->
|
get_containers(Pid) when is_pid(Pid) ->
|
||||||
Request = #jsonrpc_request{method = <<"get_containers">>, params = #{}},
|
Request = #jsonrpc_request{method = <<"get_containers">>, params = #{}},
|
||||||
EncConfigBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
|
EncConfigBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
|
||||||
gen_statem:call(Pid, {rpc_call, self(), EncConfigBin}).
|
gen_statem:call(Pid, {jsonrpc_call, self(), EncConfigBin}).
|
||||||
|
|
||||||
-spec config_container(Pid :: pid(), ContainerName :: binary(), ConfigJson :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
-spec config_container(Pid :: pid(), ContainerName :: binary(), ConfigJson :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
||||||
config_container(Pid, ContainerName, ConfigJson) when is_pid(Pid), is_binary(ContainerName), is_binary(ConfigJson) ->
|
config_container(Pid, ContainerName, ConfigJson) when is_pid(Pid), is_binary(ContainerName), is_binary(ConfigJson) ->
|
||||||
Request = #jsonrpc_request{method = <<"config_container">>, params = #{<<"container_name">> => ContainerName, <<"config">> => ConfigJson}},
|
Request = #jsonrpc_request{method = <<"config_container">>, params = #{<<"container_name">> => ContainerName, <<"config">> => ConfigJson}},
|
||||||
EncConfigBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
|
EncConfigBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
|
||||||
gen_statem:call(Pid, {rpc_call, self(), EncConfigBin}).
|
gen_statem:call(Pid, {jsonrpc_call, self(), EncConfigBin}).
|
||||||
|
|
||||||
-spec deploy_container(Pid :: pid(), TaskId :: integer(), Config :: map()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
-spec deploy_container(Pid :: pid(), TaskId :: integer(), Config :: map()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
||||||
deploy_container(Pid, TaskId, Config) when is_pid(Pid), is_integer(TaskId), is_map(Config) ->
|
deploy_container(Pid, TaskId, Config) when is_pid(Pid), is_integer(TaskId), is_map(Config) ->
|
||||||
Request = #jsonrpc_request{method = <<"deploy">>, params = #{<<"task_id">> => TaskId, <<"config">> => Config}},
|
Request = #jsonrpc_request{method = <<"deploy">>, params = #{<<"task_id">> => TaskId, <<"config">> => Config}},
|
||||||
EncDeployBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
|
EncDeployBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
|
||||||
gen_statem:call(Pid, {rpc_call, self(), EncDeployBin}).
|
gen_statem:call(Pid, {jsonrpc_call, self(), EncDeployBin}).
|
||||||
|
|
||||||
-spec start_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
-spec start_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
||||||
start_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
|
start_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
|
||||||
Request = #jsonrpc_request{method = <<"start_container">>, params = #{<<"container_name">> => ContainerName}},
|
Request = #jsonrpc_request{method = <<"start_container">>, params = #{<<"container_name">> => ContainerName}},
|
||||||
EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
|
EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
|
||||||
gen_statem:call(Pid, {rpc_call, self(), EncCallBin}).
|
gen_statem:call(Pid, {jsonrpc_call, self(), EncCallBin}).
|
||||||
|
|
||||||
-spec stop_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
-spec stop_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
||||||
stop_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
|
stop_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
|
||||||
Request = #jsonrpc_request{method = <<"stop_container">>, params = #{<<"container_name">> => ContainerName}},
|
Request = #jsonrpc_request{method = <<"stop_container">>, params = #{<<"container_name">> => ContainerName}},
|
||||||
EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
|
EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
|
||||||
gen_statem:call(Pid, {rpc_call, self(), EncCallBin}).
|
gen_statem:call(Pid, {jsonrpc_call, self(), EncCallBin}).
|
||||||
|
|
||||||
%-spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
%-spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
|
||||||
%task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) ->
|
%task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) ->
|
||||||
% TaskLogBin = message_pb:encode_msg(#fetch_task_log{task_id = TaskId}),
|
% TaskLogBin = message_pb:encode_msg(#fetch_task_log{task_id = TaskId}),
|
||||||
% gen_statem:call(Pid, {rpc_call, self(), ?PUSH_TASK_LOG, TaskLogBin}).
|
% gen_statem:call(Pid, {jsonrpc_call, self(), ?PUSH_TASK_LOG, TaskLogBin}).
|
||||||
|
|
||||||
-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}.
|
-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}.
|
||||||
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
|
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
|
||||||
@ -234,11 +234,11 @@ handle_event({call, From}, get_status, _, State = #state{channel_pid = ChannelPi
|
|||||||
{keep_state, State, [{reply, From, {ok, Reply}}]};
|
{keep_state, State, [{reply, From, {ok, Reply}}]};
|
||||||
|
|
||||||
%% 只要channel存在,就负责将消息推送到边缘端主机
|
%% 只要channel存在,就负责将消息推送到边缘端主机
|
||||||
handle_event({call, From}, {rpc_call, ReceiverPid, RpcCall}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
|
handle_event({call, From}, {jsonrpc_call, ReceiverPid, RpcCall}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
|
||||||
case HasSession andalso is_pid(ChannelPid) of
|
case HasSession andalso is_pid(ChannelPid) of
|
||||||
true ->
|
true ->
|
||||||
%% 通过websocket发送请求
|
%% 通过websocket发送请求
|
||||||
Ref = tcp_channel:rpc_call(ChannelPid, ReceiverPid, RpcCall),
|
Ref = tcp_channel:jsonrpc_call(ChannelPid, ReceiverPid, RpcCall),
|
||||||
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||||||
false ->
|
false ->
|
||||||
lager:debug("[iot_host] uuid: ~p, invalid state: ~p", [UUID, state_map(State)]),
|
lager:debug("[iot_host] uuid: ~p, invalid state: ~p", [UUID, state_map(State)]),
|
||||||
|
|||||||
@ -12,7 +12,7 @@
|
|||||||
-behaviour(ranch_protocol).
|
-behaviour(ranch_protocol).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([pub/3, rpc_call/3, command/3]).
|
-export([pub/3, jsonrpc_call/3, command/3]).
|
||||||
|
|
||||||
-export([start_link/3, stop/2]).
|
-export([start_link/3, stop/2]).
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
@ -43,10 +43,10 @@ command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is
|
|||||||
gen_server:cast(Pid, {command, CommandType, Command}).
|
gen_server:cast(Pid, {command, CommandType, Command}).
|
||||||
|
|
||||||
%% 向通道中写入消息
|
%% 向通道中写入消息
|
||||||
-spec rpc_call(Pid :: pid(), ReceiverPid :: pid(), CallBin :: binary()) -> Ref :: reference().
|
-spec jsonrpc_call(Pid :: pid(), ReceiverPid :: pid(), CallBin :: binary()) -> Ref :: reference().
|
||||||
rpc_call(Pid, ReceiverPid, CallBin) when is_pid(Pid), is_pid(ReceiverPid), is_binary(CallBin) ->
|
jsonrpc_call(Pid, ReceiverPid, CallBin) when is_pid(Pid), is_pid(ReceiverPid), is_binary(CallBin) ->
|
||||||
Ref = make_ref(),
|
Ref = make_ref(),
|
||||||
gen_server:cast(Pid, {rpc_call, ReceiverPid, Ref, CallBin}),
|
gen_server:cast(Pid, {jsonrpc_call, ReceiverPid, Ref, CallBin}),
|
||||||
Ref.
|
Ref.
|
||||||
|
|
||||||
%% 关闭方法
|
%% 关闭方法
|
||||||
@ -86,7 +86,7 @@ handle_cast({command, CommandType, Command}, State = #state{transport = Transpor
|
|||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
%% 推送消息
|
%% 推送消息
|
||||||
handle_cast({rpc_call, ReceiverPid, Ref, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
handle_cast({jsonrpc_call, ReceiverPid, Ref, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
|
||||||
Transport:send(Socket, <<?PACKET_REQUEST, PacketId:32, CallBin/binary>>),
|
Transport:send(Socket, <<?PACKET_REQUEST, PacketId:32, CallBin/binary>>),
|
||||||
{noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}.
|
{noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}.
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user