From 040198e8d5debb5e53e9466c18ffbd3d028c1213 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 19 May 2025 23:52:34 +0800 Subject: [PATCH] fix --- apps/efka/include/efka_tables.hrl | 2 +- apps/efka/src/efka_inetd.erl | 25 ++++++++++++++++--------- apps/efka/src/efka_manifest.erl | 5 ++--- apps/efka/src/efka_service.erl | 10 +++++++++- apps/efka/src/efka_service_sup.erl | 12 ------------ config/sys.config | 2 +- 6 files changed, 29 insertions(+), 27 deletions(-) diff --git a/apps/efka/include/efka_tables.hrl b/apps/efka/include/efka_tables.hrl index 7579eeb..0f42a03 100644 --- a/apps/efka/include/efka_tables.hrl +++ b/apps/efka/include/efka_tables.hrl @@ -13,7 +13,7 @@ service_id :: binary(), tar_url :: binary(), %% 工作目录 - root_dir :: binary(), + root_dir :: string(), params :: binary(), metrics :: binary(), %% 状态: 0: 停止, 1: 运行中 diff --git a/apps/efka/src/efka_inetd.erl b/apps/efka/src/efka_inetd.erl index 561bf45..f0c9850 100644 --- a/apps/efka/src/efka_inetd.erl +++ b/apps/efka/src/efka_inetd.erl @@ -71,16 +71,23 @@ init([]) -> handle_call({deploy, TaskId, ServiceId, TarUrl}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) -> %% 创建目录 {ok, ServiceRootDir} = ensure_dirs(RootDir, ServiceId), - case check_download_url(TarUrl) of - ok -> - {ok, TaskPid} = efka_inetd_task:start_link(TaskId, ServiceRootDir, ServiceId, TarUrl), - efka_inetd_task:deploy(TaskPid), - lager:debug("[efka_inetd] start task_id: ~p, tar_url: ~p", [TaskId, TarUrl]), - {reply, ok, State#state{task_map = maps:put(TaskPid, {TaskId, ServiceId}, TaskMap)}}; - {error, Reason} -> - lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]), - {reply, {error, <<"download url error">>}, State} + ServicePid = efka_service:get_pid(ServiceId), + case is_pid(ServicePid) andalso efka_service:is_running(ServicePid) of + true -> + {reply, {error, <<"the service is running, stop first">>}, State}; + false -> + case check_download_url(TarUrl) of + ok -> + {ok, TaskPid} = efka_inetd_task:start_link(TaskId, ServiceRootDir, ServiceId, TarUrl), + efka_inetd_task:deploy(TaskPid), + lager:debug("[efka_inetd] start task_id: ~p, tar_url: ~p", [TaskId, TarUrl]), + + {reply, ok, State#state{task_map = maps:put(TaskPid, {TaskId, ServiceId}, TaskMap)}}; + {error, Reason} -> + lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]), + {reply, {error, <<"download url error">>}, State} + end end; handle_call(_Request, _From, State = #state{}) -> diff --git a/apps/efka/src/efka_manifest.erl b/apps/efka/src/efka_manifest.erl index d9eab70..debc722 100644 --- a/apps/efka/src/efka_manifest.erl +++ b/apps/efka/src/efka_manifest.erl @@ -24,9 +24,8 @@ %% API -export([new/1, startup/1]). --spec new(WorkDir0 :: binary()) -> {ok, #manifest{}} | {error, Reason :: binary()}. -new(WorkDir0) when is_binary(WorkDir0) -> - WorkDir = binary_to_list(WorkDir0), +-spec new(WorkDir :: string()) -> {ok, #manifest{}} | {error, Reason :: binary()}. +new(WorkDir) when is_list(WorkDir) -> case file:read_file(WorkDir ++ "manifest.json") of {ok, ManifestInfo} -> Settings = catch jiffy:decode(ManifestInfo, [return_maps]), diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index c7d152e..4588ab7 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -20,7 +20,7 @@ %% API -export([start_link/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). --export([push_config/3, request_config/1, invoke/3]). +-export([push_config/3, request_config/1, invoke/3, is_running/1]). -export([metric_data/3, send_event/3]). %% gen_server callbacks @@ -59,6 +59,10 @@ push_config(Pid, Ref, ConfigJson) when is_pid(Pid), is_binary(ConfigJson) -> invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload) -> gen_server:cast(Pid, {invoke, Ref, self(), Payload}). +-spec is_running(Pid :: pid()) -> boolean(). +is_running(Pid) when is_pid(Pid) -> + gen_server:call(Pid, is_running). + request_config(Pid) when is_pid(Pid) -> gen_server:call(Pid, request_config). @@ -148,6 +152,10 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol {reply, {error, <<"serivce stopped">>}, State} end; +%% 获取服务的运行状态 +handle_call(is_running, _From, State = #state{running_status = RunningStatus}) -> + {reply, RunningStatus, State}; + %% 请求参数项 done handle_call(request_config, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) -> Params = service_model:get_params(ServiceId), diff --git a/apps/efka/src/efka_service_sup.erl b/apps/efka/src/efka_service_sup.erl index 8fca867..94c9078 100644 --- a/apps/efka/src/efka_service_sup.erl +++ b/apps/efka/src/efka_service_sup.erl @@ -46,18 +46,6 @@ start_link() -> | ignore | {error, Reason :: term()}). init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - - service_model:insert(#service{ - service_id = <<"test1234">>, - tar_url = <<"http://118.178.229.213:3000/anlicheng/ekfa/archive1/main.tar.gz">>, - %% 工作目录 - root_dir = <<"/usr/local/code/tmp/test/">>, - params = <<"">>, - metrics = <<"">>, - %% 状态: 0: 停止, 1: 运行中 - status = 1 - }), - MicroServiceIds = service_model:get_all_service_ids(), Specs = lists:map(fun(ServiceId) -> child_spec(ServiceId) end, MicroServiceIds), diff --git a/config/sys.config b/config/sys.config index 2f237ab..7c12877 100644 --- a/config/sys.config +++ b/config/sys.config @@ -1,6 +1,6 @@ [ {efka, [ - {root_dir, "/usr/local/code/efka/"}, + {root_dir, "/usr/local/code/efka"}, {tcp_server, [ {port, 18088}