diff --git a/apps/efka/include/efka_tables.hrl b/apps/efka/include/efka_tables.hrl index 89bcc22..e9a1a9d 100644 --- a/apps/efka/include/efka_tables.hrl +++ b/apps/efka/include/efka_tables.hrl @@ -13,7 +13,7 @@ service_id :: binary(), tar_url :: binary(), %% 工作目录 - work_dir :: binary(), + root_dir :: binary(), params :: binary(), metrics :: binary(), %% 状态: 0: 停止, 1: 运行中 diff --git a/apps/efka/src/efka_downloader.erl b/apps/efka/src/efka_downloader.erl index 17f8f84..79a6203 100644 --- a/apps/efka/src/efka_downloader.erl +++ b/apps/efka/src/efka_downloader.erl @@ -148,32 +148,3 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== -%% 处理头部信息, 解析可能的文件名 -receive_data(RequestId, FullFilename) -> - receive - {http, {RequestId, stream_start, _Headers}} -> - {ok, File} = file:open(FullFilename, [write, binary]), - receive_data1(RequestId, File); - {http, {RequestId, {{_, 404, Status}, _Headers, Body}}} -> - lager:debug("[efka_downloader] http_status: ~p, body: ~p", [Status, Body]), - {error, Status} - end. -%% 接受文件数据 -receive_data1(RequestId, File) -> - receive - {http, {RequestId, {error, Reason}}} -> - ok = file:close(File), - {error, Reason}; - {http, {RequestId, stream_end, _Headers}} -> - ok = file:close(File), - ok; - {http, {RequestId, stream, Data}} -> - file:write(File, Data), - receive_data1(RequestId, File) - end. - --spec get_filename_from_url(Url :: string()) -> string(). -get_filename_from_url(Url) when is_list(Url) -> - URIMap = uri_string:parse(Url), - Path = maps:get(path, URIMap), - filename:basename(Path). \ No newline at end of file diff --git a/apps/efka/src/efka_inetd.erl b/apps/efka/src/efka_inetd.erl index 7501bdd..717ecfd 100644 --- a/apps/efka/src/efka_inetd.erl +++ b/apps/efka/src/efka_inetd.erl @@ -18,7 +18,6 @@ %% API -export([start_link/0]). -export([deploy/3]). --export([ensure_dirs/2, check_lock/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -55,6 +54,8 @@ start_link() -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([]) -> + erlang:process_flag(trap_exit, true), + {ok, RootDir} = application:get_env(efka, root_dir), {ok, #state{root_dir = RootDir}}. @@ -68,35 +69,11 @@ init([]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call({deploy, TaskId, ServerId, TarUrl}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) -> - %% 创建目录 - {ok, WorkDir} = ensure_dirs(RootDir, ServerId), - case check_lock(WorkDir) of - true -> - {reply, ok, State}; - false -> - case check_download_url(TarUrl) of - ok -> - {ok, {TaskPid, Ref}} = efka_downloader:start_monitor(), - efka_downloader:download(TaskPid, Ref, binary_to_list(TarUrl), WorkDir), +handle_call({deploy, TaskId, ServiceId, TarUrl}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) -> + {ok, TaskPid} = efka_inetd_task:start_link(TaskId, RootDir, ServiceId, TarUrl), + efka_inetd_task:deploy(TaskPid), - %% 保存服务的上下文 - Service = #micro_service{ - service_id = ServerId, - tar_url = TarUrl, - %% 工作目录 - work_dir = list_to_binary(WorkDir), - params = <<"">>, - metrics = <<"">>, - %% 状态: 0: 停止, 1: 运行中 - status = 0 - }, - {reply, ok, State#state{task_map = maps:put(Ref, {TaskId, Service}, TaskMap)}}; - {error, Reason} -> - lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]), - {reply, {error, <<"download url error">>}, State} - end - end; + {reply, ok, State}; handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. @@ -116,38 +93,6 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -%% 下载完整 -handle_info({downloader_reply, Ref, ok}, State = #state{task_map = TaskMap}) -> - {{TaskId, Service}, NTaskMap} = maps:take(Ref, TaskMap), - lager:debug("[efka_inetd] task_id: ~p", [TaskId]), - %% 汇报taskId的执行进度 - efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download">>, 1), - %% 正常启动服务 - case start_service(Service) of - {ok, Pid} when is_pid(Pid) -> - efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"boot">>, 1), - - micro_service_model:insert(Service#micro_service{status = 1}); - {error, Reason} -> - lager:debug("[efka_inetd] boot service get error: ~p", [Reason]), - efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"boot">>, 0), - micro_service_model:insert(Service#micro_service{status = 0}) - end, - - {noreply, State#state{task_map = NTaskMap}}; - -%% 下载失败 -handle_info({downloader_reply, Ref, {error, Reason}}, State = #state{task_map = TaskMap}) -> - {{TaskId, _Service}, NTaskMap} = maps:take(Ref, TaskMap), - lager:debug("[efka_inetd] service: ~p, download get error: ~p", [Reason]), - %% 下载完整 - efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download">>, 0), - {noreply, State#state{task_map = NTaskMap}}; - -%% 下载任务异常退出 -handle_info({'DOWN', _Ref, process, _Pid, normal}, State) -> - {noreply, State}; - %% 下载任务异常退出 handle_info({'DOWN', Ref, process, _Pid, {error, Reason}}, State = #state{task_map = TaskMap}) -> {{TaskId, _Service}, NTaskMap} = maps:take(Ref, TaskMap), @@ -182,45 +127,3 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== --spec ensure_dirs(RootDir :: string(), ServerId :: binary()) -> {ok, ServerRootDir :: string()}. -ensure_dirs(RootDir, ServerId) when is_list(RootDir), is_binary(ServerId) -> - %% 根目录 - ServerRootDir = RootDir ++ "/" ++ binary_to_list(ServerId) ++ "/", - ok = filelib:ensure_dir(ServerRootDir), - %% 工作逻辑,压缩文件需要解压到工作目录 - WorkDir = ServerRootDir ++ "/work_dir/", - ok = filelib:ensure_dir(WorkDir), - - {ok, ServerRootDir}. - --spec check_lock(DirName :: string()) -> boolean(). -check_lock(DirName) when is_list(DirName) -> - FileName = DirName ++ ".efka.lock", - filelib:is_file(FileName). - -%% 通过head请求先判定下载地址是否正确 -check_download_url(Url) when is_binary(Url) -> - check_download_url(binary_to_list(Url)); -check_download_url(Url) when is_list(Url) -> - SslOpts = [ - {ssl, [ - % 完全禁用证书验证 - {verify, verify_none} - ]} - ], - case httpc:request(head, {Url, []}, SslOpts, [{sync, true}]) of - {ok, {{_, 200, "OK"}, _Headers, _}} -> - ok; - {error, Reason} -> - {error, Reason} - end. - --spec start_service(Service :: #micro_service{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. -start_service(S = #micro_service{service_id = ServiceId}) -> - %% 正常启动服务 - case efka_micro_service:get_pid(ServiceId) of - undefined -> - efka_micro_service_sup:start_service(S); - Pid -> - {ok, Pid} - end. \ No newline at end of file diff --git a/apps/efka/src/efka_inetd_task.erl b/apps/efka/src/efka_inetd_task.erl new file mode 100644 index 0000000..5ee8438 --- /dev/null +++ b/apps/efka/src/efka_inetd_task.erl @@ -0,0 +1,298 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 07. 5月 2025 15:47 +%%%------------------------------------------------------------------- +-module(efka_inetd_task). +-author("anlicheng"). +-include("efka_tables.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/4]). +-export([deploy/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + root_dir :: string(), + task_id :: integer(), + service_id :: binary(), + tar_url :: binary() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec deploy(Pid :: pid()) -> no_return(). +deploy(Pid) when is_pid(Pid) -> + ReceiverPid = self(), + gen_server:cast(Pid, {deploy, ReceiverPid}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(TaskId :: integer(), RootDir :: string(), ServiceId :: binary(), TarUrl :: binary()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(TaskId, RootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(RootDir), is_binary(ServiceId), is_binary(TarUrl) -> + gen_server:start_link(?MODULE, [TaskId, RootDir, ServiceId, TarUrl], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([TaskId, RootDir, ServiceId, TarUrl]) -> + {ok, #state{task_id = TaskId, root_dir = RootDir, service_id = ServiceId, tar_url = TarUrl}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({deploy, ReceiverPid}, State = #state{task_id = TaskId, root_dir = RootDir, service_id = ServiceId, tar_url = TarUrl}) -> + %% 创建目录 + {ok, ServiceRootDir} = ensure_dirs(RootDir, ServiceId), + case check_lock(ServiceRootDir, TarUrl) of + true -> + + {reply, ok, State}; + false -> + case check_download_url(TarUrl) of + ok -> + case download(binary_to_list(TarUrl), ServiceRootDir) of + {ok, TarFile} -> + efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download">>, 1), + {ok, WorkDir} = make_work_dir(ServiceRootDir), + case tar_extract(TarFile, WorkDir) of + ok -> + %% 创建lock文件 + touch_lock(ServiceRootDir, TarUrl), + %% 更新数据 + ok = micro_service_model:insert(#micro_service{ + service_id = ServiceId, + tar_url = TarUrl, + %% 工作目录 + root_dir = ServiceRootDir, + params = <<"">>, + metrics = <<"">>, + %% 状态: 0: 停止, 1: 运行中 + status = 0 + }), + + %% 正常启动服务 + case boot_service(ServiceId) of + {ok, Pid} when is_pid(Pid) -> + efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"boot">>, 1), + ok; + {error, Reason} -> + lager:debug("boot service get error: ~p", [Reason]), + efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"boot">>, 0) + end; + {error, Reason} -> + ok + end; + {error, Reason} -> + lager:debug("xx"), + ok + end; + {error, Reason} -> + lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]), + {reply, {error, <<"download url error">>}, State} + end + end, + + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec ensure_dirs(RootDir :: string(), ServerId :: binary()) -> {ok, ServerRootDir :: string()}. +ensure_dirs(RootDir, ServerId) when is_list(RootDir), is_binary(ServerId) -> + %% 根目录 + ServiceRootDir = RootDir ++ "/" ++ binary_to_list(ServerId) ++ "/", + ok = filelib:ensure_dir(ServiceRootDir), + {ok, ServiceRootDir}. + +%% 工作逻辑,压缩文件需要解压到工作目录 +-spec make_work_dir(ServiceRootDir :: string()) -> {ok, WorkDir :: string()}. +make_work_dir(ServiceRootDir) when is_list(ServiceRootDir) -> + %% 工作逻辑,压缩文件需要解压到工作目录 + WorkDir = ServiceRootDir ++ "/work_dir/", + ok = filelib:ensure_dir(WorkDir), + {ok, WorkDir}. + +-spec check_lock(DirName :: string(), TarUrl :: binary()) -> boolean(). +check_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) -> + FileName = DirName ++ ".efka.lock", + case filelib:is_file(FileName) of + true -> + {ok, Content} = file:read_file(FileName), + Content =:= TarUrl; + false -> + false + end. + +-spec touch_lock(DirName :: string(), TarUrl :: binary()) -> boolean(). +touch_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) -> + FileName = DirName ++ ".efka.lock", + filelib:is_file(FileName) andalso file:delete(FileName), + case file:write_file(FileName, TarUrl) of + ok -> + true; + {error, _} -> + false + end. + +%% 通过head请求先判定下载地址是否正确 +check_download_url(Url) when is_binary(Url) -> + check_download_url(binary_to_list(Url)); +check_download_url(Url) when is_list(Url) -> + SslOpts = [ + {ssl, [ + % 完全禁用证书验证 + {verify, verify_none} + ]} + ], + case httpc:request(head, {Url, []}, SslOpts, [{sync, true}]) of + {ok, {{_, 200, "OK"}, _Headers, _}} -> + ok; + {error, Reason} -> + {error, Reason} + end. + +-spec boot_service(ServiceId :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +boot_service(ServiceId) -> + %% 正常启动服务 + case efka_micro_service:get_pid(ServiceId) of + undefined -> + efka_micro_service_sup:start_service(ServiceId); + Pid -> + {ok, Pid} + end. + +%% 解压文件到指定目录 +-spec tar_extract(TarFile :: string(), TargetDir :: string()) -> ok | {error, Reason :: term()}. +tar_extract(TarFile, TargetDir) when is_list(TarFile), is_list(TargetDir) -> + %% 判断文件的后缀名来判断 + Ext = filename:extension(TarFile), + case Ext of + ".tar" -> + erl_tar:extract(TarFile, [{cwd, TargetDir}, verbose]); + ".gz" -> + erl_tar:extract(TarFile, [compressed, {cwd, TargetDir}, verbose]) + end. + +%% 下载文件 +-spec download(Url :: string(), TargetDir :: string()) -> {ok, TarFile :: string()} | {error, Reason :: any()}. +download(Url, TargetDir) when is_list(Url), is_list(TargetDir) -> + SslOpts = [ + {ssl, [ + % 完全禁用证书验证 + {verify, verify_none} + ]} + ], + + TargetFile = get_filename_from_url(Url), + FullFilename = TargetDir ++ TargetFile, + + StartTs = os:timestamp(), + case httpc:request(get, {Url, []}, SslOpts, [{sync, false}, {stream, self}]) of + {ok, RequestId} -> + case receive_data(RequestId, FullFilename) of + ok -> + EndTs = os:timestamp(), + %% 计算操作的时间,单位为毫秒 + CostMs = timer:now_diff(EndTs, StartTs) div 1000, + lager:debug("[efka_downloader] download url: ~p, cost: ~p(ms)", [Url, CostMs]), + {ok, FullFilename}; + {error, Reason} -> + %% 出错需要删除掉文件 + file:delete(FullFilename), + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +%% 处理头部信息, 解析可能的文件名 +receive_data(RequestId, FullFilename) -> + receive + {http, {RequestId, stream_start, _Headers}} -> + {ok, File} = file:open(FullFilename, [write, binary]), + receive_data0(RequestId, File); + {http, {RequestId, {{_, 404, Status}, _Headers, Body}}} -> + lager:debug("[efka_downloader] http_status: ~p, body: ~p", [Status, Body]), + {error, Status} + end. +%% 接受文件数据 +receive_data0(RequestId, File) -> + receive + {http, {RequestId, {error, Reason}}} -> + ok = file:close(File), + {error, Reason}; + {http, {RequestId, stream_end, _Headers}} -> + ok = file:close(File), + ok; + {http, {RequestId, stream, Data}} -> + file:write(File, Data), + receive_data0(RequestId, File) + end. + +-spec get_filename_from_url(Url :: string()) -> string(). +get_filename_from_url(Url) when is_list(Url) -> + URIMap = uri_string:parse(Url), + Path = maps:get(path, URIMap), + filename:basename(Path). \ No newline at end of file diff --git a/apps/efka/src/efka_micro_service_sup.erl b/apps/efka/src/efka_micro_service_sup.erl index 243d2f4..159130f 100644 --- a/apps/efka/src/efka_micro_service_sup.erl +++ b/apps/efka/src/efka_micro_service_sup.erl @@ -52,9 +52,8 @@ init([]) -> Spec1 = child_spec(#micro_service{ service_id = <<"test1234">>, - from = <<"master">>, %% 工作目录 - work_dir = <<"/usr/local/code/tmp/test/">>, + root_dir = <<"/usr/local/code/tmp/test/">>, params = <<"">>, metrics = <<"">>, %% 状态: 0: 停止, 1: 运行中 @@ -67,9 +66,9 @@ init([]) -> %%% Internal functions %%%=================================================================== --spec start_service(Service :: #micro_service{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. -start_service(Service) -> - case supervisor:start_child(?MODULE, child_spec(Service)) of +-spec start_service(ServiceId :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +start_service(ServiceId) when is_binary(ServiceId) -> + case supervisor:start_child(?MODULE, child_spec(ServiceId)) of {ok, Pid} when is_pid(Pid) -> {ok, Pid}; {error, {'already_started', Pid}} when is_pid(Pid) -> @@ -84,11 +83,11 @@ delete_service(ServiceId) when is_binary(ServiceId) -> ok = supervisor:terminate_child(?MODULE, ChildId), supervisor:delete_child(?MODULE, ChildId). -child_spec(S = #micro_service{service_id = ServiceId}) when is_binary(ServiceId) -> +child_spec(ServiceId) when is_binary(ServiceId) -> Name = efka_micro_service:get_name(ServiceId), #{ id => Name, - start => {efka_micro_service, start_link, [Name, S]}, + start => {efka_micro_service, start_link, [Name, ServiceId]}, restart => permanent, shutdown => 2000, type => worker,