add logger

This commit is contained in:
anlicheng 2025-09-29 16:58:46 +08:00
parent 6a75597134
commit efb128ee28
4 changed files with 241 additions and 160 deletions

View File

@ -14,14 +14,6 @@
-export([start_monitor/3]). -export([start_monitor/3]).
-export([deploy/3]). -export([deploy/3]).
-define(SERVER, ?MODULE).
-record(state, {
root_dir :: string(),
task_id :: integer(),
config = #{}
}).
%%%=================================================================== %%%===================================================================
%%% API %%% API
%%%=================================================================== %%%===================================================================
@ -47,36 +39,56 @@ start_monitor(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(Con
%} %}
-spec deploy(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> no_return(). -spec deploy(TaskId :: integer(), ContainerDir :: string(), Config :: map()) -> no_return().
deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) -> deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerDir), is_map(Config) ->
Q = queue:new(),
%% %%
ContainerName = maps:get(<<"container_name">>, Config), ContainerName = maps:get(<<"container_name">>, Config),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"开始部署容器:"/utf8, ContainerName/binary>>),
Msg = <<"开始部署容器:"/utf8, ContainerName/binary>>,
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg),
efka_logger:write(format_log(TaskId, <<"info">>, Msg)),
case docker_commands:check_container_exist(ContainerName) of case docker_commands:check_container_exist(ContainerName) of
true -> true ->
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"本地容器已经存在:"/utf8, ContainerName/binary>>), Msg1 = <<"本地容器已经存在:"/utf8, ContainerName/binary>>,
efka_remote_agent:close_task_event_stream(TaskId); efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg1),
efka_remote_agent:close_task_event_stream(TaskId),
efka_logger:write(format_log(TaskId, <<"info">>, Msg1));
false -> false ->
Image0 = maps:get(<<"image">>, Config), Image0 = maps:get(<<"image">>, Config),
Image = normalize_image(Image0), Image = normalize_image(Image0),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"使用镜像:"/utf8, Image/binary>>),
Msg2 = <<"使用镜像:"/utf8, Image/binary>>,
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg2),
efka_logger:write(format_log(TaskId, <<"info">>, Msg2)),
PullResult = case docker_commands:check_image_exist(Image) of PullResult = case docker_commands:check_image_exist(Image) of
true -> true ->
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"镜像本地已存在:"/utf8, Image/binary>>), Msg3 = <<"镜像本地已存在:"/utf8, Image/binary>>,
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg3),
efka_logger:write(format_log(TaskId, <<"info">>, Msg3)),
ok; ok;
false -> false ->
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"开始拉取镜像:"/utf8, Image/binary>>), Msg4 = <<"开始拉取镜像:"/utf8, Image/binary>>,
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg4),
efka_logger:write(format_log(TaskId, <<"info">>, Msg4)),
CB = fun CB = fun
({message, Msg}) -> ({message, Msg}) ->
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg); efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg),
efka_logger:write(format_log(TaskId, <<"info">>, Msg));
({error, Error}) -> ({error, Error}) ->
efka_remote_agent:task_event_stream(TaskId, <<"error">>, Error) efka_remote_agent:task_event_stream(TaskId, <<"error">>, Error),
efka_logger:write(format_log(TaskId, <<"error">>, Error))
end, end,
docker_commands:pull_image(Image, CB) docker_commands:pull_image(Image, CB)
end, end,
case PullResult of case PullResult of
ok -> ok ->
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"开始创建容器: "/utf8, ContainerName/binary>>), Msg5 = <<"开始创建容器: "/utf8, ContainerName/binary>>,
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg5),
efka_logger:write(format_log(TaskId, <<"info">>, Msg5)),
case docker_commands:create_container(ContainerName, ContainerDir, Config) of case docker_commands:create_container(ContainerName, ContainerDir, Config) of
{ok, ContainerId} -> {ok, ContainerId} ->
%% %%
@ -87,19 +99,37 @@ deploy(TaskId, ContainerDir, Config) when is_integer(TaskId), is_list(ContainerD
file:close(FD); file:close(FD);
{error, Reason} -> {error, Reason} ->
Reason1 = list_to_binary(io_lib:format("~p", [Reason])), Reason1 = list_to_binary(io_lib:format("~p", [Reason])),
efka_remote_agent:task_event_stream(TaskId, <<"notice">>, <<"创建配置文件失败: "/utf8, Reason1/binary>>) Msg6 = <<"创建配置文件失败: "/utf8, Reason1/binary>>,
efka_remote_agent:task_event_stream(TaskId, <<"notice">>, Msg6),
efka_logger:write(format_log(TaskId, <<"notice">>, Msg6))
end, end,
ShortContainerId = binary:part(ContainerId, 1, 12), ShortContainerId = binary:part(ContainerId, 1, 12),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"容器创建成功: "/utf8, ShortContainerId/binary>>),
efka_remote_agent:task_event_stream(TaskId, <<"info">>, <<"任务完成"/utf8>>), Msg7 = <<"容器创建成功: "/utf8, ShortContainerId/binary>>,
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg7),
efka_logger:write(format_log(TaskId, <<"info">>, Msg7)),
Msg8 = <<"任务完成"/utf8>>,
efka_remote_agent:task_event_stream(TaskId, <<"info">>, Msg8),
efka_logger:write(format_log(TaskId, <<"info">>, Msg8)),
efka_remote_agent:close_task_event_stream(TaskId); efka_remote_agent:close_task_event_stream(TaskId);
{error, Reason} -> {error, Reason} ->
efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"容器创建失败: "/utf8, Reason/binary>>), Msg9 = <<"容器创建失败: "/utf8, Reason/binary>>,
efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"任务失败"/utf8>>), efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg9),
efka_logger:write(format_log(TaskId, <<"error">>, Msg9)),
Msg10 = <<"任务失败"/utf8>>,
efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg10),
efka_logger:write(format_log(TaskId, <<"error">>, Msg10)),
efka_remote_agent:close_task_event_stream(TaskId) efka_remote_agent:close_task_event_stream(TaskId)
end; end;
{error, Reason} -> {error, Reason} ->
efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"镜像拉取失败: "/utf8, Reason/binary>>), Msg11 = <<"镜像拉取失败: "/utf8, Reason/binary>>,
efka_remote_agent:task_event_stream(TaskId, <<"error">>, Msg11),
efka_logger:write(format_log(TaskId, <<"error">>, Msg11)),
efka_remote_agent:close_task_event_stream(TaskId) efka_remote_agent:close_task_event_stream(TaskId)
end end
end. end.
@ -112,4 +142,8 @@ normalize_image(Image) when is_binary(Image) ->
[_Name] -> <<Last/binary, ":latest">>; [_Name] -> <<Last/binary, ":latest">>;
[_Name, _Tag] -> Last [_Name, _Tag] -> Last
end, end,
iolist_to_binary(lists:join(<<"/">>, PrefixParts ++ [NormalizedLast])). iolist_to_binary(lists:join(<<"/">>, PrefixParts ++ [NormalizedLast])).
-spec format_log(Label :: binary(), TaskId :: integer(), Msg :: binary()) -> binary().
format_log(TaskId, Label, Msg) when is_binary(Label), is_binary(Msg) ->
iolist_to_binary([<<"task_id=">>, integer_to_binary(TaskId), <<" ">>, Label, <<" ">>, Msg]).

