From 4a81d1f6f9c58afd23ed4b8289f11a33942e3c99 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sun, 28 Sep 2025 16:05:09 +0800 Subject: [PATCH] fix --- apps/efka/src/docker/docker_deployer.erl | 97 +++--------------------- apps/efka/src/docker/docker_manager.erl | 17 ++--- apps/efka/src/efka_remote_agent.erl | 10 +-- 3 files changed, 24 insertions(+), 100 deletions(-) diff --git a/apps/efka/src/docker/docker_deployer.erl b/apps/efka/src/docker/docker_deployer.erl index 10ce618..7f07c7f 100644 --- a/apps/efka/src/docker/docker_deployer.erl +++ b/apps/efka/src/docker/docker_deployer.erl @@ -10,14 +10,9 @@ -author("anlicheng"). -dialyzer([{nowarn_function, normalize_image/1}]). --behaviour(gen_server). - %% API --export([start_link/3]). --export([deploy/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +-export([start_monitor/3]). +-export([deploy/3]). -define(SERVER, ?MODULE). @@ -31,79 +26,10 @@ %%% API %%%=================================================================== --spec deploy(Pid :: pid()) -> no_return(). -deploy(Pid) when is_pid(Pid) -> - gen_server:cast(Pid, deploy). - %% @doc Spawns the server and registers the local name (unique) --spec(start_link(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) -> - gen_server:start_link(?MODULE, [TaskId, ContainerDir, Config], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([TaskId, ContainerDir, Config]) -> - {ok, #state{task_id = TaskId, root_dir = ContainerDir, config = Config}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(deploy, State = #state{task_id = TaskId, root_dir = RootDir, config = Config}) -> - do_deploy(TaskId, RootDir, Config), - {stop, normal, State}; -handle_cast(_Request, State) -> - {stop, normal, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info(_Info, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. +-spec(start_monitor(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> {ok, {Pid :: pid(), MRef :: reference()}}). +start_monitor(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) -> + {ok, spawn_monitor(?MODULE, deploy, [TaskId, ContainerDir, Config])}. %%%=================================================================== %%% Internal functions @@ -119,16 +45,15 @@ code_change(_OldVsn, State = #state{}, _Extra) -> % "command": ["nginx", "-g", "daemon off;"], % "restart": "always" %} --spec do_deploy(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> no_return(). -do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) -> +-spec deploy(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> no_return(). +deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) -> %% 尝试拉取镜像 ContainerName = maps:get(<<"container_name">>, Config), efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"开始部署容器:"/utf8, ContainerName/binary>>), - case docker_commands:check_container_exist(ContainerName) of true -> efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"本地容器已经存在:"/utf8, ContainerName/binary>>), - efka_remote_agent:task_event_stream_close(TaskId); + efka_remote_agent:close_task_event_stream(TaskId); false -> Image0 = maps:get(<<"image">>, Config), Image = normalize_image(Image0), @@ -167,15 +92,15 @@ do_deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(Contain ShortContainerId = binary:part(ContainerId, 1, 12), efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"容器创建成功: "/utf8, ShortContainerId/binary>>), efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"任务完成"/utf8>>), - efka_remote_agent:task_event_stream_close(TaskId); + efka_remote_agent:close_task_event_stream(TaskId); {error, Reason} -> efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"容器创建失败: "/utf8, Reason/binary>>), efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"任务失败"/utf8>>), - efka_remote_agent:task_event_stream_close(TaskId) + efka_remote_agent:close_task_event_stream(TaskId) end; {error, Reason} -> efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"镜像拉取失败: "/utf8, Reason/binary>>), - efka_remote_agent:task_event_stream_close(TaskId) + efka_remote_agent:close_task_event_stream(TaskId) end end. diff --git a/apps/efka/src/docker/docker_manager.erl b/apps/efka/src/docker/docker_manager.erl index 8120430..17b0352 100644 --- a/apps/efka/src/docker/docker_manager.erl +++ b/apps/efka/src/docker/docker_manager.erl @@ -94,8 +94,7 @@ init([]) -> handle_call({deploy, TaskId, Config = #{<<"container_name">> := ContainerName}}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) -> %% 创建目录 {ok, ContainerDir} = docker_container_helper:ensure_dir(RootDir, ContainerName), - {ok, TaskPid} = docker_deployer:start_link(TaskId, ContainerDir, Config), - docker_deployer:deploy(TaskPid), + {ok, {TaskPid, _Ref}} = docker_deployer:start_monitor(TaskId, ContainerDir, Config), lager:debug("[docker_manager] start deploy task_id: ~p, config: ~p", [TaskId, Config]), {reply, ok, State#state{task_map = maps:put(TaskPid, TaskId, TaskMap)}}; @@ -181,20 +180,20 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({'EXIT', TaskPid, Reason}, State = #state{task_map = TaskMap}) -> +handle_info({'DOWN', _Ref, process, TaskPid, Reason}, State = #state{task_map = TaskMap}) -> case maps:take(TaskPid, TaskMap) of error -> {noreply, State}; {TaskId, NTaskMap} -> case Reason of normal -> - lager:debug("[docker_manager] task_pid: ~p, exit normal", [TaskPid]), - %efka_inetd_task_log:flush(TaskId); + lager:debug("[docker_manager] task_id: ~p, exit normal", [TaskId]), ok; - Error -> - lager:notice("[docker_manager] task_pid: ~p, exit with error: ~p", [TaskPid, Error]), - %efka_inetd_task_log:stash(TaskId, <<"task aborted">>), - %efka_inetd_task_log:flush(TaskId) + Error0 -> + Error = iolist_to_binary(io_lib:format("~p", [Error0])), + efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"任务失败: "/utf8, Error/binary>>), + efka_remote_agent:close_task_event_stream(TaskId), + lager:notice("[docker_manager] task_id: ~p, exit with error: ~p", [TaskId, Error]), ok end, {noreply, State#state{task_map = NTaskMap}} diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index e56e8a3..fc01238 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -15,7 +15,7 @@ %% API -export([start_link/0]). --export([metric_data/4, event/3, ping/13, task_event_stream/3, task_event_stream_close/1]). +-export([metric_data/4, event/3, ping/13, task_event_stream/3, close_task_event_stream/1]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -53,9 +53,9 @@ event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventT task_event_stream(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) -> gen_statem:cast(?SERVER, {task_event_stream, TaskId, Type, Stream}). --spec task_event_stream_close(TaskId :: integer()) -> no_return(). -task_event_stream_close(TaskId) when is_integer(TaskId) -> - gen_statem:cast(?SERVER, {task_event_stream_close, TaskId}). +-spec close_task_event_stream(TaskId :: integer()) -> no_return(). +close_task_event_stream(TaskId) when is_integer(TaskId) -> + gen_statem:cast(?SERVER, {close_task_event_stream, TaskId}). ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). @@ -138,7 +138,7 @@ handle_event(cast, {task_event_stream, TaskId, Type, Stream}, ?STATE_ACTIVATED, efka_transport:send(TransportPid, EventPacket), {keep_state, State}; -handle_event(cast, {task_event_stream_close, TaskId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(cast, {close_task_event_stream, TaskId}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #task_event_stream{ task_id = TaskId, type = <<"close">>,