This commit is contained in:
anlicheng 2025-09-16 15:42:11 +08:00
parent 737f5dce00
commit ed90316457
4 changed files with 27 additions and 28 deletions

View File

@ -34,7 +34,7 @@ pull_image(Image) when is_binary(Image) ->
{error, <<"exec command startup failed">>} {error, <<"exec command startup failed">>}
end. end.
-spec create_container(ContainerName :: binary(), ContainerDir :: string(), Config :: map()) -> ok | {error, Reason :: any()}. -spec create_container(ContainerName :: binary(), ContainerDir :: string(), Config :: map()) -> {ok, ContainerId :: binary()} | {error, Reason :: any()}.
create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerName), is_list(ContainerDir), is_map(Config) -> create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerName), is_list(ContainerDir), is_map(Config) ->
Image = maps:get(<<"image">>, Config), Image = maps:get(<<"image">>, Config),
Cmd = maps:get(<<"command">>, Config, []), Cmd = maps:get(<<"command">>, Config, []),
@ -54,9 +54,8 @@ create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerNa
case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of
Port when is_port(Port) -> Port when is_port(Port) ->
case gather_output(Port) of case gather_output(Port) of
{0, Output} -> {0, <<ContainerId:64/binary, $\n>>} ->
lager:debug("docker create output: ~p", [Output]), {ok, ContainerId};
ok;
{ExitCode, Error} -> {ExitCode, Error} ->
lager:debug("call me here: ~p", [Error]), lager:debug("call me here: ~p", [Error]),
{error, {ExitCode, Error}} {error, {ExitCode, Error}}

View File

