diff --git a/apps/efka/include/efka_tables.hrl b/apps/efka/include/efka_tables.hrl index 25dbd56..6333c9a 100644 --- a/apps/efka/include/efka_tables.hrl +++ b/apps/efka/include/efka_tables.hrl @@ -11,7 +11,6 @@ %% 用来保存微服务 -record(micro_service, { service_id :: binary(), - service_name :: binary(), from :: binary(), %% 工作目录 work_dir :: binary(), diff --git a/apps/efka/src/efka_downloader.erl b/apps/efka/src/efka_downloader.erl index 378ed8d..5027d28 100644 --- a/apps/efka/src/efka_downloader.erl +++ b/apps/efka/src/efka_downloader.erl @@ -27,12 +27,12 @@ %%%=================================================================== test() -> - {ok, Pid} = start_monitor(), - Url = "http://118.178.229.213:3000/anlicheng/ekfa/archive/main.tar.gz", + {ok, {Pid, _}} = start_monitor(), + Url = "http://118.178.229.213:3000/anlicheng/ekfa/archive1/main.tar.gz", TargetDir = "/tmp/", - Ref = download(Pid, Url, TargetDir), + download(Pid, Url, TargetDir), receive - {download_response, Ref, Info} -> + Info -> efka_logger:debug("info is: ~p", [Info]) end, ok. @@ -97,7 +97,8 @@ handle_cast({download, Url, TargetDir}, State = #state{}) -> EndTs = os:timestamp(), %% 计算操作的时间,单位为毫秒 CostMs = timer:now_diff(EndTs, StartTs) div 1000, - {stop, {ok, CostMs}, State}; + lager:debug("[efka_downloader] download url: ~p, cost: ~p(ms)", [Url, CostMs]), + {stop, normal, State}; {error, Reason} -> %% 出错需要删除掉文件 file:delete(FullFilename), @@ -143,7 +144,10 @@ receive_data(RequestId, FullFilename) -> receive {http, {RequestId, stream_start, _Headers}} -> {ok, File} = file:open(FullFilename, [write, binary]), - receive_data1(RequestId, File) + receive_data1(RequestId, File); + {http, {RequestId, {{_, 404, Status}, _Headers, Body}}} -> + lager:debug("[efka_downloader] http_status: ~p, body: ~p", [Status, Body]), + {error, Status} end. %% 接受文件数据 receive_data1(RequestId, File) -> diff --git a/apps/efka/src/efka_inetd.erl b/apps/efka/src/efka_inetd.erl index b3439f8..d3482cb 100644 --- a/apps/efka/src/efka_inetd.erl +++ b/apps/efka/src/efka_inetd.erl @@ -10,6 +10,7 @@ %%%------------------------------------------------------------------- -module(efka_inetd). -author("anlicheng"). +-include("efka_tables.hrl"). -behaviour(gen_server). @@ -68,8 +69,8 @@ init([]) -> {stop, Reason :: term(), NewState :: #state{}}). handle_call({deploy, TaskId, ServerId, From}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) -> %% 创建目录 - {ok, ServerRootDir} = ensure_dirs(RootDir, ServerId, From), - case check_lock(ServerRootDir) of + {ok, WorkDir} = ensure_dirs(RootDir, ServerId, From), + case check_lock(WorkDir) of true -> {reply, ok, State}; false -> @@ -77,9 +78,19 @@ handle_call({deploy, TaskId, ServerId, From}, _From, State = #state{root_dir = R case check_download_url(DownloadUrl) of ok -> {ok, {TaskPid, MRef}} = efka_downloader:start_monitor(), - efka_downloader:download(TaskPid, DownloadUrl, ServerRootDir), - {reply, ok, State#state{task_map = maps:put(MRef, TaskId, TaskMap)}}; - + efka_downloader:download(TaskPid, DownloadUrl, WorkDir), + %% 保存服务的上下文 + Service = #micro_service{ + service_id = ServerId, + from = From, + %% 工作目录 + work_dir = list_to_binary(WorkDir), + params = <<"">>, + metrics = <<"">>, + %% 状态: 0: 停止, 1: 运行中 + status = 0 + }, + {reply, ok, State#state{task_map = maps:put(MRef, {TaskId, Service}, TaskMap)}}; {error, Reason} -> lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [DownloadUrl, Reason]), {reply, {error, <<"download url error">>}, State} @@ -104,6 +115,28 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). +handle_info({'DOWN', Ref, process, _Pid, Reason}, State = #state{task_map = TaskMap}) -> + case maps:take(Ref, TaskMap) of + error -> + {noreply, State}; + {{TaskId, Service}, NTaskMap} -> + lager:debug("[efka_inetd] task_id: ~p", [TaskId]), + %% 汇报taskId的执行进度 + case Reason of + normal -> + %% 正常启动服务 + case start_service(Service) of + {ok, Pid} -> + micro_service_model:insert(Service#micro_service{status = 1}); + {error, Reason} -> + micro_service_model:insert(Service#micro_service{status = 0}) + end; + {error, Reason} -> + lager:debug("[efka_inetd] service: ~p, download get error: ~p", [Reason]), + {noreply, State#state{task_map = NTaskMap}} + end + end; + handle_info(_Info, State = #state{}) -> {noreply, State}. @@ -175,4 +208,14 @@ make_download_url(From) when is_binary(From) -> {ok, BaseUrl} = application:get_env(efka, repository_url), From1 = binary_to_list(From), Basename = lists:flatten(string:replace(From1, ":", "-")) ++ ".tar.gz", - BaseUrl ++ Basename. \ No newline at end of file + BaseUrl ++ Basename. + +-spec start_service(Service :: #micro_service{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +start_service(S = #micro_service{service_id = ServiceId}) -> + %% 正常启动服务 + case efka_micro_service:get_pid(ServiceId) of + undefined -> + efka_micro_service_sup:start_service(S); + Pid -> + {ok, Pid} + end. \ No newline at end of file diff --git a/apps/efka/src/efka_micro_service_sup.erl b/apps/efka/src/efka_micro_service_sup.erl index b003989..3639ba9 100644 --- a/apps/efka/src/efka_micro_service_sup.erl +++ b/apps/efka/src/efka_micro_service_sup.erl @@ -14,7 +14,7 @@ %% API -export([start_link/0]). --export([register_service/1, delete_service/1]). +-export([start_service/1, delete_service/1]). %% Supervisor callbacks -export([init/1]). @@ -68,8 +68,8 @@ init([]) -> %%% Internal functions %%%=================================================================== --spec register_service(Service :: #micro_service{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. -register_service(Service) -> +-spec start_service(Service :: #micro_service{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. +start_service(Service) -> case supervisor:start_child(?MODULE, child_spec(Service)) of {ok, Pid} when is_pid(Pid) -> {ok, Pid};