From a4af3af2e45817122576f0bef5384c3d33ff37f3 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 19 May 2025 23:34:24 +0800 Subject: [PATCH] remove lock.file --- apps/efka/src/efka_agent.erl | 7 +++---- apps/efka/src/efka_app.erl | 4 ---- apps/efka/src/efka_inetd.erl | 35 ++++++++----------------------- apps/efka/src/efka_inetd_task.erl | 17 ++------------- apps/efka/src/efka_transport.erl | 16 ++++++++------ config/sys.config | 4 ++-- 6 files changed, 26 insertions(+), 57 deletions(-) diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 410ae77..0cf1b1a 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -169,7 +169,7 @@ handle_info({connect_reply, Reply}, State = #state{status = ?STATE_CONNECTING, t case Reply of ok -> AuthBin = auth_request(), - efka_transport:auth_request(TransportPid, AuthBin, 50000), + efka_transport:auth_request(TransportPid, AuthBin), {noreply, State#state{status = ?STATE_AUTH}}; {error, Reason} -> lager:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]), @@ -184,8 +184,7 @@ handle_info({auth_reply, Reply}, State = #state{status = ?STATE_AUTH, transport_ #auth_reply{code = Code, message = Message} = message_pb:decode_msg(ReplyBin, auth_reply), case Code of 0 -> - lager:debug("[efka_agent] auth failed, message: ~p", [Message]), - + lager:debug("[efka_agent] auth success, message: ~p", [Message]), %% 上传缓冲区里面的所有数据 CacheItems = cache_model:get_all_cache(), lists:foreach(fun(#cache{id = Id, method = Method, data = Packet}) -> @@ -293,7 +292,7 @@ handle_info({server_command, ?COMMAND_AUTH, <>}, State = #state{transpor {1, ?STATE_DENIED} -> %% 重新激活, 需要重新校验 AuthRequestBin = auth_request(), - efka_transport:auth_request(TransportPid, AuthRequestBin, 50000), + efka_transport:auth_request(TransportPid, AuthRequestBin), {noreply, State#state{status = ?STATE_AUTH}}; {0, _} -> %% 这个时候的主机应该是受限制的状态,不允许发送消息;但是能够接受服务器推送的消息 diff --git a/apps/efka/src/efka_app.erl b/apps/efka/src/efka_app.erl index 0e1dd43..9fbea81 100644 --- a/apps/efka/src/efka_app.erl +++ b/apps/efka/src/efka_app.erl @@ -15,10 +15,6 @@ start(_StartType, _StartArgs) -> start_mnesia(), %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), - - %% 仓库的基础url - application:set_env(efka, repository_url, "http://118.178.229.213:3000/anlicheng/ekfa/"), - efka_sup:start_link(). stop(_State) -> diff --git a/apps/efka/src/efka_inetd.erl b/apps/efka/src/efka_inetd.erl index 63a4b1b..561bf45 100644 --- a/apps/efka/src/efka_inetd.erl +++ b/apps/efka/src/efka_inetd.erl @@ -55,8 +55,6 @@ start_link() -> {stop, Reason :: term()} | ignore). init([]) -> erlang:process_flag(trap_exit, true), - % Url = "http://118.178.229.213:3000/anlicheng/ekfa/archive1/main.tar.gz", - {ok, RootDir} = application:get_env(efka, root_dir), {ok, #state{root_dir = RootDir}}. @@ -73,20 +71,16 @@ init([]) -> handle_call({deploy, TaskId, ServiceId, TarUrl}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) -> %% 创建目录 {ok, ServiceRootDir} = ensure_dirs(RootDir, ServiceId), - case check_lock(ServiceRootDir, TarUrl) of - true -> - {reply, ok, State}; - false -> - case check_download_url(TarUrl) of - ok -> - {ok, TaskPid} = efka_inetd_task:start_link(TaskId, ServiceRootDir, ServiceId, TarUrl), - efka_inetd_task:deploy(TaskPid), + case check_download_url(TarUrl) of + ok -> + {ok, TaskPid} = efka_inetd_task:start_link(TaskId, ServiceRootDir, ServiceId, TarUrl), + efka_inetd_task:deploy(TaskPid), + lager:debug("[efka_inetd] start task_id: ~p, tar_url: ~p", [TaskId, TarUrl]), - {reply, ok, State#state{task_map = maps:put(TaskPid, {TaskId, ServiceId}, TaskMap)}}; - {error, Reason} -> - lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]), - {reply, {error, <<"download url error">>}, State} - end + {reply, ok, State#state{task_map = maps:put(TaskPid, {TaskId, ServiceId}, TaskMap)}}; + {error, Reason} -> + lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]), + {reply, {error, <<"download url error">>}, State} end; handle_call(_Request, _From, State = #state{}) -> @@ -156,17 +150,6 @@ ensure_dirs(RootDir, ServerId) when is_list(RootDir), is_binary(ServerId) -> ok = filelib:ensure_dir(ServiceRootDir), {ok, ServiceRootDir}. --spec check_lock(DirName :: string(), TarUrl :: binary()) -> boolean(). -check_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) -> - FileName = DirName ++ ".efka.lock", - case filelib:is_file(FileName) of - true -> - {ok, Content} = file:read_file(FileName), - Content =:= TarUrl; - false -> - false - end. - %% 通过head请求先判定下载地址是否正确 -spec check_download_url(Url :: string() | binary()) -> ok | {error, Reason :: term()}. check_download_url(Url) when is_binary(Url) -> diff --git a/apps/efka/src/efka_inetd_task.erl b/apps/efka/src/efka_inetd_task.erl index 49245ab..0fcda29 100644 --- a/apps/efka/src/efka_inetd_task.erl +++ b/apps/efka/src/efka_inetd_task.erl @@ -114,7 +114,7 @@ code_change(_OldVsn, State = #state{}, _Extra) -> do_deploy(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(ServiceRootDir), is_binary(ServiceId), is_binary(TarUrl) -> case download(binary_to_list(TarUrl), ServiceRootDir) of {ok, TarFile, CostTs} -> - Log = io_lib:format("download: ~p completed, cost time: ~p(ms)", [binary_to_list(TarUrl), CostTs]), + Log = io_lib:format("[efka_inetd_task] download: ~p completed, cost time: ~p(ms)", [binary_to_list(TarUrl), CostTs]), efka_inetd_task_log:stash(TaskId, list_to_binary(Log)), %% 创建工作目录 @@ -123,11 +123,9 @@ do_deploy(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is %% 清理目录下的文件 Result = delete_directory(WorkDir), - lager:debug("delete_directory result is: ~p", [Result]), + lager:debug("[efka_inetd_task] delete_directory result is: ~p", [Result]), case tar_extract(TarFile, WorkDir) of ok -> - %% 创建lock文件 - touch_lock(ServiceRootDir, TarUrl), %% 更新数据 ok = service_model:insert(#service{ service_id = ServiceId, @@ -172,17 +170,6 @@ delete_directory(Dir) when is_list(Dir) -> {error, Reason} end. --spec touch_lock(string(), binary()) -> boolean(). -touch_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) -> - FileName = DirName ++ ".efka.lock", - filelib:is_file(FileName) andalso file:delete(FileName), - case file:write_file(FileName, TarUrl) of - ok -> - true; - {error, _} -> - false - end. - %% 解压文件到指定目录 -spec tar_extract(string(), string()) -> ok | {error, term()}. tar_extract(TarFile, TargetDir) when is_list(TarFile), is_list(TargetDir) -> diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 375b7f6..82aa873 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -15,7 +15,7 @@ %% API -export([start_link/3]). --export([connect/1, auth_request/3, send/3, async_call_reply/3, stop/1]). +-export([connect/1, auth_request/2, send/3, async_call_reply/3, stop/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -34,9 +34,9 @@ %%% API %%%=================================================================== --spec auth_request(Pid :: pid(), AuthBin :: binary(), Timeout :: integer()) -> no_return(). -auth_request(Pid, AuthBin, Timeout) when is_pid(Pid), is_binary(AuthBin), is_integer(Timeout) -> - gen_server:cast(Pid, {auth_request, AuthBin, Timeout}). +-spec auth_request(Pid :: pid(), AuthBin :: binary()) -> no_return(). +auth_request(Pid, AuthBin) when is_pid(Pid), is_binary(AuthBin) -> + gen_server:cast(Pid, {auth_request, AuthBin}). -spec connect(Pid :: pid()) -> no_return(). connect(Pid) when is_pid(Pid) -> @@ -108,14 +108,18 @@ handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = Paren end; %% auth校验 -handle_cast({auth_request, AuthRequestBin, Timeout}, State = #state{parent_pid = ParentPid, socket = Socket, packet_id = PacketId}) -> +handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPid, socket = Socket, packet_id = PacketId}) -> ok = ssl:send(Socket, <>), %% 需要等待auth返回的结果 receive {ssl, Socket, <>} -> ParentPid ! {auth_reply, {ok, ReplyBin}}, + {noreply, State#state{packet_id = PacketId + 1}}; + {ssl, Socket, Info} -> + lager:warning("[efka_transport] get invalid auth_reply: ~p", [Info]), + ParentPid ! {auth_reply, {error, invalid_auth_reply}}, {noreply, State#state{packet_id = PacketId + 1}} - after Timeout -> + after 5000 -> ParentPid ! {auth_reply, {error, timeout}}, {noreply, State#state{packet_id = PacketId + 1}} end; diff --git a/config/sys.config b/config/sys.config index f2a2668..2f237ab 100644 --- a/config/sys.config +++ b/config/sys.config @@ -1,6 +1,6 @@ [ {efka, [ - {root_dir, "/tmp/efka/"}, + {root_dir, "/usr/local/code/efka/"}, {tcp_server, [ {port, 18088} @@ -12,7 +12,7 @@ ]}, {auth, [ - {uuid, "144868a8964ed0259d02c335bddc82c2"}, + {uuid, "qbxmjyzrkpntfgswaevodhluicqzxplkm"}, {username, "test"}, {salt, "salt2345"}, {token, "token124"}