From 738f16ecba1b380a97b4fe1b01bf5726797ab910 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 9 May 2025 17:05:10 +0800 Subject: [PATCH] add task logs --- apps/efka/include/efka_tables.hrl | 6 ++ apps/efka/src/efka_app.erl | 1 + apps/efka/src/efka_inetd_task_log.erl | 131 ++++++++++++++++++++++++ apps/efka/src/mnesia/task_log_model.erl | 46 +++++++++ 4 files changed, 184 insertions(+) create mode 100644 apps/efka/src/efka_inetd_task_log.erl create mode 100644 apps/efka/src/mnesia/task_log_model.erl diff --git a/apps/efka/include/efka_tables.hrl b/apps/efka/include/efka_tables.hrl index 2db5a07..7579eeb 100644 --- a/apps/efka/include/efka_tables.hrl +++ b/apps/efka/include/efka_tables.hrl @@ -27,6 +27,12 @@ data :: binary() }). +%% 数据缓存 +-record(task_log, { + task_id = 0 :: integer(), + logs = []:: list() +}). + %% id生成器 -record(id_generator, { id, diff --git a/apps/efka/src/efka_app.erl b/apps/efka/src/efka_app.erl index 55a0113..7d1f95e 100644 --- a/apps/efka/src/efka_app.erl +++ b/apps/efka/src/efka_app.erl @@ -36,4 +36,5 @@ start_mnesia() -> not lists:member(id_generator, Tables) andalso id_generator_model:create_table(), not lists:member(service, Tables) andalso service_model:create_table(), not lists:member(cache, Tables) andalso cache_model:create_table(), + not lists:member(task_log, Tables) andalso task_log_model:create_table(), ok. \ No newline at end of file diff --git a/apps/efka/src/efka_inetd_task_log.erl b/apps/efka/src/efka_inetd_task_log.erl new file mode 100644 index 0000000..ce68d23 --- /dev/null +++ b/apps/efka/src/efka_inetd_task_log.erl @@ -0,0 +1,131 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 09. 5月 2025 16:45 +%%%------------------------------------------------------------------- +-module(efka_inetd_task_log). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([stash/2, flush/1, get_logs/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + %% #{task_id => queue:new()} + pending_map = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec stash(TaskId :: integer(), Log :: binary()) -> no_return(). +stash(TaskId, Log) when is_integer(TaskId), is_binary(Log) -> + gen_server:cast(?SERVER, {stash, TaskId, Log}). + +-spec flush(TaskId :: integer()) -> no_return(). +flush(TaskId) when is_integer(TaskId) -> + gen_server:cast(?SERVER, {flush, TaskId}). + +-spec get_logs(TaskId :: integer()) -> {ok, Logs :: list()}. +get_logs(TaskId) when is_integer(TaskId) -> + gen_server:call(?SERVER, {get_logs, TaskId}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({get_logs, TaskId}, _From, State = #state{pending_map = PendingMap}) -> + case maps:find(TaskId, PendingMap) of + error -> + Logs = task_log_model:get_logs(TaskId), + {reply, {ok, Logs}, State}; + {ok, Q} -> + Logs = queue:to_list(Q), + {reply, {ok, Logs}, State} + end. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({stash, TaskId, Log}, State = #state{pending_map = PendingMap}) -> + Q = maps:get(TaskId, PendingMap, queue:new()), + NQ = queue:in(Log, Q), + {noreply, State#state{pending_map = maps:put(TaskId, NQ, PendingMap)}}; +handle_cast({flush, TaskId}, State = #state{pending_map = PendingMap}) -> + case maps:take(TaskId, PendingMap) of + error -> + {noreply, State}; + {Q, NPendingMap} -> + Logs = queue:to_list(Q), + ok = task_log_model:insert(TaskId, Logs), + {noreply, State#state{pending_map = NPendingMap}} + end. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/efka/src/mnesia/task_log_model.erl b/apps/efka/src/mnesia/task_log_model.erl new file mode 100644 index 0000000..6871e1c --- /dev/null +++ b/apps/efka/src/mnesia/task_log_model.erl @@ -0,0 +1,46 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 04. 7月 2023 12:31 +%%%------------------------------------------------------------------- +-module(task_log_model). +-author("aresei"). +-include("efka_tables.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +-define(TAB, task_log). + +%% API +-export([create_table/0]). +-export([insert/2, get_logs/1]). + +create_table() -> + %% id生成器 + mnesia:create_table(task_log, [ + {attributes, record_info(fields, task_log)}, + {record_name, task_log}, + {disc_copies, [node()]}, + {type, ordered_set} + ]). + +-spec insert(TaskId :: integer(), Logs :: [binary()]) -> ok | {error, Reason :: term()}. +insert(TaskId, Logs) when is_integer(TaskId), is_list(Logs) -> + TaskLog = #task_log{task_id = TaskId, logs = Logs}, + case mnesia:transaction(fun() -> mnesia:write(?TAB, TaskLog, write) end) of + {'atomic', Res} -> + Res; + {'aborted', Reason} -> + {error, Reason} + end. + +-spec get_logs(TaskId :: integer()) -> Logs :: [binary()]. +get_logs(TaskId) when is_integer(TaskId) -> + case mnesia:dirty_read(?TAB, TaskId) of + [] -> + []; + [#task_log{logs = Logs}] -> + Logs + end. \ No newline at end of file