From f51756e4b5806c4f6fc4a52d687f272277c4e7ec Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 13 Aug 2025 17:13:09 +0800 Subject: [PATCH] remove mnesia --- apps/efka/src/efka.app.src | 1 - apps/efka/src/efka_app.erl | 37 +----- apps/efka/src/efka_model_sup.erl | 75 +++++++++++ apps/efka/src/efka_sup.erl | 9 ++ apps/efka/src/mnesia/cache_model.erl | 74 ----------- apps/efka/src/mnesia/id_generator_model.erl | 26 ---- apps/efka/src/mnesia/service_model.erl | 140 -------------------- apps/efka/src/mnesia/task_log_model.erl | 46 ------- apps/efka/src/models/service_model.erl | 5 +- apps/efka/src/models/task_log_model.erl | 122 +++++++++++++++++ 10 files changed, 208 insertions(+), 327 deletions(-) create mode 100644 apps/efka/src/efka_model_sup.erl delete mode 100644 apps/efka/src/mnesia/cache_model.erl delete mode 100644 apps/efka/src/mnesia/id_generator_model.erl delete mode 100644 apps/efka/src/mnesia/service_model.erl delete mode 100644 apps/efka/src/mnesia/task_log_model.erl create mode 100644 apps/efka/src/models/task_log_model.erl diff --git a/apps/efka/src/efka.app.src b/apps/efka/src/efka.app.src index beb2604..3fe4287 100644 --- a/apps/efka/src/efka.app.src +++ b/apps/efka/src/efka.app.src @@ -8,7 +8,6 @@ sync, jiffy, %gpb, - mnesia, parse_trans, lager, crypto, diff --git a/apps/efka/src/efka_app.erl b/apps/efka/src/efka_app.erl index 9fbea81..ac89df2 100644 --- a/apps/efka/src/efka_app.erl +++ b/apps/efka/src/efka_app.erl @@ -11,44 +11,9 @@ start(_StartType, _StartArgs) -> io:setopts([{encoding, unicode}]), - %% 启动mnesia数据库 - start_mnesia(), %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), efka_sup:start_link(). stop(_State) -> - ok. - -%% internal functions - -%% 启动内存数据库 -start_mnesia() -> - %% 启动数据库 - ensure_mnesia_schema(), - ok = mnesia:start(), - Tables = mnesia:system_info(tables), - lager:debug("[efka_app] tables: ~p", [Tables]), - %% 创建数据库表 - not lists:member(id_generator, Tables) andalso id_generator_model:create_table(), - not lists:member(service, Tables) andalso service_model:create_table(), - not lists:member(cache, Tables) andalso cache_model:create_table(), - not lists:member(task_log, Tables) andalso task_log_model:create_table(), - ok. - --spec ensure_mnesia_schema() -> any(). -ensure_mnesia_schema() -> - case mnesia:system_info(use_dir) of - true -> - lager:debug("[efka_app] mnesia schema exists"), - ok; - false -> - mnesia:stop(), - case mnesia:create_schema([node()]) of - ok -> ok; - {error, {_, {already_exists, _}}} -> ok; - Error -> - lager:debug("[iot_app] create mnesia schema failed with error: ~p", [Error]), - throw({init_schema, Error}) - end - end. \ No newline at end of file + ok. \ No newline at end of file diff --git a/apps/efka/src/efka_model_sup.erl b/apps/efka/src/efka_model_sup.erl new file mode 100644 index 0000000..92ccd8d --- /dev/null +++ b/apps/efka/src/efka_model_sup.erl @@ -0,0 +1,75 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 18. 4月 2025 16:42 +%%%------------------------------------------------------------------- +-module(efka_model_sup). +-author("anlicheng"). +-include("efka_tables.hrl"). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). +-export([start_service/1, stop_service/1]). + +%% Supervisor callbacks +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +%% @doc Starts the supervisor +-spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3], +%% this function is called by the new process to find out about +%% restart strategy, maximum restart frequency and child +%% specifications. +init([]) -> + SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, + Specs = [ + #{ + id => cache_model, + start => {cache_model, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => ['cache_model'] + }, + #{ + id => service_model, + start => {service_model, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => ['service_model'] + }, + #{ + id => task_log_model, + start => {task_log_model, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => ['task_log_model'] + } + ], + + {ok, {SupFlags, Specs}}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== \ No newline at end of file diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index 212e443..9e95ea3 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -28,6 +28,15 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, ChildSpecs = [ + #{ + id => 'efka_model_sup', + start => {'efka_model_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['efka_model_sup'] + }, + #{ id => 'efka_inetd_task_log', start => {'efka_inetd_task_log', start_link, []}, diff --git a/apps/efka/src/mnesia/cache_model.erl b/apps/efka/src/mnesia/cache_model.erl deleted file mode 100644 index 28fb6f1..0000000 --- a/apps/efka/src/mnesia/cache_model.erl +++ /dev/null @@ -1,74 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 04. 7月 2023 12:31 -%%%------------------------------------------------------------------- --module(cache_model). --author("aresei"). --include("efka_tables.hrl"). --include_lib("stdlib/include/qlc.hrl"). - --define(TAB, cache). - -%% API --export([create_table/0]). --export([insert/2, get_all_cache/0, fetch_next/0, delete/1, next_id/0]). --export([first_key/0]). - -create_table() -> - %% id生成器 - {atomic, ok} = mnesia:create_table(cache, [ - {attributes, record_info(fields, cache)}, - {record_name, cache}, - {disc_copies, [node()]}, - {type, ordered_set} - ]). - -next_id() -> - id_generator_model:next_id(?TAB). - --spec insert(Method :: integer(), Data :: binary()) -> ok | {error, Reason :: any()}. -insert(Method, Data) when is_integer(Method), is_binary(Data) -> - Cache = #cache{id = next_id(), method = Method, data = Data}, - case mnesia:transaction(fun() -> mnesia:write(?TAB, Cache, write) end) of - {'atomic', ok} -> - ok; - {'aborted', Reason} -> - {error, Reason} - end. - -fetch_next() -> - case mnesia:dirty_first(?TAB) of - '$end_of_table' -> - error; - Id -> - [Entry] = mnesia:dirty_read(?TAB, Id), - {ok, Entry} - end. - -delete(Id) when is_integer(Id) -> - case mnesia:transaction(fun() -> mnesia:delete(?TAB, Id, write) end) of - {'atomic', ok} -> - ok; - {'aborted', Reason} -> - {error, Reason} - end. - --spec get_all_cache() -> [#cache{}]. -get_all_cache() -> - Fun = fun() -> - Q = qlc:q([E || E <- mnesia:table(?TAB)]), - qlc:e(Q) - end, - case mnesia:transaction(Fun) of - {'atomic', Res} -> - Res; - {'aborted', _} -> - [] - end. - -first_key() -> - mnesia:dirty_first(?TAB). diff --git a/apps/efka/src/mnesia/id_generator_model.erl b/apps/efka/src/mnesia/id_generator_model.erl deleted file mode 100644 index 8bc2b80..0000000 --- a/apps/efka/src/mnesia/id_generator_model.erl +++ /dev/null @@ -1,26 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 06. 5月 2025 10:32 -%%%------------------------------------------------------------------- --module(id_generator_model). --author("anlicheng"). --include("efka_tables.hrl"). - -%% API --export([create_table/0, next_id/1]). - -create_table() -> - %% id生成器 - {atomic, ok} = mnesia:create_table(id_generator, [ - {attributes, record_info(fields, id_generator)}, - {record_name, id_generator}, - {disc_copies, [node()]}, - {type, ordered_set} - ]). - -next_id(Tab) when is_atom(Tab) -> - mnesia:dirty_update_counter(id_generator, Tab, 1). \ No newline at end of file diff --git a/apps/efka/src/mnesia/service_model.erl b/apps/efka/src/mnesia/service_model.erl deleted file mode 100644 index d5f3fa7..0000000 --- a/apps/efka/src/mnesia/service_model.erl +++ /dev/null @@ -1,140 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 04. 7月 2023 12:31 -%%%------------------------------------------------------------------- --module(service_model). --author("aresei"). --include("efka_tables.hrl"). --include_lib("stdlib/include/qlc.hrl"). - --define(TAB, service). - -%% API --export([create_table/0]). --export([insert/1, get_all_services/0, get_all_service_ids/0, get_running_services/0]). --export([get_config_json/1, set_config/2, get_service/1, get_status/1, change_status/2]). --export([display_services/0]). - -create_table() -> - %% id生成器 - {atomic, ok} = mnesia:create_table(service, [ - {attributes, record_info(fields, service)}, - {record_name, service}, - {disc_copies, [node()]}, - {type, ordered_set} - ]). - -insert(Service = #service{}) -> - case mnesia:transaction(fun() -> mnesia:write(?TAB, Service, write) end) of - {'atomic', Res} -> - Res; - {'aborted', Reason} -> - {error, Reason} - end. - -change_status(ServiceId, NewStatus) when is_binary(ServiceId), is_integer(NewStatus) -> - Fun = fun() -> - case mnesia:read(?TAB, ServiceId, write) of - [] -> - mnesia:abort(<<"service not found">>); - [Service] -> - mnesia:write(?TAB, Service#service{status = NewStatus}, write) - end - end, - case mnesia:transaction(Fun) of - {'atomic', ok} -> - ok; - {'aborted', Reason} -> - {error, Reason} - end. - --spec set_config(ServiceId :: binary(), ConfigJson :: binary()) -> ok | {error, Reason :: any()}. -set_config(ServiceId, ConfigJson) when is_binary(ServiceId), is_binary(ConfigJson) -> - Fun = fun() -> - case mnesia:read(?TAB, ServiceId, write) of - [] -> - mnesia:abort(<<"service not found">>); - [S] -> - mnesia:write(?TAB, S#service{config_json = ConfigJson}, write) - end - end, - case mnesia:transaction(Fun) of - {'atomic', ok} -> - ok; - {'aborted', Reason} -> - {error, Reason} - end. - --spec get_config_json(ServiceId :: binary()) -> error | {ok, ConfigJson :: binary()}. -get_config_json(ServiceId) when is_binary(ServiceId) -> - case mnesia:dirty_read(?TAB, ServiceId) of - [] -> - error; - [#service{config_json = ConfigJson}] -> - {ok, ConfigJson} - end. - --spec get_status(ServiceId :: binary()) -> Status :: integer(). -get_status(ServiceId) when is_binary(ServiceId) -> - case mnesia:dirty_read(?TAB, ServiceId) of - [] -> - 0; - [#service{status = Status}] -> - Status - end. - --spec get_service(ServiceId :: binary()) -> error | {ok, Service :: #service{}}. -get_service(ServiceId) when is_binary(ServiceId) -> - case mnesia:dirty_read(?TAB, ServiceId) of - [] -> - error; - [Service] -> - {ok, Service} - end. - --spec get_all_services() -> [#service{}]. -get_all_services() -> - Fun = fun() -> - Q = qlc:q([E || E <- mnesia:table(?TAB)]), - qlc:e(Q) - end, - - case mnesia:transaction(Fun) of - {'atomic', Res} -> - Res; - {'aborted', _} -> - [] - end. - --spec get_all_service_ids() -> [ServiceId :: binary()]. -get_all_service_ids() -> - mnesia:dirty_all_keys(?TAB). - --spec get_running_services() -> {ok, [#service{}]} | {error, Reason :: term()}. -get_running_services() -> - F = fun() -> - Q = qlc:q([E || E <- mnesia:table(?TAB), E#service.status == 1]), - qlc:e(Q) - end, - case mnesia:transaction(F) of - {atomic, Services} -> - {ok, Services}; - {aborted, Error} -> - {error, Error} - end. - -display_services() -> - F = fun() -> - Q = qlc:q([E || E <- mnesia:table(?TAB)]), - qlc:e(Q) - end, - case mnesia:transaction(F) of - {atomic, Services} -> - {ok, Services}; - {aborted, Error} -> - {error, Error} - end. \ No newline at end of file diff --git a/apps/efka/src/mnesia/task_log_model.erl b/apps/efka/src/mnesia/task_log_model.erl deleted file mode 100644 index 617f332..0000000 --- a/apps/efka/src/mnesia/task_log_model.erl +++ /dev/null @@ -1,46 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 04. 7月 2023 12:31 -%%%------------------------------------------------------------------- --module(task_log_model). --author("aresei"). --include("efka_tables.hrl"). --include_lib("stdlib/include/qlc.hrl"). - --define(TAB, task_log). - -%% API --export([create_table/0]). --export([insert/2, get_logs/1]). - -create_table() -> - %% id生成器 - {atomic, ok} = mnesia:create_table(task_log, [ - {attributes, record_info(fields, task_log)}, - {record_name, task_log}, - {disc_copies, [node()]}, - {type, ordered_set} - ]). - --spec insert(TaskId :: integer(), Logs :: [binary()]) -> ok | {error, Reason :: term()}. -insert(TaskId, Logs) when is_integer(TaskId), is_list(Logs) -> - TaskLog = #task_log{task_id = TaskId, logs = Logs}, - case mnesia:transaction(fun() -> mnesia:write(?TAB, TaskLog, write) end) of - {'atomic', Res} -> - Res; - {'aborted', Reason} -> - {error, Reason} - end. - --spec get_logs(TaskId :: integer()) -> Logs :: [binary()]. -get_logs(TaskId) when is_integer(TaskId) -> - case mnesia:dirty_read(?TAB, TaskId) of - [] -> - []; - [#task_log{logs = Logs}] -> - Logs - end. \ No newline at end of file diff --git a/apps/efka/src/models/service_model.erl b/apps/efka/src/models/service_model.erl index 9e0554f..7c3eca3 100644 --- a/apps/efka/src/models/service_model.erl +++ b/apps/efka/src/models/service_model.erl @@ -14,6 +14,7 @@ %% API -export([start_link/0]). +-export([insert/1, change_status/2, set_config/2, get_config_json/1, get_status/1, get_service/1, get_all_services/0, get_running_services/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -55,10 +56,6 @@ get_service(ServiceId) when is_binary(ServiceId) -> get_all_services() -> gen_server:call(?SERVER, get_all_services). --spec get_all_service_ids() -> [ServiceId :: binary()]. -get_all_service_ids() -> - gen_server:call(?SERVER, get_all_service_ids). - -spec get_running_services() -> {ok, [#service{}]} | {error, Reason :: term()}. get_running_services() -> gen_server:call(?SERVER, get_running_services). diff --git a/apps/efka/src/models/task_log_model.erl b/apps/efka/src/models/task_log_model.erl new file mode 100644 index 0000000..aae3192 --- /dev/null +++ b/apps/efka/src/models/task_log_model.erl @@ -0,0 +1,122 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 13. 8月 2025 17:01 +%%%------------------------------------------------------------------- +-module(task_log_model). +-author("anlicheng"). +-include("efka_tables.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([insert/2, get_logs/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(TAB, task_log). + +-record(state, { + +}). + +%%%=================================================================== +%%% API +%%%=================================================================== +-spec insert(TaskId :: integer(), Logs :: [binary()]) -> ok | {error, Reason :: term()}. +insert(TaskId, Logs) when is_integer(TaskId), is_list(Logs) -> + TaskLog = #task_log{task_id = TaskId, logs = Logs}, + gen_server:call(?SERVER, {insert, TaskLog}). + +-spec get_logs(TaskId :: integer()) -> Logs :: [binary()]. +get_logs(TaskId) when is_integer(TaskId) -> + gen_server:call(?SERVER, {get_logs, TaskId}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, DetsDir} = application:get_env(efka, dets_dir), + File = DetsDir ++ "task_log.dets", + {ok, ?TAB} = dets:open_file(?TAB, [{file, File}, {type, set}, {keypos, 2}]), + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({insert, TaskLog}, _From, State = #state{}) -> + ok = dets:insert(?TAB, TaskLog), + {reply, ok, State}; +handle_call({get_logs, TaskId}, _From, State = #state{}) -> + Reply = case dets:lookup(?TAB, TaskId) of + [] -> + []; + [#task_log{logs = Logs}|_] -> + Logs + end, + {reply, Reply, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%===================================================================