add task logs
This commit is contained in:
parent
ef56328f70
commit
738f16ecba
@ -27,6 +27,12 @@
|
||||
data :: binary()
|
||||
}).
|
||||
|
||||
%% 数据缓存
|
||||
-record(task_log, {
|
||||
task_id = 0 :: integer(),
|
||||
logs = []:: list()
|
||||
}).
|
||||
|
||||
%% id生成器
|
||||
-record(id_generator, {
|
||||
id,
|
||||
|
||||
@ -36,4 +36,5 @@ start_mnesia() ->
|
||||
not lists:member(id_generator, Tables) andalso id_generator_model:create_table(),
|
||||
not lists:member(service, Tables) andalso service_model:create_table(),
|
||||
not lists:member(cache, Tables) andalso cache_model:create_table(),
|
||||
not lists:member(task_log, Tables) andalso task_log_model:create_table(),
|
||||
ok.
|
||||
131
apps/efka/src/efka_inetd_task_log.erl
Normal file
131
apps/efka/src/efka_inetd_task_log.erl
Normal file
@ -0,0 +1,131 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2025, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 09. 5月 2025 16:45
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(efka_inetd_task_log).
|
||||
-author("anlicheng").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([stash/2, flush/1, get_logs/1]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-record(state, {
|
||||
%% #{task_id => queue:new()}
|
||||
pending_map = #{}
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec stash(TaskId :: integer(), Log :: binary()) -> no_return().
|
||||
stash(TaskId, Log) when is_integer(TaskId), is_binary(Log) ->
|
||||
gen_server:cast(?SERVER, {stash, TaskId, Log}).
|
||||
|
||||
-spec flush(TaskId :: integer()) -> no_return().
|
||||
flush(TaskId) when is_integer(TaskId) ->
|
||||
gen_server:cast(?SERVER, {flush, TaskId}).
|
||||
|
||||
-spec get_logs(TaskId :: integer()) -> {ok, Logs :: list()}.
|
||||
get_logs(TaskId) when is_integer(TaskId) ->
|
||||
gen_server:call(?SERVER, {get_logs, TaskId}).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link() ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([]) ->
|
||||
{ok, #state{}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call({get_logs, TaskId}, _From, State = #state{pending_map = PendingMap}) ->
|
||||
case maps:find(TaskId, PendingMap) of
|
||||
error ->
|
||||
Logs = task_log_model:get_logs(TaskId),
|
||||
{reply, {ok, Logs}, State};
|
||||
{ok, Q} ->
|
||||
Logs = queue:to_list(Q),
|
||||
{reply, {ok, Logs}, State}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({stash, TaskId, Log}, State = #state{pending_map = PendingMap}) ->
|
||||
Q = maps:get(TaskId, PendingMap, queue:new()),
|
||||
NQ = queue:in(Log, Q),
|
||||
{noreply, State#state{pending_map = maps:put(TaskId, NQ, PendingMap)}};
|
||||
handle_cast({flush, TaskId}, State = #state{pending_map = PendingMap}) ->
|
||||
case maps:take(TaskId, PendingMap) of
|
||||
error ->
|
||||
{noreply, State};
|
||||
{Q, NPendingMap} ->
|
||||
Logs = queue:to_list(Q),
|
||||
ok = task_log_model:insert(TaskId, Logs),
|
||||
{noreply, State#state{pending_map = NPendingMap}}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info(_Info, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
46
apps/efka/src/mnesia/task_log_model.erl
Normal file
46
apps/efka/src/mnesia/task_log_model.erl
Normal file
@ -0,0 +1,46 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 04. 7月 2023 12:31
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(task_log_model).
|
||||
-author("aresei").
|
||||
-include("efka_tables.hrl").
|
||||
-include_lib("stdlib/include/qlc.hrl").
|
||||
|
||||
-define(TAB, task_log).
|
||||
|
||||
%% API
|
||||
-export([create_table/0]).
|
||||
-export([insert/2, get_logs/1]).
|
||||
|
||||
create_table() ->
|
||||
%% id生成器
|
||||
mnesia:create_table(task_log, [
|
||||
{attributes, record_info(fields, task_log)},
|
||||
{record_name, task_log},
|
||||
{disc_copies, [node()]},
|
||||
{type, ordered_set}
|
||||
]).
|
||||
|
||||
-spec insert(TaskId :: integer(), Logs :: [binary()]) -> ok | {error, Reason :: term()}.
|
||||
insert(TaskId, Logs) when is_integer(TaskId), is_list(Logs) ->
|
||||
TaskLog = #task_log{task_id = TaskId, logs = Logs},
|
||||
case mnesia:transaction(fun() -> mnesia:write(?TAB, TaskLog, write) end) of
|
||||
{'atomic', Res} ->
|
||||
Res;
|
||||
{'aborted', Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
-spec get_logs(TaskId :: integer()) -> Logs :: [binary()].
|
||||
get_logs(TaskId) when is_integer(TaskId) ->
|
||||
case mnesia:dirty_read(?TAB, TaskId) of
|
||||
[] ->
|
||||
[];
|
||||
[#task_log{logs = Logs}] ->
|
||||
Logs
|
||||
end.
|
||||
Loading…
x
Reference in New Issue
Block a user