diff --git a/apps/efka/include/efka_tables.hrl b/apps/efka/include/efka_tables.hrl new file mode 100644 index 0000000..5ca2f10 --- /dev/null +++ b/apps/efka/include/efka_tables.hrl @@ -0,0 +1,20 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 30. 4月 2025 11:16 +%%%------------------------------------------------------------------- +-author("anlicheng"). + +%% 用来保存微服务 +-record(micro_service, { + service_id :: binary(), + service_name :: binary(), + from :: binary(), + params :: binary(), + metrics :: binary(), + %% 状态: 0: 停止, 1: 运行中 + status = 0 +}). \ No newline at end of file diff --git a/apps/efka/src/efka_app.erl b/apps/efka/src/efka_app.erl index 528e846..d2b8238 100644 --- a/apps/efka/src/efka_app.erl +++ b/apps/efka/src/efka_app.erl @@ -12,13 +12,9 @@ start(_StartType, _StartArgs) -> io:setopts([{encoding, unicode}]), %% 启动mnesia数据库 - % mnesia:start(), %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), - %% 启动tcp的服务 - % start_tcp_server(), - %% 仓库的基础url application:set_env(efka, repository_url, "http://118.178.229.213:3000/anlicheng/ekfa/"), @@ -29,21 +25,11 @@ stop(_State) -> %% internal functions -%% 启动tcp服务 -start_tcp_server() -> - Port = 1883, - TransOpts = [ - {tcp_options, [ - binary, - {reuseaddr, true}, - {active, false}, - {packet, 2}, - {nodelay, false}, - {backlog, 1024} - ]}, - {acceptors, 10}, - {max_connections, 1024} - ], - {ok, _} = esockd:open('efka/tcp_server', Port, TransOpts, {tcp_channel, start_link, []}), - - efka_loggeefka_logger:debug("[efka_app] the tcp server start at: ~p", [Port]). \ No newline at end of file +%% 启动内存数据库 +start_mnesia() -> + %% 启动数据库 + ok = mnesia:start(), + Tables = mnesia:system_info(tables), + %% 创建数据库表 + not lists:member(build_data, Tables) andalso mnesia_build_data:create_table(), + not lists:member(counter, Tables) andalso mnesia_counter:create_table(). \ No newline at end of file diff --git a/apps/efka/src/efka_micro_service_sup.erl b/apps/efka/src/efka_micro_service_sup.erl index 16671ca..9f4222a 100644 --- a/apps/efka/src/efka_micro_service_sup.erl +++ b/apps/efka/src/efka_micro_service_sup.erl @@ -8,12 +8,13 @@ %%%------------------------------------------------------------------- -module(efka_micro_service_sup). -author("anlicheng"). +-include("efka_tables.hrl"). -behaviour(supervisor). %% API -export([start_link/0]). --export([register_server/1, delete_server/1]). +-export([register_service/1, delete_service/1]). %% Supervisor callbacks -export([init/1]). @@ -46,15 +47,18 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - {ok, {SupFlags, []}}. + MicroServices = micro_service_model:get_running_services(), + Specs = lists:map(fun(Service) -> child_spec(Service) end, MicroServices), + + {ok, {SupFlags, Specs}}. %%%=================================================================== %%% Internal functions %%%=================================================================== --spec register_server(ServerId :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}. -register_server(ServerId) when is_binary(ServerId) -> - case supervisor:start_child(?MODULE, child_spec(ServerId)) of +-spec register_service(ServiceId :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +register_service(ServiceId) when is_binary(ServiceId) -> + case supervisor:start_child(?MODULE, child_spec(ServiceId)) of {ok, Pid} when is_pid(Pid) -> {ok, Pid}; {error, {'already_started', Pid}} when is_pid(Pid) -> @@ -63,17 +67,17 @@ register_server(ServerId) when is_binary(ServerId) -> {error, Error} end. --spec delete_server(ServerId :: binary()) -> ok. -delete_server(ServerId) when is_binary(ServerId) -> - ChildId = efka_server:get_name(ServerId), +-spec delete_service(ServiceId :: binary()) -> ok. +delete_service(ServiceId) when is_binary(ServiceId) -> + ChildId = efka_micro_service:get_name(ServiceId), ok = supervisor:terminate_child(?MODULE, ChildId), supervisor:delete_child(?MODULE, ChildId). -child_spec(ServerId) when is_binary(ServerId) -> - Name = efka_server:get_name(ServerId), +child_spec(S = #micro_service{service_id = ServiceId}) when is_binary(ServiceId) -> + Name = efka_micro_service:get_name(ServiceId), #{ id => Name, - start => {efka_micro_service, start_link, [Name, ServerId]}, + start => {efka_micro_service, start_link, [Name, S]}, restart => permanent, shutdown => 2000, type => worker, diff --git a/apps/efka/src/mnesia/micro_service_model.erl b/apps/efka/src/mnesia/micro_service_model.erl new file mode 100644 index 0000000..1d17b13 --- /dev/null +++ b/apps/efka/src/mnesia/micro_service_model.erl @@ -0,0 +1,55 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 04. 7月 2023 12:31 +%%%------------------------------------------------------------------- +-module(micro_service_model). +-author("aresei"). +-include("efka_tables.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +-define(TAB, micro_service). + +%% API +-export([create_table/0]). +-export([insert/1, start_service/1, stop_service/1, get_running_services/0]). + +create_table() -> + %% id生成器 + mnesia:create_table(micro_service, [ + {attributes, record_info(fields, micro_service)}, + {record_name, micro_service}, + {disc_copies, [node()]}, + {type, ordered_set} + ]). + +insert(MicroService = #micro_service{}) -> + case mnesia:transaction(fun() -> mnesia:write(micro_service, MicroService, write) end) of + {'atomic', Res} -> + Res; + {'aborted', Reason} -> + {error, Reason} + end. + +stop_service(ServiceId) when is_binary(ServiceId) -> + ok. + +start_service(ServiceId) when is_binary(ServiceId) -> + ok. + +-spec get_running_services() -> [#micro_service{}]. +get_running_services() -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(?TAB), E#micro_service.status =:= 1]), + qlc:e(Q) + end, + + case mnesia:transaction(Fun) of + {'atomic', Res} -> + Res; + {'aborted', _} -> + [] + end. \ No newline at end of file