From cab9bb538c3601476310cd35301886f4ce175bdf Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 13 Aug 2025 16:58:03 +0800 Subject: [PATCH] add dets models --- apps/efka/src/models/cache_model.erl | 144 +++++++++++++++++++ apps/efka/src/models/service_model.erl | 188 +++++++++++++++++++++++++ config/sys.config | 2 + 3 files changed, 334 insertions(+) create mode 100644 apps/efka/src/models/cache_model.erl create mode 100644 apps/efka/src/models/service_model.erl diff --git a/apps/efka/src/models/cache_model.erl b/apps/efka/src/models/cache_model.erl new file mode 100644 index 0000000..4a519c6 --- /dev/null +++ b/apps/efka/src/models/cache_model.erl @@ -0,0 +1,144 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 13. 8月 2025 16:05 +%%%------------------------------------------------------------------- +-module(cache_model). +-author("anlicheng"). +-include("efka_tables.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([insert/2, fetch_next/0, delete/1, get_all_cache/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(TAB, cache). + +-record(state, { + +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec insert(Method :: integer(), Data :: binary()) -> ok | {error, Reason :: any()}. +insert(Method, Data) when is_integer(Method), is_binary(Data) -> + Cache = #cache{id = generate_id(), method = Method, data = Data}, + gen_server:call(?SERVER, {insert, Cache}). + +fetch_next() -> + gen_server:call(?SERVER, fetch_next). + +delete(Id) when is_integer(Id) -> + gen_server:call(?SERVER, {delete, Id}). + +-spec get_all_cache() -> [#cache{}]. +get_all_cache() -> + gen_server:call(?SERVER, get_all_cache). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, DetsDir} = application:get_env(efka, dets_dir), + File = DetsDir ++ "cache.dets", + {ok, ?TAB} = dets:open_file(?TAB, [{file, File}, {type, bag}, {keypos, 2}]), + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({insert, Cache}, _From, State = #state{}) -> + ok = dets:insert(?TAB, Cache), + {reply, ok, State}; +handle_call(fetch_next, _From, State = #state{}) -> + case dets:first(?TAB) of + '$end_of_table' -> + {reply, error, State}; + Key -> + [Entry] = dets:lookup(?TAB, Key), + {reply, {ok, Entry}, State} + end; +handle_call({delete, Id}, _From, State = #state{}) -> + ok = dets:delete(?TAB, Id), + {reply, ok, State}; + +handle_call(get_all_cache, _From, State = #state{}) -> + Items = dets:foldl(fun(Record, Acc) -> [Record|Acc] end, [], ?TAB), + {reply, {ok, lists:reverse(Items)}, State}; + +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + dets:close(?TAB), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec generate_id() -> integer(). +generate_id() -> + os:system_time(microsecond). \ No newline at end of file diff --git a/apps/efka/src/models/service_model.erl b/apps/efka/src/models/service_model.erl new file mode 100644 index 0000000..9e0554f --- /dev/null +++ b/apps/efka/src/models/service_model.erl @@ -0,0 +1,188 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 13. 8月 2025 16:41 +%%%------------------------------------------------------------------- +-module(service_model). +-author("anlicheng"). +-include("efka_tables.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(TAB, service). + +-record(state, { + +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +insert(Service = #service{}) -> + gen_server:call(?SERVER, {insert, Service}). + +change_status(ServiceId, NewStatus) when is_binary(ServiceId), is_integer(NewStatus) -> + gen_server:call(?SERVER, {change_status, ServiceId, NewStatus}). + +-spec set_config(ServiceId :: binary(), ConfigJson :: binary()) -> ok | {error, Reason :: any()}. +set_config(ServiceId, ConfigJson) when is_binary(ServiceId), is_binary(ConfigJson) -> + gen_server:call(?SERVER, {set_config, ServiceId, ConfigJson}). + +-spec get_config_json(ServiceId :: binary()) -> error | {ok, ConfigJson :: binary()}. +get_config_json(ServiceId) when is_binary(ServiceId) -> + gen_server:call(?SERVER, {get_config_json, ServiceId}). + +-spec get_status(ServiceId :: binary()) -> Status :: integer(). +get_status(ServiceId) when is_binary(ServiceId) -> + gen_server:call(?SERVER, {get_status, ServiceId}). + +-spec get_service(ServiceId :: binary()) -> error | {ok, Service :: #service{}}. +get_service(ServiceId) when is_binary(ServiceId) -> + gen_server:call(?SERVER, {get_service, ServiceId}). + +-spec get_all_services() -> [#service{}]. +get_all_services() -> + gen_server:call(?SERVER, get_all_services). + +-spec get_all_service_ids() -> [ServiceId :: binary()]. +get_all_service_ids() -> + gen_server:call(?SERVER, get_all_service_ids). + +-spec get_running_services() -> {ok, [#service{}]} | {error, Reason :: term()}. +get_running_services() -> + gen_server:call(?SERVER, get_running_services). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, DetsDir} = application:get_env(efka, dets_dir), + File = DetsDir ++ "service.dets", + {ok, ?TAB} = dets:open_file(?TAB, [{file, File}, {type, bag}, {keypos, 2}]), + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({insert, Service}, _From, State = #state{}) -> + ok = dets:insert(?TAB, Service), + {reply, ok, State}; + +handle_call({change_status, ServiceId, NewStatus}, _From, State = #state{}) -> + case dets:lookup(?TAB, ServiceId) of + [] -> + {reply, {error, <<"service not found">>}, State}; + [OldService] -> + NewService = OldService#service{status = NewStatus}, + ok = dets:insert(?TAB, NewService), + {reply, ok, State} + end; + +handle_call({set_config, ServiceId, ConfigJson}, _From, State = #state{}) -> + case dets:lookup(?TAB, ServiceId) of + [] -> + {reply, {error, <<"service not found">>}, State}; + [OldService] -> + NewService = OldService#service{config_json = ConfigJson}, + ok = dets:insert(?TAB, NewService), + {reply, ok, State} + end; + +handle_call({get_config_json, ServiceId}, _From, State = #state{}) -> + case dets:lookup(?TAB, ServiceId) of + [] -> + {reply, error, State}; + [#service{config_json = ConfigJson}] -> + {reply, {ok, ConfigJson}, State} + end; + +handle_call({get_status, ServiceId}, _From, State = #state{}) -> + case dets:lookup(?TAB, ServiceId) of + [] -> + {reply, 0, State}; + [#service{status = Status}] -> + {reply, Status, State} + end; + +handle_call(get_all_services, _From, State = #state{}) -> + Items = dets:foldl(fun(Record, Acc) -> [Record|Acc] end, [], ?TAB), + {reply, lists:reverse(Items), State}; + +handle_call(get_running_services, _From, State = #state{}) -> + Items = dets:foldl(fun(Record, Acc) -> [Record|Acc] end, [], ?TAB), + RunningItems = lists:filter(fun(#service{status = Status}) -> Status =:= 1 end, lists:reverse(Items)), + {reply, RunningItems, State}; + +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok = dets:close(?TAB), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/config/sys.config b/config/sys.config index 7c12877..f49e156 100644 --- a/config/sys.config +++ b/config/sys.config @@ -2,6 +2,8 @@ {efka, [ {root_dir, "/usr/local/code/efka"}, + {dets_dir, "/tmp/db/"}, + {tcp_server, [ {port, 18088} ]},