fix
This commit is contained in:
parent
bdd124376b
commit
5a573be0fc
19
apps/efka/include/efka_tables.hrl
Normal file
19
apps/efka/include/efka_tables.hrl
Normal file
@ -0,0 +1,19 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2025, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 29. 9月 2025 15:27
|
||||
%%%-------------------------------------------------------------------
|
||||
-author("anlicheng").
|
||||
|
||||
%% 用来保存微服务
|
||||
-record(service, {
|
||||
service_id :: binary(),
|
||||
container_name :: binary(),
|
||||
%% 配置信息, 微服务启动的时候自己注册的信息
|
||||
meta_data = #{} :: map(),
|
||||
%% 状态: 0: 停止, 1: 运行中
|
||||
status = 0
|
||||
}).
|
||||
@ -55,6 +55,15 @@ init([]) ->
|
||||
modules => ['cache_model']
|
||||
},
|
||||
|
||||
#{
|
||||
id => service_model,
|
||||
start => {service_model, start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 5000,
|
||||
type => worker,
|
||||
modules => ['service_model']
|
||||
},
|
||||
|
||||
#{
|
||||
id => 'efka_subscription',
|
||||
start => {'efka_subscription', start_link, []},
|
||||
|
||||
@ -8,7 +8,6 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(cache_model).
|
||||
-author("anlicheng").
|
||||
-include("efka_tables.hrl").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([insert/1, change_status/2, set_config/2, get_config_json/1, get_status/1, get_service/1, get_all_services/0, get_running_services/0]).
|
||||
-export([insert/1, change_status/2, get_status/1, get_service/1, get_all_services/0, get_running_services/0]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
@ -36,14 +36,6 @@ insert(Service = #service{}) ->
|
||||
change_status(ServiceId, NewStatus) when is_binary(ServiceId), is_integer(NewStatus) ->
|
||||
gen_server:call(?SERVER, {change_status, ServiceId, NewStatus}).
|
||||
|
||||
-spec set_config(ServiceId :: binary(), ConfigJson :: binary()) -> ok | {error, Reason :: any()}.
|
||||
set_config(ServiceId, ConfigJson) when is_binary(ServiceId), is_binary(ConfigJson) ->
|
||||
gen_server:call(?SERVER, {set_config, ServiceId, ConfigJson}).
|
||||
|
||||
-spec get_config_json(ServiceId :: binary()) -> error | {ok, ConfigJson :: binary()}.
|
||||
get_config_json(ServiceId) when is_binary(ServiceId) ->
|
||||
gen_server:call(?SERVER, {get_config_json, ServiceId}).
|
||||
|
||||
-spec get_status(ServiceId :: binary()) -> Status :: integer().
|
||||
get_status(ServiceId) when is_binary(ServiceId) ->
|
||||
gen_server:call(?SERVER, {get_status, ServiceId}).
|
||||
@ -105,24 +97,6 @@ handle_call({change_status, ServiceId, NewStatus}, _From, State = #state{}) ->
|
||||
{reply, ok, State}
|
||||
end;
|
||||
|
||||
handle_call({set_config, ServiceId, ConfigJson}, _From, State = #state{}) ->
|
||||
case dets:lookup(?TAB, ServiceId) of
|
||||
[] ->
|
||||
{reply, {error, <<"service not found">>}, State};
|
||||
[OldService] ->
|
||||
NewService = OldService#service{config_json = ConfigJson},
|
||||
ok = dets:insert(?TAB, NewService),
|
||||
{reply, ok, State}
|
||||
end;
|
||||
|
||||
handle_call({get_config_json, ServiceId}, _From, State = #state{}) ->
|
||||
case dets:lookup(?TAB, ServiceId) of
|
||||
[] ->
|
||||
{reply, error, State};
|
||||
[#service{config_json = ConfigJson}] ->
|
||||
{reply, {ok, ConfigJson}, State}
|
||||
end;
|
||||
|
||||
handle_call({get_status, ServiceId}, _From, State = #state{}) ->
|
||||
case dets:lookup(?TAB, ServiceId) of
|
||||
[] ->
|
||||
|
||||
@ -8,6 +8,7 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(ws_channel).
|
||||
-author("licheng5").
|
||||
-include("efka_tables.hrl").
|
||||
|
||||
%% API
|
||||
-export([init/2]).
|
||||
@ -17,6 +18,7 @@
|
||||
-define(PENDING_TIMEOUT, 10 * 1000).
|
||||
|
||||
-record(state, {
|
||||
service_id :: binary(),
|
||||
service_pid :: undefined | pid(),
|
||||
is_registered = false :: boolean()
|
||||
}).
|
||||
@ -70,7 +72,13 @@ websocket_info(Info, State) ->
|
||||
{ok, State}.
|
||||
|
||||
%% 进程关闭事件
|
||||
terminate(Reason, _Req, State) ->
|
||||
terminate(Reason, _Req, State = #state{service_id = ServiceId, is_registered = IsRegistered}) ->
|
||||
case IsRegistered of
|
||||
true ->
|
||||
ok = service_model:change_status(ServiceId, 0);
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
lager:debug("[ws_channel] channel close with reason: ~p, state is: ~p", [Reason, State]),
|
||||
ok.
|
||||
|
||||
@ -79,14 +87,18 @@ terminate(Reason, _Req, State) ->
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
|
||||
%% 注册, 要建立程序和容器之间的关系
|
||||
handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := #{<<"service_id">> := ServiceId}}, State) ->
|
||||
handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := Params = #{<<"service_id">> := ServiceId}}, State) ->
|
||||
{ok, ServicePid} = efka_service_sup:start_service(ServiceId),
|
||||
case efka_service:attach_channel(ServicePid, self()) of
|
||||
ok ->
|
||||
Reply = json_result(Id, <<"ok">>),
|
||||
erlang:monitor(process, ServicePid),
|
||||
|
||||
{reply, {text, Reply}, State#state{service_pid = ServicePid, is_registered = true}};
|
||||
%% 更新微服务的状态
|
||||
MetaData = maps:get(<<"meta_data">>, Params, #{}),
|
||||
ok = service_model:insert(#service{service_id = ServiceId, status = 1, meta_data = MetaData}),
|
||||
|
||||
{reply, {text, Reply}, State#state{service_id = ServiceId, service_pid = ServicePid, is_registered = true}};
|
||||
{error, Error} ->
|
||||
lager:warning("[ws_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]),
|
||||
{stop, State}
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user