@ -16,7 +16,7 @@
%% API %% API
-export([start_link/0]). -export([start_link/0]).
-export([deploy/3, start_service/1, stop_service/1]). -export([deploy/2, start_container/1, stop_container/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -25,7 +25,7 @@
-record(state, { -record(state, {
root_dir :: string(), root_dir :: string(),
%% ref之间的映射, #{TaskPid => {TaskId, ServiceId}} %% ref之间的映射, #{TaskPid => TaskId}
task_map = #{} task_map = #{}
}). }).
@ -33,17 +33,17 @@
%%% API %%% API
%%%=================================================================== %%%===================================================================
-spec deploy(TaskId :: integer(), ImageUrl :: binary(), Args :: [binary()]) -> {ok, ContainerId :: binary()} | {error, Reason :: binary()}. -spec deploy(TaskId :: integer(), Config :: map()) -> {ok, ContainerId :: binary()} | {error, Reason :: binary()}.
deploy(TaskId, ImageUrl, Args) when is_integer(TaskId), is_binary(ImageUrl), is_list(Args) -> deploy(TaskId, Config) when is_integer(TaskId), is_map(Config) ->
gen_server:call(?SERVER, {deploy, TaskId, ImageUrl, Args}). gen_server:call(?SERVER, {deploy, TaskId, Config}).
-spec start_service(ServiceId :: binary()) -> ok | {error, Reason :: term()}. -spec start_container(ServiceId :: binary()) -> ok | {error, Reason :: term()}.
start_service(ServiceId) when is_binary(ServiceId) -> start_container(ContainerId) when is_binary(ContainerId) ->
gen_server:call(?SERVER, {start_service, ServiceId}). gen_server:call(?SERVER, {start_container, ContainerId}).
-spec stop_service(ServiceId :: binary()) -> ok | {error, Reason :: term()}. -spec stop_container(ServiceId :: binary()) -> ok | {error, Reason :: term()}.
stop_service(ServiceId) when is_binary(ServiceId) -> stop_container(ContainerId) when is_binary(ContainerId) ->
gen_server:call(?SERVER, {stop_service, ServiceId}). gen_server:call(?SERVER, {stop_container, ContainerId}).
%% @doc Spawns the server and registers the local name (unique) %% @doc Spawns the server and registers the local name (unique)
-spec(start_link() -> -spec(start_link() ->
@ -75,13 +75,13 @@ init([]) ->
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_call({deploy, TaskId, ImageUrl, Args}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) -> handle_call({deploy, TaskId, Config}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) ->
%% %%
{ok, TaskPid} = efka_inetd_task:start_link(TaskId, RootDir, ImageUrl, Args), {ok, TaskPid} = efka_inetd_task:start_link(TaskId, RootDir, Config),
efka_inetd_task:deploy(TaskPid), efka_inetd_task:deploy(TaskPid),
lager:debug("[efka_inetd] start task_id: ~p, tar_url: ~p", [TaskId, ImageUrl]), lager:debug("[efka_inetd] start deploy task_id: ~p, config: ~p", [TaskId, Config]),
%% todo %% todo
{reply, ok, State#state{task_map = maps:put(TaskPid, {TaskId, xx}, TaskMap)}}; {reply, ok, State#state{task_map = maps:put(TaskPid, TaskId, TaskMap)}};
%% : %% :
handle_call({start_container, ContainerId}, _From, State) -> handle_call({start_container, ContainerId}, _From, State) ->
@ -133,13 +133,13 @@ handle_info({'EXIT', TaskPid, Reason}, State = #state{task_map = TaskMap}) ->
case maps:take(TaskPid, TaskMap) of case maps:take(TaskPid, TaskMap) of
error -> error ->
{noreply, State}; {noreply, State};
{{TaskId, ServiceId}, NTaskMap} -> {TaskId, NTaskMap} ->
case Reason of case Reason of
normal -> normal ->
lager:debug("[efka_inetd] service_id: ~p, task_pid: ~p, exit normal", [ServiceId, TaskPid]), lager:debug("[efka_inetd] task_pid: ~p, exit normal", [TaskPid]),
efka_inetd_task_log:flush(TaskId); efka_inetd_task_log:flush(TaskId);
Error -> Error ->
lager:notice("[efka_inetd] service_id: ~p, task_pid: ~p, exit with error: ~p", [ServiceId, TaskPid, Error]), lager:notice("[efka_inetd] task_pid: ~p, exit with error: ~p", [TaskPid, Error]),
efka_inetd_task_log:stash(TaskId, <<"task aborted">>), efka_inetd_task_log:stash(TaskId, <<"task aborted">>),
efka_inetd_task_log:flush(TaskId) efka_inetd_task_log:flush(TaskId)
end, end,

View File

@ -15,6 +15,7 @@
%% API %% API
-export([start_link/3]). -export([start_link/3]).
-export([deploy/1]). -export([deploy/1]).
-export([maybe_create_env_file/2]).
-export([test/0]). -export([test/0]).
%% gen_server callbacks %% gen_server callbacks
@ -176,16 +177,15 @@ do_deploy(TaskId, RootDir, Config) when is_integer(TaskId), is_list(RootDir), is
ok -> ok ->
%% container %% container
%% envs参数 %% envs参数
maybe_create_env_file(ContainerDir, maps:get(<<"envs">>, Config, [])), %maybe_create_env_file(ContainerDir, maps:get(<<"envs">>, Config, [])),
%% , : "/etc/容器名称/" %% , : "/etc/容器名称/"
case efka_docker_command:check_container_exist(ContainerName) of case efka_docker_command:check_container_exist(ContainerName) of
true -> true ->
{error, <<"container exist">>}; {error, <<"container exist">>};
false -> false ->
case efka_docker_command:create_container(ContainerName, ContainerDir, Config) of case efka_docker_command:create_container(ContainerName, ContainerDir, Config) of
ok -> {ok, ContainerId} ->
ok; {ok, ContainerId};
{error, Reason} -> {error, Reason} ->
{error, Reason} {error, Reason}
end end

View File

@ -252,7 +252,7 @@ handle_event(info, {server_async_call, PacketId, <<?PUSH_DEPLOY:8, DeployBin/bin
%% %%
handle_event(info, {server_async_call, PacketId, <<?PUSH_START_SERVICE:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> handle_event(info, {server_async_call, PacketId, <<?PUSH_START_SERVICE:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了 %% efka_inetd收到消息后就立即返回了
Reply = case efka_inetd:start_service(ServiceId) of Reply = case efka_inetd:start_container(ServiceId) of
ok -> ok ->
#async_call_reply{code = 1, result = <<"ok">>}; #async_call_reply{code = 1, result = <<"ok">>};
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
@ -265,7 +265,7 @@ handle_event(info, {server_async_call, PacketId, <<?PUSH_START_SERVICE:8, Servic
%% %%
handle_event(info, {server_async_call, PacketId, <<?PUSH_STOP_SERVICE:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> handle_event(info, {server_async_call, PacketId, <<?PUSH_STOP_SERVICE:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了 %% efka_inetd收到消息后就立即返回了
Reply = case efka_inetd:stop_service(ServiceId) of Reply = case efka_inetd:stop_container(ServiceId) of
ok -> ok ->
#async_call_reply{code = 1, result = <<"ok">>}; #async_call_reply{code = 1, result = <<"ok">>};
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->