This commit is contained in:
anlicheng 2025-09-26 15:33:38 +08:00
parent 09ae0bc8c0
commit e334df0939
2 changed files with 25 additions and 25 deletions

View File

@ -240,71 +240,71 @@ handle_event(info, flush_cache, _, State) ->
%% %%
%% %%
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"deploy">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> handle_event(info, {server_rpc, PacketId, #jsonrpc_request{method = <<"deploy">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
#{<<"task_id">> := TaskId, <<"config">> := Config} = Params, #{<<"task_id">> := TaskId, <<"config">> := Config} = Params,
%% efka_inetd收到消息后就立即返回了 %% efka_inetd收到消息后就立即返回了
case docker_manager:deploy(TaskId, Config) of case docker_manager:deploy(TaskId, Config) of
ok -> ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) efka_transport:rpc_reply(TransportPid, PacketId, reply_error(-1, Reason))
end, end,
{keep_state, State}; {keep_state, State};
%% %%
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"start_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> handle_event(info, {server_rpc, PacketId, #jsonrpc_request{method = <<"start_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
#{<<"container_name">> := ContainerName} = Params, #{<<"container_name">> := ContainerName} = Params,
%% efka_inetd收到消息后就立即返回了 %% efka_inetd收到消息后就立即返回了
case docker_manager:start_container(ContainerName) of case docker_manager:start_container(ContainerName) of
ok -> ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) efka_transport:rpc_reply(TransportPid, PacketId, reply_error(-1, Reason))
end, end,
{keep_state, State}; {keep_state, State};
%% %%
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"stop_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> handle_event(info, {server_rpc, PacketId, #jsonrpc_request{method = <<"stop_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
#{<<"container_name">> := ContainerName} = Params, #{<<"container_name">> := ContainerName} = Params,
%% efka_inetd收到消息后就立即返回了 %% efka_inetd收到消息后就立即返回了
case docker_manager:stop_container(ContainerName) of case docker_manager:stop_container(ContainerName) of
ok -> ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) efka_transport:rpc_reply(TransportPid, PacketId, reply_error(-1, Reason))
end, end,
{keep_state, State}; {keep_state, State};
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"kill_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> handle_event(info, {server_rpc, PacketId, #jsonrpc_request{method = <<"kill_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
#{<<"container_name">> := ContainerName} = Params, #{<<"container_name">> := ContainerName} = Params,
%% efka_inetd收到消息后就立即返回了 %% efka_inetd收到消息后就立即返回了
case docker_manager:kill_container(ContainerName) of case docker_manager:kill_container(ContainerName) of
ok -> ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) efka_transport:rpc_reply(TransportPid, PacketId, reply_error(-1, Reason))
end, end,
{keep_state, State}; {keep_state, State};
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"remove_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> handle_event(info, {server_rpc, PacketId, #jsonrpc_request{method = <<"remove_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
#{<<"container_name">> := ContainerName} = Params, #{<<"container_name">> := ContainerName} = Params,
%% efka_inetd收到消息后就立即返回了 %% efka_inetd收到消息后就立即返回了
case docker_manager:remove_container(ContainerName) of case docker_manager:remove_container(ContainerName) of
ok -> ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) efka_transport:rpc_reply(TransportPid, PacketId, reply_error(-1, Reason))
end, end,
{keep_state, State}; {keep_state, State};
%% config.json配置信息 %% config.json配置信息
handle_event(info, {server_rpc, PacketId, #jsonrpc_request{id = Id, method = <<"config_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> handle_event(info, {server_rpc, PacketId, #jsonrpc_request{method = <<"config_container">>, params = Params}}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
#{<<"container_name">> := ContainerName, <<"config">> := Config} = Params, #{<<"container_name">> := ContainerName, <<"config">> := Config} = Params,
case docker_manager:config_container(ContainerName, Config) of case docker_manager:config_container(ContainerName, Config) of
ok -> ok ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_success(Id, <<"ok">>)); efka_transport:rpc_reply(TransportPid, PacketId, reply_success(<<"ok">>));
{error, Reason} -> {error, Reason} ->
efka_transport:rpc_reply(TransportPid, PacketId, reply_error(Id, -1, Reason)) efka_transport:rpc_reply(TransportPid, PacketId, reply_error(-1, Reason))
end, end,
{keep_state, State}; {keep_state, State};
@ -392,14 +392,14 @@ auth_request() ->
timestamp = efka_util:timestamp() timestamp = efka_util:timestamp()
}). }).
-spec reply_success(Id :: integer(), Result :: any()) -> binary(). -spec reply_success(Result :: any()) -> binary().
reply_success(Id, Result) when is_binary(Result) -> reply_success(Result) when is_binary(Result) ->
message_codec:encode(?MESSAGE_JSONRPC_REPLY, #jsonrpc_reply{id = Id, result = Result}). message_codec:encode(?MESSAGE_JSONRPC_REPLY, #jsonrpc_reply{result = Result}).
-spec reply_error(Id :: integer(), Code :: integer(), Message :: binary()) -> binary(). -spec reply_error(Code :: integer(), Message :: binary()) -> binary().
reply_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(Message) -> reply_error(Code, Message) when is_integer(Code), is_binary(Message) ->
Error = #{ Error = #{
<<"code">> => Code, <<"code">> => Code,
<<"message">> => Message <<"message">> => Message
}, },
message_codec:encode(?MESSAGE_JSONRPC_REPLY, #jsonrpc_reply{id = Id, error = Error}). message_codec:encode(?MESSAGE_JSONRPC_REPLY, #jsonrpc_reply{error = Error}).

View File

@ -87,7 +87,7 @@ decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) ->
case jiffy:decode(ReplyBin, [return_maps]) of case jiffy:decode(ReplyBin, [return_maps]) of
#{<<"result">> := Result} -> #{<<"result">> := Result} ->
{ok, #jsonrpc_reply{result = Result}}; {ok, #jsonrpc_reply{result = Result}};
#{<<"id">> := Id, <<"error">> := Error} -> #{<<"error">> := Error} ->
{ok, #jsonrpc_reply{error = Error}}; {ok, #jsonrpc_reply{error = Error}};
_ -> _ ->
error error