diff --git a/apps/efka/src/docker/docker_deployer.erl b/apps/efka/src/docker/docker_deployer.erl index 840a8c4..d6a3214 100644 --- a/apps/efka/src/docker/docker_deployer.erl +++ b/apps/efka/src/docker/docker_deployer.erl @@ -14,14 +14,6 @@ -export([start_monitor/3]). -export([deploy/3]). --define(SERVER, ?MODULE). - --record(state, { - root_dir :: string(), - task_id :: integer(), - config = #{} -}). - %%%=================================================================== %%% API %%%=================================================================== @@ -47,36 +39,56 @@ start_monitor(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(Con %} -spec deploy(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> no_return(). deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) -> + Q = queue:new(), + %% 尝试拉取镜像 ContainerName = maps:get(<<"container_name">>, Config), - efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"开始部署容器:"/utf8, ContainerName/binary>>), + + Msg = <<"开始部署容器:"/utf8, ContainerName/binary>>, + efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg), + efka_logger:write(format_log(TaskId, <<"info">>, Msg)), + case docker_commands:check_container_exist(ContainerName) of true -> - efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"本地容器已经存在:"/utf8, ContainerName/binary>>), - efka_remote_agent:close_task_event_stream(TaskId); + Msg1 = <<"本地容器已经存在:"/utf8, ContainerName/binary>>, + efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg1), + efka_remote_agent:close_task_event_stream(TaskId), + efka_logger:write(format_log(TaskId, <<"info">>, Msg1)); false -> Image0 = maps:get(<<"image">>, Config), Image = normalize_image(Image0), - efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"使用镜像:"/utf8, Image/binary>>), + + Msg2 = <<"使用镜像:"/utf8, Image/binary>>, + efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg2), + efka_logger:write(format_log(TaskId, <<"info">>, Msg2)), PullResult = case docker_commands:check_image_exist(Image) of true -> - efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"镜像本地已存在:"/utf8, Image/binary>>), + Msg3 = <<"镜像本地已存在:"/utf8, Image/binary>>, + efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg3), + efka_logger:write(format_log(TaskId, <<"info">>, Msg3)), ok; false -> - efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"开始拉取镜像:"/utf8, Image/binary>>), + Msg4 = <<"开始拉取镜像:"/utf8, Image/binary>>, + efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg4), + efka_logger:write(format_log(TaskId, <<"info">>, Msg4)), CB = fun ({message, Msg}) -> - efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg); + efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg), + efka_logger:write(format_log(TaskId, <<"info">>, Msg)); ({error, Error}) -> - efka_remote_agent:task_event_stream(TaskId, <<"error">>, Error) + efka_remote_agent:task_event_stream(TaskId, <<"error">>, Error), + efka_logger:write(format_log(TaskId, <<"error">>, Error)) end, docker_commands:pull_image(Image, CB) end, case PullResult of ok -> - efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"开始创建容器: "/utf8, ContainerName/binary>>), + Msg5 = <<"开始创建容器: "/utf8, ContainerName/binary>>, + efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg5), + efka_logger:write(format_log(TaskId, <<"info">>, Msg5)), + case docker_commands:create_container(ContainerName, ContainerDir, Config) of {ok, ContainerId} -> %% 创建容器对应的配置文件 @@ -87,19 +99,37 @@ deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerD file:close(FD); {error, Reason} -> Reason1 = list_to_binary(io_lib:format("~p", [Reason])), - efka_remote_agent:task_event_stream(TaskId, <<"notice">>, <<"创建配置文件失败: "/utf8, Reason1/binary>>) + Msg6 = <<"创建配置文件失败: "/utf8, Reason1/binary>>, + efka_remote_agent:task_event_stream(TaskId, <<"notice">>, Msg6), + efka_logger:write(format_log(TaskId, <<"notice">>, Msg6)) end, ShortContainerId = binary:part(ContainerId, 1, 12), - efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"容器创建成功: "/utf8, ShortContainerId/binary>>), - efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"任务完成"/utf8>>), + + Msg7 = <<"容器创建成功: "/utf8, ShortContainerId/binary>>, + efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg7), + efka_logger:write(format_log(TaskId, <<"info">>, Msg7)), + + Msg8 = <<"任务完成"/utf8>>, + efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg8), + efka_logger:write(format_log(TaskId, <<"info">>, Msg8)), + efka_remote_agent:close_task_event_stream(TaskId); {error, Reason} -> - efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"容器创建失败: "/utf8, Reason/binary>>), - efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"任务失败"/utf8>>), + Msg9 = <<"容器创建失败: "/utf8, Reason/binary>>, + efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg9), + efka_logger:write(format_log(TaskId, <<"error">>, Msg9)), + + Msg10 = <<"任务失败"/utf8>>, + efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg10), + efka_logger:write(format_log(TaskId, <<"error">>, Msg10)), + efka_remote_agent:close_task_event_stream(TaskId) end; {error, Reason} -> - efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"镜像拉取失败: "/utf8, Reason/binary>>), + Msg11 = <<"镜像拉取失败: "/utf8, Reason/binary>>, + efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg11), + efka_logger:write(format_log(TaskId, <<"error">>, Msg11)), + efka_remote_agent:close_task_event_stream(TaskId) end end. @@ -112,4 +142,8 @@ normalize_image(Image) when is_binary(Image) -> [_Name] -> <>; [_Name, _Tag] -> Last end, - iolist_to_binary(lists:join(<<"/">>, PrefixParts ++ [NormalizedLast])). \ No newline at end of file + iolist_to_binary(lists:join(<<"/">>, PrefixParts ++ [NormalizedLast])). + +-spec format_log(Label :: binary(), TaskId :: integer(), Msg :: binary()) -> binary(). +format_log(TaskId, Label, Msg) when is_binary(Label), is_binary(Msg) -> + iolist_to_binary([<<"task_id=">>, integer_to_binary(TaskId), <<" ">>, Label, <<" ">>, Msg]). \ No newline at end of file diff --git a/apps/efka/src/docker/docker_task_log.erl b/apps/efka/src/docker/docker_task_log.erl deleted file mode 100644 index ff16cee..0000000 --- a/apps/efka/src/docker/docker_task_log.erl +++ /dev/null @@ -1,136 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% TODO 需要完善日志的逻辑 -%%% @end -%%% Created : 09. 5月 2025 16:45 -%%%------------------------------------------------------------------- --module(docker_task_log). --author("anlicheng"). - --behaviour(gen_server). - -%% API --export([start_link/0]). --export([stash/2, flush/1, get_logs/1]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - --record(state, { - %% #{task_id => queue:new()} - pending_map = #{} -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec stash(TaskId :: integer(), Items :: binary() | [binary()]) -> no_return(). -stash(TaskId, Log) when is_integer(TaskId), is_binary(Log) -> - stash(TaskId, [Log]); -stash(TaskId, Items) when is_integer(TaskId), is_list(Items) -> - {{Y, M, D}, {H, I, S}} = calendar:local_time(), - TimePrefix = iolist_to_binary(io_lib:format("[~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b]", [Y, M, D, H, I, S])), - Log = iolist_to_binary([TimePrefix, <<" ">>, lists:join(<<" ">>, Items)]), - gen_server:cast(?SERVER, {stash, TaskId, Log}). - --spec flush(TaskId :: integer()) -> no_return(). -flush(TaskId) when is_integer(TaskId) -> - gen_server:cast(?SERVER, {flush, TaskId}). - --spec get_logs(TaskId :: integer()) -> {ok, Logs :: list()}. -get_logs(TaskId) when is_integer(TaskId) -> - gen_server:call(?SERVER, {get_logs, TaskId}). - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - {ok, #state{}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call({get_logs, TaskId}, _From, State = #state{pending_map = PendingMap}) -> - case maps:find(TaskId, PendingMap) of - error -> - Logs = task_log_model:get_logs(TaskId), - {reply, {ok, Logs}, State}; - {ok, Q} -> - Logs = queue:to_list(Q), - {reply, {ok, Logs}, State} - end. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({stash, TaskId, Log}, State = #state{pending_map = PendingMap}) -> - Q = maps:get(TaskId, PendingMap, queue:new()), - NQ = queue:in(Log, Q), - {noreply, State#state{pending_map = maps:put(TaskId, NQ, PendingMap)}}; -handle_cast({flush, TaskId}, State = #state{pending_map = PendingMap}) -> - case maps:take(TaskId, PendingMap) of - error -> - {noreply, State}; - {Q, NPendingMap} -> - Logs = queue:to_list(Q), - ok = task_log_model:insert(TaskId, Logs), - {noreply, State#state{pending_map = NPendingMap}} - end. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info(_Info, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== diff --git a/apps/efka/src/efka_logger.erl b/apps/efka/src/efka_logger.erl new file mode 100644 index 0000000..b6a4291 --- /dev/null +++ b/apps/efka/src/efka_logger.erl @@ -0,0 +1,174 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 07. 9月 2023 17:07 +%%%------------------------------------------------------------------- +-module(efka_logger). +-author("aresei"). + +-behaviour(gen_server). + +%% API +-export([start_link/1, write/1, write_lines/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + file_name :: string(), + date :: calendar:date(), + file +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +write(Data) -> + gen_server:cast(?SERVER, {write, Data}). + +write_lines(Lines) when is_list(Lines) -> + gen_server:cast(?SERVER, {write_lines, Lines}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(FileName :: string()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(FileName) when is_list(FileName) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [FileName], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([FileName]) -> + ensure_dir(), + FilePath = make_file(FileName), + {ok, File} = file:open(FilePath, [append, binary]), + + {ok, #state{file = File, file_name = FileName, date = get_date()}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({write, Data}, State = #state{file = OldFile, file_name = FileName, date = Date}) -> + Line = <<(time_prefix())/binary, " ", (format(Data))/binary, $\n>>, + case maybe_new_file(Date) of + true -> + file:close(OldFile), + + FilePath = make_file(FileName), + {ok, File} = file:open(FilePath, [append, binary]), + ok = file:write(File, Line), + {noreply, State#state{file = File, date = get_date()}}; + false -> + ok = file:write(OldFile, Line), + {noreply, State} + end; +handle_cast({write_lines, Lines}, State = #state{file = OldFile, file_name = FileName, date = Date}) -> + Data = iolist_to_binary(lists:join(<<$\n>>, Lines)), + case maybe_new_file(Date) of + true -> + file:close(OldFile), + + FilePath = make_file(FileName), + {ok, File} = file:open(FilePath, [append, binary]), + ok = file:write(File, Data), + {noreply, State#state{file = File, date = get_date()}}; + false -> + ok = file:write(OldFile, Data), + {noreply, State} + end. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +format(Data) when is_binary(Data) -> + iolist_to_binary(Data); +format(Items) when is_list(Items) -> + iolist_to_binary(lists:join(<<"\t">>, Items)). + +time_prefix() -> + {{Y, M, D}, {H, I, S}} = calendar:local_time(), + iolist_to_binary(io_lib:format("[~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b]", [Y, M, D, H, I, S])). + +-spec make_file(LogFile :: string()) -> string(). +make_file(LogFile) when is_list(LogFile) -> + {Year, Month, Day} = erlang:date(), + Suffix = io_lib:format("~b~2..0b~2..0b", [Year, Month, Day]), + RootDir = code:root_dir() ++ "/log/", + lists:flatten(RootDir ++ LogFile ++ "." ++ Suffix). + +ensure_dir() -> + RootDir = code:root_dir() ++ "/log/", + case filelib:is_dir(RootDir) of + true -> + ok; + false -> + file:make_dir(RootDir) + end. + +%% 获取日期信息 +-spec get_date() -> Date :: calendar:date(). +get_date() -> + {Date, _} = calendar:local_time(), + Date. + +%% 通过日志判断是否需要生成新的日志文件 +-spec maybe_new_file(Date :: calendar:date()) -> boolean(). +maybe_new_file({Y, M, D}) -> + {{Y0, M0, D0}, _} = calendar:local_time(), + not (Y =:= Y0 andalso M =:= M0 andalso D =:= D0). \ No newline at end of file diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index e7b36eb..37b1e52 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -28,6 +28,15 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, ChildSpecs = [ + #{ + id => 'efka_logger', + start => {'efka_logger', start_link, ["deploy_log"]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['efka_logger'] + }, + #{ id => 'efka_service_sup', start => {'efka_service_sup', start_link, []},