diff --git a/apps/efka/include/efka_tables.hrl b/apps/efka/include/efka_tables.hrl new file mode 100644 index 0000000..af014ed --- /dev/null +++ b/apps/efka/include/efka_tables.hrl @@ -0,0 +1,19 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 29. 9月 2025 15:27 +%%%------------------------------------------------------------------- +-author("anlicheng"). + +%% 用来保存微服务 +-record(service, { + service_id :: binary(), + container_name :: binary(), + %% 配置信息, 微服务启动的时候自己注册的信息 + meta_data = #{} :: map(), + %% 状态: 0: 停止, 1: 运行中 + status = 0 +}). \ No newline at end of file diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index f24bc30..e7b36eb 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -55,6 +55,15 @@ init([]) -> modules => ['cache_model'] }, + #{ + id => service_model, + start => {service_model, start_link, []}, + restart => permanent, + shutdown => 5000, + type => worker, + modules => ['service_model'] + }, + #{ id => 'efka_subscription', start => {'efka_subscription', start_link, []}, diff --git a/apps/efka/src/models/cache_model.erl b/apps/efka/src/models/cache_model.erl index 4f7d7ef..2644668 100644 --- a/apps/efka/src/models/cache_model.erl +++ b/apps/efka/src/models/cache_model.erl @@ -8,7 +8,6 @@ %%%------------------------------------------------------------------- -module(cache_model). -author("anlicheng"). --include("efka_tables.hrl"). -behaviour(gen_server). diff --git a/apps/efka/src/models/service_model.erl b/apps/efka/src/models/service_model.erl index 3b069cb..182c57f 100644 --- a/apps/efka/src/models/service_model.erl +++ b/apps/efka/src/models/service_model.erl @@ -14,7 +14,7 @@ %% API -export([start_link/0]). --export([insert/1, change_status/2, set_config/2, get_config_json/1, get_status/1, get_service/1, get_all_services/0, get_running_services/0]). +-export([insert/1, change_status/2, get_status/1, get_service/1, get_all_services/0, get_running_services/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,14 +36,6 @@ insert(Service = #service{}) -> change_status(ServiceId, NewStatus) when is_binary(ServiceId), is_integer(NewStatus) -> gen_server:call(?SERVER, {change_status, ServiceId, NewStatus}). --spec set_config(ServiceId :: binary(), ConfigJson :: binary()) -> ok | {error, Reason :: any()}. -set_config(ServiceId, ConfigJson) when is_binary(ServiceId), is_binary(ConfigJson) -> - gen_server:call(?SERVER, {set_config, ServiceId, ConfigJson}). - --spec get_config_json(ServiceId :: binary()) -> error | {ok, ConfigJson :: binary()}. -get_config_json(ServiceId) when is_binary(ServiceId) -> - gen_server:call(?SERVER, {get_config_json, ServiceId}). - -spec get_status(ServiceId :: binary()) -> Status :: integer(). get_status(ServiceId) when is_binary(ServiceId) -> gen_server:call(?SERVER, {get_status, ServiceId}). @@ -105,24 +97,6 @@ handle_call({change_status, ServiceId, NewStatus}, _From, State = #state{}) -> {reply, ok, State} end; -handle_call({set_config, ServiceId, ConfigJson}, _From, State = #state{}) -> - case dets:lookup(?TAB, ServiceId) of - [] -> - {reply, {error, <<"service not found">>}, State}; - [OldService] -> - NewService = OldService#service{config_json = ConfigJson}, - ok = dets:insert(?TAB, NewService), - {reply, ok, State} - end; - -handle_call({get_config_json, ServiceId}, _From, State = #state{}) -> - case dets:lookup(?TAB, ServiceId) of - [] -> - {reply, error, State}; - [#service{config_json = ConfigJson}] -> - {reply, {ok, ConfigJson}, State} - end; - handle_call({get_status, ServiceId}, _From, State = #state{}) -> case dets:lookup(?TAB, ServiceId) of [] -> diff --git a/apps/efka/src/ws_channel.erl b/apps/efka/src/ws_channel.erl index 5d111cc..35f4a0e 100644 --- a/apps/efka/src/ws_channel.erl +++ b/apps/efka/src/ws_channel.erl @@ -8,6 +8,7 @@ %%%------------------------------------------------------------------- -module(ws_channel). -author("licheng5"). +-include("efka_tables.hrl"). %% API -export([init/2]). @@ -17,6 +18,7 @@ -define(PENDING_TIMEOUT, 10 * 1000). -record(state, { + service_id :: binary(), service_pid :: undefined | pid(), is_registered = false :: boolean() }). @@ -70,7 +72,13 @@ websocket_info(Info, State) -> {ok, State}. %% 进程关闭事件 -terminate(Reason, _Req, State) -> +terminate(Reason, _Req, State = #state{service_id = ServiceId, is_registered = IsRegistered}) -> + case IsRegistered of + true -> + ok = service_model:change_status(ServiceId, 0); + false -> + ok + end, lager:debug("[ws_channel] channel close with reason: ~p, state is: ~p", [Reason, State]), ok. @@ -79,14 +87,18 @@ terminate(Reason, _Req, State) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% 注册, 要建立程序和容器之间的关系 -handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := #{<<"service_id">> := ServiceId}}, State) -> +handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := Params = #{<<"service_id">> := ServiceId}}, State) -> {ok, ServicePid} = efka_service_sup:start_service(ServiceId), case efka_service:attach_channel(ServicePid, self()) of ok -> Reply = json_result(Id, <<"ok">>), erlang:monitor(process, ServicePid), - {reply, {text, Reply}, State#state{service_pid = ServicePid, is_registered = true}}; + %% 更新微服务的状态 + MetaData = maps:get(<<"meta_data">>, Params, #{}), + ok = service_model:insert(#service{service_id = ServiceId, status = 1, meta_data = MetaData}), + + {reply, {text, Reply}, State#state{service_id = ServiceId, service_pid = ServicePid, is_registered = true}}; {error, Error} -> lager:warning("[ws_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), {stop, State}