This commit is contained in:
anlicheng 2025-05-06 15:32:05 +08:00
parent ba391f4b6e
commit f9059e4934
4 changed files with 62 additions and 16 deletions

View File

@ -11,7 +11,6 @@
%%
-record(micro_service, {
service_id :: binary(),
service_name :: binary(),
from :: binary(),
%%
work_dir :: binary(),

View File

@ -27,12 +27,12 @@
%%%===================================================================
test() ->
{ok, Pid} = start_monitor(),
Url = "http://118.178.229.213:3000/anlicheng/ekfa/archive/main.tar.gz",
{ok, {Pid, _}} = start_monitor(),
Url = "http://118.178.229.213:3000/anlicheng/ekfa/archive1/main.tar.gz",
TargetDir = "/tmp/",
Ref = download(Pid, Url, TargetDir),
download(Pid, Url, TargetDir),
receive
{download_response, Ref, Info} ->
Info ->
efka_logger:debug("info is: ~p", [Info])
end,
ok.
@ -97,7 +97,8 @@ handle_cast({download, Url, TargetDir}, State = #state{}) ->
EndTs = os:timestamp(),
%%
CostMs = timer:now_diff(EndTs, StartTs) div 1000,
{stop, {ok, CostMs}, State};
lager:debug("[efka_downloader] download url: ~p, cost: ~p(ms)", [Url, CostMs]),
{stop, normal, State};
{error, Reason} ->
%%
file:delete(FullFilename),
@ -143,7 +144,10 @@ receive_data(RequestId, FullFilename) ->
receive
{http, {RequestId, stream_start, _Headers}} ->
{ok, File} = file:open(FullFilename, [write, binary]),
receive_data1(RequestId, File)
receive_data1(RequestId, File);
{http, {RequestId, {{_, 404, Status}, _Headers, Body}}} ->
lager:debug("[efka_downloader] http_status: ~p, body: ~p", [Status, Body]),
{error, Status}
end.
%%
receive_data1(RequestId, File) ->

View File

@ -10,6 +10,7 @@
%%%-------------------------------------------------------------------
-module(efka_inetd).
-author("anlicheng").
-include("efka_tables.hrl").
-behaviour(gen_server).
@ -68,8 +69,8 @@ init([]) ->
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({deploy, TaskId, ServerId, From}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) ->
%%
{ok, ServerRootDir} = ensure_dirs(RootDir, ServerId, From),
case check_lock(ServerRootDir) of
{ok, WorkDir} = ensure_dirs(RootDir, ServerId, From),
case check_lock(WorkDir) of
true ->
{reply, ok, State};
false ->
@ -77,9 +78,19 @@ handle_call({deploy, TaskId, ServerId, From}, _From, State = #state{root_dir = R
case check_download_url(DownloadUrl) of
ok ->
{ok, {TaskPid, MRef}} = efka_downloader:start_monitor(),
efka_downloader:download(TaskPid, DownloadUrl, ServerRootDir),
{reply, ok, State#state{task_map = maps:put(MRef, TaskId, TaskMap)}};
efka_downloader:download(TaskPid, DownloadUrl, WorkDir),
%%
Service = #micro_service{
service_id = ServerId,
from = From,
%%
work_dir = list_to_binary(WorkDir),
params = <<"">>,
metrics = <<"">>,
%% 0: , 1:
status = 0
},
{reply, ok, State#state{task_map = maps:put(MRef, {TaskId, Service}, TaskMap)}};
{error, Reason} ->
lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [DownloadUrl, Reason]),
{reply, {error, <<"download url error">>}, State}
@ -104,6 +115,28 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({'DOWN', Ref, process, _Pid, Reason}, State = #state{task_map = TaskMap}) ->
case maps:take(Ref, TaskMap) of
error ->
{noreply, State};
{{TaskId, Service}, NTaskMap} ->
lager:debug("[efka_inetd] task_id: ~p", [TaskId]),
%% taskId的执行进度
case Reason of
normal ->
%%
case start_service(Service) of
{ok, Pid} ->
micro_service_model:insert(Service#micro_service{status = 1});
{error, Reason} ->
micro_service_model:insert(Service#micro_service{status = 0})
end;
{error, Reason} ->
lager:debug("[efka_inetd] service: ~p, download get error: ~p", [Reason]),
{noreply, State#state{task_map = NTaskMap}}
end
end;
handle_info(_Info, State = #state{}) ->
{noreply, State}.
@ -175,4 +208,14 @@ make_download_url(From) when is_binary(From) ->
{ok, BaseUrl} = application:get_env(efka, repository_url),
From1 = binary_to_list(From),
Basename = lists:flatten(string:replace(From1, ":", "-")) ++ ".tar.gz",
BaseUrl ++ Basename.
BaseUrl ++ Basename.
-spec start_service(Service :: #micro_service{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
start_service(S = #micro_service{service_id = ServiceId}) ->
%%
case efka_micro_service:get_pid(ServiceId) of
undefined ->
efka_micro_service_sup:start_service(S);
Pid ->
{ok, Pid}
end.

View File

@ -14,7 +14,7 @@
%% API
-export([start_link/0]).
-export([register_service/1, delete_service/1]).
-export([start_service/1, delete_service/1]).
%% Supervisor callbacks
-export([init/1]).
@ -68,8 +68,8 @@ init([]) ->
%%% Internal functions
%%%===================================================================
-spec register_service(Service :: #micro_service{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
register_service(Service) ->
-spec start_service(Service :: #micro_service{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
start_service(Service) ->
case supervisor:start_child(?MODULE, child_spec(Service)) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};