View File

@ -1,136 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%% TODO
%%% @end
%%% Created : 09. 5 2025 16:45
%%%-------------------------------------------------------------------
-module(docker_task_log).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([stash/2, flush/1, get_logs/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
%% #{task_id => queue:new()}
pending_map = #{}
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec stash(TaskId :: integer(), Items :: binary() | [binary()]) -> no_return().
stash(TaskId, Log) when is_integer(TaskId), is_binary(Log) ->
stash(TaskId, [Log]);
stash(TaskId, Items) when is_integer(TaskId), is_list(Items) ->
{{Y, M, D}, {H, I, S}} = calendar:local_time(),
TimePrefix = iolist_to_binary(io_lib:format("[~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b]", [Y, M, D, H, I, S])),
Log = iolist_to_binary([TimePrefix, <<" ">>, lists:join(<<" ">>, Items)]),
gen_server:cast(?SERVER, {stash, TaskId, Log}).
-spec flush(TaskId :: integer()) -> no_return().
flush(TaskId) when is_integer(TaskId) ->
gen_server:cast(?SERVER, {flush, TaskId}).
-spec get_logs(TaskId :: integer()) -> {ok, Logs :: list()}.
get_logs(TaskId) when is_integer(TaskId) ->
gen_server:call(?SERVER, {get_logs, TaskId}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, #state{}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({get_logs, TaskId}, _From, State = #state{pending_map = PendingMap}) ->
case maps:find(TaskId, PendingMap) of
error ->
Logs = task_log_model:get_logs(TaskId),
{reply, {ok, Logs}, State};
{ok, Q} ->
Logs = queue:to_list(Q),
{reply, {ok, Logs}, State}
end.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({stash, TaskId, Log}, State = #state{pending_map = PendingMap}) ->
Q = maps:get(TaskId, PendingMap, queue:new()),
NQ = queue:in(Log, Q),
{noreply, State#state{pending_map = maps:put(TaskId, NQ, PendingMap)}};
handle_cast({flush, TaskId}, State = #state{pending_map = PendingMap}) ->
case maps:take(TaskId, PendingMap) of
error ->
{noreply, State};
{Q, NPendingMap} ->
Logs = queue:to_list(Q),
ok = task_log_model:insert(TaskId, Logs),
{noreply, State#state{pending_map = NPendingMap}}
end.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -0,0 +1,174 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 07. 9 2023 17:07
%%%-------------------------------------------------------------------
-module(efka_logger).
-author("aresei").
-behaviour(gen_server).
%% API
-export([start_link/1, write/1, write_lines/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
file_name :: string(),
date :: calendar:date(),
file
}).
%%%===================================================================
%%% API
%%%===================================================================
write(Data) ->
gen_server:cast(?SERVER, {write, Data}).
write_lines(Lines) when is_list(Lines) ->
gen_server:cast(?SERVER, {write_lines, Lines}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(FileName :: string()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(FileName) when is_list(FileName) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [FileName], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([FileName]) ->
ensure_dir(),
FilePath = make_file(FileName),
{ok, File} = file:open(FilePath, [append, binary]),
{ok, #state{file = File, file_name = FileName, date = get_date()}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({write, Data}, State = #state{file = OldFile, file_name = FileName, date = Date}) ->
Line = <<(time_prefix())/binary, " ", (format(Data))/binary, $\n>>,
case maybe_new_file(Date) of
true ->
file:close(OldFile),
FilePath = make_file(FileName),
{ok, File} = file:open(FilePath, [append, binary]),
ok = file:write(File, Line),
{noreply, State#state{file = File, date = get_date()}};
false ->
ok = file:write(OldFile, Line),
{noreply, State}
end;
handle_cast({write_lines, Lines}, State = #state{file = OldFile, file_name = FileName, date = Date}) ->
Data = iolist_to_binary(lists:join(<<$\n>>, Lines)),
case maybe_new_file(Date) of
true ->
file:close(OldFile),
FilePath = make_file(FileName),
{ok, File} = file:open(FilePath, [append, binary]),
ok = file:write(File, Data),
{noreply, State#state{file = File, date = get_date()}};
false ->
ok = file:write(OldFile, Data),
{noreply, State}
end.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
format(Data) when is_binary(Data) ->
iolist_to_binary(Data);
format(Items) when is_list(Items) ->
iolist_to_binary(lists:join(<<"\t">>, Items)).
time_prefix() ->
{{Y, M, D}, {H, I, S}} = calendar:local_time(),
iolist_to_binary(io_lib:format("[~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b]", [Y, M, D, H, I, S])).
-spec make_file(LogFile :: string()) -> string().
make_file(LogFile) when is_list(LogFile) ->
{Year, Month, Day} = erlang:date(),
Suffix = io_lib:format("~b~2..0b~2..0b", [Year, Month, Day]),
RootDir = code:root_dir() ++ "/log/",
lists:flatten(RootDir ++ LogFile ++ "." ++ Suffix).
ensure_dir() ->
RootDir = code:root_dir() ++ "/log/",
case filelib:is_dir(RootDir) of
true ->
ok;
false ->
file:make_dir(RootDir)
end.
%%
-spec get_date() -> Date :: calendar:date().
get_date() ->
{Date, _} = calendar:local_time(),
Date.
%%
-spec maybe_new_file(Date :: calendar:date()) -> boolean().
maybe_new_file({Y, M, D}) ->
{{Y0, M0, D0}, _} = calendar:local_time(),
not (Y =:= Y0 andalso M =:= M0 andalso D =:= D0).

View File

@ -28,6 +28,15 @@ start_link() ->
init([]) -> init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
ChildSpecs = [ ChildSpecs = [
#{
id => 'efka_logger',
start => {'efka_logger', start_link, ["deploy_log"]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['efka_logger']
},
#{ #{
id => 'efka_service_sup', id => 'efka_service_sup',
start => {'efka_service_sup', start_link, []}, start => {'efka_service_sup', start_link, []},