From a741eaa6f7e566aa21a421842da165dc97646a3d Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 7 May 2025 15:21:11 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_downloader.erl | 25 +++++++++++++++++-------- apps/efka/src/efka_inetd.erl | 31 ++++++++++++++++++++++--------- 2 files changed, 39 insertions(+), 17 deletions(-) diff --git a/apps/efka/src/efka_downloader.erl b/apps/efka/src/efka_downloader.erl index 5027d28..17f8f84 100644 --- a/apps/efka/src/efka_downloader.erl +++ b/apps/efka/src/efka_downloader.erl @@ -12,7 +12,7 @@ -behaviour(gen_server). %% API --export([start_monitor/0, download/3]). +-export([start_monitor/0, download/4]). -export([test/0]). %% gen_server callbacks @@ -30,16 +30,19 @@ test() -> {ok, {Pid, _}} = start_monitor(), Url = "http://118.178.229.213:3000/anlicheng/ekfa/archive1/main.tar.gz", TargetDir = "/tmp/", - download(Pid, Url, TargetDir), + + Ref = make_ref(), + download(Pid, Ref, Url, TargetDir), receive Info -> efka_logger:debug("info is: ~p", [Info]) end, ok. --spec download(Pid :: pid(), Url :: string(), TargetDir :: string()) -> reference(). -download(Pid, Url, TargetDir) when is_pid(Pid), is_list(Url), is_list(TargetDir) -> - gen_server:cast(Pid, {download, Url, TargetDir}). +-spec download(Pid :: pid(), Ref :: reference(), Url :: string(), TargetDir :: string()) -> reference(). +download(Pid, Ref, Url, TargetDir) when is_pid(Pid), is_reference(Ref), is_list(Url), is_list(TargetDir) -> + ReceiverPid = self(), + gen_server:cast(Pid, {download, Ref, ReceiverPid, Url, TargetDir}). %% @doc Spawns the server and registers the local name (unique) -spec(start_monitor() -> @@ -78,7 +81,7 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({download, Url, TargetDir}, State = #state{}) -> +handle_cast({download, Ref, ReceiverPid, Url, TargetDir}, State = #state{}) -> SslOpts = [ {ssl, [ % 完全禁用证书验证 @@ -98,14 +101,20 @@ handle_cast({download, Url, TargetDir}, State = #state{}) -> %% 计算操作的时间,单位为毫秒 CostMs = timer:now_diff(EndTs, StartTs) div 1000, lager:debug("[efka_downloader] download url: ~p, cost: ~p(ms)", [Url, CostMs]), + ReceiverPid ! {downloader_reply, Ref, ok}, + {stop, normal, State}; {error, Reason} -> %% 出错需要删除掉文件 file:delete(FullFilename), - {stop, {error, Reason}, State} + ReceiverPid ! {downloader_reply, Ref, {error, Reason}}, + + {stop, normal, State} end; {error, Reason} -> - {stop, {error, Reason}, State} + ReceiverPid ! {downloader_reply, Ref, {error, Reason}}, + + {stop, normal, State} end. %% @private diff --git a/apps/efka/src/efka_inetd.erl b/apps/efka/src/efka_inetd.erl index 23a3d11..7501bdd 100644 --- a/apps/efka/src/efka_inetd.erl +++ b/apps/efka/src/efka_inetd.erl @@ -77,8 +77,9 @@ handle_call({deploy, TaskId, ServerId, TarUrl}, _From, State = #state{root_dir = false -> case check_download_url(TarUrl) of ok -> - {ok, {TaskPid, MRef}} = efka_downloader:start_monitor(), - efka_downloader:download(TaskPid, binary_to_list(TarUrl), WorkDir), + {ok, {TaskPid, Ref}} = efka_downloader:start_monitor(), + efka_downloader:download(TaskPid, Ref, binary_to_list(TarUrl), WorkDir), + %% 保存服务的上下文 Service = #micro_service{ service_id = ServerId, @@ -90,7 +91,7 @@ handle_call({deploy, TaskId, ServerId, TarUrl}, _From, State = #state{root_dir = %% 状态: 0: 停止, 1: 运行中 status = 0 }, - {reply, ok, State#state{task_map = maps:put(MRef, {TaskId, Service}, TaskMap)}}; + {reply, ok, State#state{task_map = maps:put(Ref, {TaskId, Service}, TaskMap)}}; {error, Reason} -> lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]), {reply, {error, <<"download url error">>}, State} @@ -115,11 +116,11 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({'DOWN', Ref, process, _Pid, normal}, State = #state{task_map = TaskMap}) -> +%% 下载完整 +handle_info({downloader_reply, Ref, ok}, State = #state{task_map = TaskMap}) -> {{TaskId, Service}, NTaskMap} = maps:take(Ref, TaskMap), lager:debug("[efka_inetd] task_id: ~p", [TaskId]), %% 汇报taskId的执行进度 - %% 下载完整 efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download">>, 1), %% 正常启动服务 case start_service(Service) of @@ -129,18 +130,30 @@ handle_info({'DOWN', Ref, process, _Pid, normal}, State = #state{task_map = Task micro_service_model:insert(Service#micro_service{status = 1}); {error, Reason} -> lager:debug("[efka_inetd] boot service get error: ~p", [Reason]), - efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"boot">>, 0, <<"boot failed">>), + efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"boot">>, 0), micro_service_model:insert(Service#micro_service{status = 0}) end, {noreply, State#state{task_map = NTaskMap}}; -handle_info({'DOWN', Ref, process, _Pid, {error, Reason}}, State = #state{task_map = TaskMap}) -> +%% 下载失败 +handle_info({downloader_reply, Ref, {error, Reason}}, State = #state{task_map = TaskMap}) -> {{TaskId, _Service}, NTaskMap} = maps:take(Ref, TaskMap), - lager:debug("[efka_inetd] service: ~p, download get error: ~p", [Reason]), %% 下载完整 - efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download">>, 0, <<"下载失败">>), + efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download">>, 0), + {noreply, State#state{task_map = NTaskMap}}; + +%% 下载任务异常退出 +handle_info({'DOWN', _Ref, process, _Pid, normal}, State) -> + {noreply, State}; + +%% 下载任务异常退出 +handle_info({'DOWN', Ref, process, _Pid, {error, Reason}}, State = #state{task_map = TaskMap}) -> + {{TaskId, _Service}, NTaskMap} = maps:take(Ref, TaskMap), + lager:debug("[efka_inetd] service: ~p, download get error: ~p", [Reason]), + %% 下载完整 + efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download">>, 0), {noreply, State#state{task_map = NTaskMap}};