This commit is contained in:
anlicheng 2025-09-28 16:05:09 +08:00
parent a2df6214cc
commit 4a81d1f6f9
3 changed files with 24 additions and 100 deletions

View File

@ -10,14 +10,9 @@
-author("anlicheng").
-dialyzer([{nowarn_function, normalize_image/1}]).
-behaviour(gen_server).
%% API
-export([start_link/3]).
-export([deploy/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([start_monitor/3]).
-export([deploy/3]).
-define(SERVER, ?MODULE).
@ -31,79 +26,10 @@
%%% API
%%%===================================================================
-spec deploy(Pid :: pid()) -> no_return().
deploy(Pid) when is_pid(Pid) ->
gen_server:cast(Pid, deploy).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(TaskId :: integer(), ContainerDir :: string(), Config :: map()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) ->
gen_server:start_link(?MODULE, [TaskId, ContainerDir, Config], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([TaskId, ContainerDir, Config]) ->
{ok, #state{task_id = TaskId, root_dir = ContainerDir, config = Config}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast(deploy, State = #state{task_id = TaskId, root_dir = RootDir, config = Config}) ->
do_deploy(TaskId, RootDir, Config),
{stop, normal, State};
handle_cast(_Request, State) ->
{stop, normal, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
-spec(start_monitor(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> {ok, {Pid :: pid(), MRef :: reference()}}).
start_monitor(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) ->
{ok, spawn_monitor(?MODULE, deploy, [TaskId, ContainerDir, Config])}.
%%%===================================================================
%%% Internal functions
@ -119,16 +45,15 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
% "command": ["nginx", "-g", "daemon off;"],
% "restart": "always"
%}
-spec do_deploy(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> no_return().
do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) ->
-spec deploy(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> no_return().
deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) ->
%%
ContainerName = maps:get(<<"container_name">>, Config),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"开始部署容器:"/utf8, ContainerName/binary>>),
case docker_commands:check_container_exist(ContainerName) of
true ->
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"本地容器已经存在:"/utf8, ContainerName/binary>>),
efka_remote_agent:task_event_stream_close(TaskId);
efka_remote_agent:close_task_event_stream(TaskId);
false ->
Image0 = maps:get(<<"image">>, Config),
Image = normalize_image(Image0),
@ -167,15 +92,15 @@ do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(Contain
ShortContainerId = binary:part(ContainerId, 1, 12),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"容器创建成功: "/utf8, ShortContainerId/binary>>),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"任务完成"/utf8>>),
efka_remote_agent:task_event_stream_close(TaskId);
efka_remote_agent:close_task_event_stream(TaskId);
{error, Reason} ->
efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"容器创建失败: "/utf8, Reason/binary>>),
efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"任务失败"/utf8>>),
efka_remote_agent:task_event_stream_close(TaskId)
efka_remote_agent:close_task_event_stream(TaskId)
end;
{error, Reason} ->
efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"镜像拉取失败: "/utf8, Reason/binary>>),
efka_remote_agent:task_event_stream_close(TaskId)
efka_remote_agent:close_task_event_stream(TaskId)
end
end.

View File

@ -94,8 +94,7 @@ init([]) ->
handle_call({deploy, TaskId, Config = #{<<"container_name">> := ContainerName}}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) ->
%%
{ok, ContainerDir} = docker_container_helper:ensure_dir(RootDir, ContainerName),
{ok, TaskPid} = docker_deployer:start_link(TaskId, ContainerDir, Config),
docker_deployer:deploy(TaskPid),
{ok, {TaskPid, _Ref}} = docker_deployer:start_monitor(TaskId, ContainerDir, Config),
lager:debug("[docker_manager] start deploy task_id: ~p, config: ~p", [TaskId, Config]),
{reply, ok, State#state{task_map = maps:put(TaskPid, TaskId, TaskMap)}};
@ -181,20 +180,20 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({'EXIT', TaskPid, Reason}, State = #state{task_map = TaskMap}) ->
handle_info({'DOWN', _Ref, process, TaskPid, Reason}, State = #state{task_map = TaskMap}) ->
case maps:take(TaskPid, TaskMap) of
error ->
{noreply, State};
{TaskId, NTaskMap} ->
case Reason of
normal ->
lager:debug("[docker_manager] task_pid: ~p, exit normal", [TaskPid]),
%efka_inetd_task_log:flush(TaskId);
lager:debug("[docker_manager] task_id: ~p, exit normal", [TaskId]),
ok;
Error ->
lager:notice("[docker_manager] task_pid: ~p, exit with error: ~p", [TaskPid, Error]),
%efka_inetd_task_log:stash(TaskId, <<"task aborted">>),
%efka_inetd_task_log:flush(TaskId)
Error0 ->
Error = iolist_to_binary(io_lib:format("~p", [Error0])),
efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"任务失败: "/utf8, Error/binary>>),
efka_remote_agent:close_task_event_stream(TaskId),
lager:notice("[docker_manager] task_id: ~p, exit with error: ~p", [TaskId, Error]),
ok
end,
{noreply, State#state{task_map = NTaskMap}}

View File

@ -15,7 +15,7 @@
%% API
-export([start_link/0]).
-export([metric_data/4, event/3, ping/13, task_event_stream/3, task_event_stream_close/1]).
-export([metric_data/4, event/3, ping/13, task_event_stream/3, close_task_event_stream/1]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
@ -53,9 +53,9 @@ event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventT
task_event_stream(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) ->
gen_statem:cast(?SERVER, {task_event_stream, TaskId, Type, Stream}).
-spec task_event_stream_close(TaskId :: integer()) -> no_return().
task_event_stream_close(TaskId) when is_integer(TaskId) ->
gen_statem:cast(?SERVER, {task_event_stream_close, TaskId}).
-spec close_task_event_stream(TaskId :: integer()) -> no_return().
close_task_event_stream(TaskId) when is_integer(TaskId) ->
gen_statem:cast(?SERVER, {close_task_event_stream, TaskId}).
ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) ->
gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}).
@ -138,7 +138,7 @@ handle_event(cast, {task_event_stream, TaskId, Type, Stream}, ?STATE_ACTIVATED,
efka_transport:send(TransportPid, EventPacket),
{keep_state, State};
handle_event(cast, {task_event_stream_close, TaskId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
handle_event(cast, {close_task_event_stream, TaskId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #task_event_stream{
task_id = TaskId,
type = <<"close">>,