remove lock.file
This commit is contained in:
parent
7705cd1f71
commit
a4af3af2e4
@ -169,7 +169,7 @@ handle_info({connect_reply, Reply}, State = #state{status = ?STATE_CONNECTING, t
|
|||||||
case Reply of
|
case Reply of
|
||||||
ok ->
|
ok ->
|
||||||
AuthBin = auth_request(),
|
AuthBin = auth_request(),
|
||||||
efka_transport:auth_request(TransportPid, AuthBin, 50000),
|
efka_transport:auth_request(TransportPid, AuthBin),
|
||||||
{noreply, State#state{status = ?STATE_AUTH}};
|
{noreply, State#state{status = ?STATE_AUTH}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]),
|
lager:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]),
|
||||||
@ -184,8 +184,7 @@ handle_info({auth_reply, Reply}, State = #state{status = ?STATE_AUTH, transport_
|
|||||||
#auth_reply{code = Code, message = Message} = message_pb:decode_msg(ReplyBin, auth_reply),
|
#auth_reply{code = Code, message = Message} = message_pb:decode_msg(ReplyBin, auth_reply),
|
||||||
case Code of
|
case Code of
|
||||||
0 ->
|
0 ->
|
||||||
lager:debug("[efka_agent] auth failed, message: ~p", [Message]),
|
lager:debug("[efka_agent] auth success, message: ~p", [Message]),
|
||||||
|
|
||||||
%% 上传缓冲区里面的所有数据
|
%% 上传缓冲区里面的所有数据
|
||||||
CacheItems = cache_model:get_all_cache(),
|
CacheItems = cache_model:get_all_cache(),
|
||||||
lists:foreach(fun(#cache{id = Id, method = Method, data = Packet}) ->
|
lists:foreach(fun(#cache{id = Id, method = Method, data = Packet}) ->
|
||||||
@ -293,7 +292,7 @@ handle_info({server_command, ?COMMAND_AUTH, <<Auth:8>>}, State = #state{transpor
|
|||||||
{1, ?STATE_DENIED} ->
|
{1, ?STATE_DENIED} ->
|
||||||
%% 重新激活, 需要重新校验
|
%% 重新激活, 需要重新校验
|
||||||
AuthRequestBin = auth_request(),
|
AuthRequestBin = auth_request(),
|
||||||
efka_transport:auth_request(TransportPid, AuthRequestBin, 50000),
|
efka_transport:auth_request(TransportPid, AuthRequestBin),
|
||||||
{noreply, State#state{status = ?STATE_AUTH}};
|
{noreply, State#state{status = ?STATE_AUTH}};
|
||||||
{0, _} ->
|
{0, _} ->
|
||||||
%% 这个时候的主机应该是受限制的状态,不允许发送消息;但是能够接受服务器推送的消息
|
%% 这个时候的主机应该是受限制的状态,不允许发送消息;但是能够接受服务器推送的消息
|
||||||
|
|||||||
@ -15,10 +15,6 @@ start(_StartType, _StartArgs) ->
|
|||||||
start_mnesia(),
|
start_mnesia(),
|
||||||
%% 加速内存的回收
|
%% 加速内存的回收
|
||||||
erlang:system_flag(fullsweep_after, 16),
|
erlang:system_flag(fullsweep_after, 16),
|
||||||
|
|
||||||
%% 仓库的基础url
|
|
||||||
application:set_env(efka, repository_url, "http://118.178.229.213:3000/anlicheng/ekfa/"),
|
|
||||||
|
|
||||||
efka_sup:start_link().
|
efka_sup:start_link().
|
||||||
|
|
||||||
stop(_State) ->
|
stop(_State) ->
|
||||||
|
|||||||
@ -55,8 +55,6 @@ start_link() ->
|
|||||||
{stop, Reason :: term()} | ignore).
|
{stop, Reason :: term()} | ignore).
|
||||||
init([]) ->
|
init([]) ->
|
||||||
erlang:process_flag(trap_exit, true),
|
erlang:process_flag(trap_exit, true),
|
||||||
% Url = "http://118.178.229.213:3000/anlicheng/ekfa/archive1/main.tar.gz",
|
|
||||||
|
|
||||||
{ok, RootDir} = application:get_env(efka, root_dir),
|
{ok, RootDir} = application:get_env(efka, root_dir),
|
||||||
{ok, #state{root_dir = RootDir}}.
|
{ok, #state{root_dir = RootDir}}.
|
||||||
|
|
||||||
@ -73,20 +71,16 @@ init([]) ->
|
|||||||
handle_call({deploy, TaskId, ServiceId, TarUrl}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) ->
|
handle_call({deploy, TaskId, ServiceId, TarUrl}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) ->
|
||||||
%% 创建目录
|
%% 创建目录
|
||||||
{ok, ServiceRootDir} = ensure_dirs(RootDir, ServiceId),
|
{ok, ServiceRootDir} = ensure_dirs(RootDir, ServiceId),
|
||||||
case check_lock(ServiceRootDir, TarUrl) of
|
case check_download_url(TarUrl) of
|
||||||
true ->
|
ok ->
|
||||||
{reply, ok, State};
|
{ok, TaskPid} = efka_inetd_task:start_link(TaskId, ServiceRootDir, ServiceId, TarUrl),
|
||||||
false ->
|
efka_inetd_task:deploy(TaskPid),
|
||||||
case check_download_url(TarUrl) of
|
lager:debug("[efka_inetd] start task_id: ~p, tar_url: ~p", [TaskId, TarUrl]),
|
||||||
ok ->
|
|
||||||
{ok, TaskPid} = efka_inetd_task:start_link(TaskId, ServiceRootDir, ServiceId, TarUrl),
|
|
||||||
efka_inetd_task:deploy(TaskPid),
|
|
||||||
|
|
||||||
{reply, ok, State#state{task_map = maps:put(TaskPid, {TaskId, ServiceId}, TaskMap)}};
|
{reply, ok, State#state{task_map = maps:put(TaskPid, {TaskId, ServiceId}, TaskMap)}};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]),
|
lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]),
|
||||||
{reply, {error, <<"download url error">>}, State}
|
{reply, {error, <<"download url error">>}, State}
|
||||||
end
|
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_call(_Request, _From, State = #state{}) ->
|
handle_call(_Request, _From, State = #state{}) ->
|
||||||
@ -156,17 +150,6 @@ ensure_dirs(RootDir, ServerId) when is_list(RootDir), is_binary(ServerId) ->
|
|||||||
ok = filelib:ensure_dir(ServiceRootDir),
|
ok = filelib:ensure_dir(ServiceRootDir),
|
||||||
{ok, ServiceRootDir}.
|
{ok, ServiceRootDir}.
|
||||||
|
|
||||||
-spec check_lock(DirName :: string(), TarUrl :: binary()) -> boolean().
|
|
||||||
check_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) ->
|
|
||||||
FileName = DirName ++ ".efka.lock",
|
|
||||||
case filelib:is_file(FileName) of
|
|
||||||
true ->
|
|
||||||
{ok, Content} = file:read_file(FileName),
|
|
||||||
Content =:= TarUrl;
|
|
||||||
false ->
|
|
||||||
false
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% 通过head请求先判定下载地址是否正确
|
%% 通过head请求先判定下载地址是否正确
|
||||||
-spec check_download_url(Url :: string() | binary()) -> ok | {error, Reason :: term()}.
|
-spec check_download_url(Url :: string() | binary()) -> ok | {error, Reason :: term()}.
|
||||||
check_download_url(Url) when is_binary(Url) ->
|
check_download_url(Url) when is_binary(Url) ->
|
||||||
|
|||||||
@ -114,7 +114,7 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
do_deploy(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(ServiceRootDir), is_binary(ServiceId), is_binary(TarUrl) ->
|
do_deploy(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(ServiceRootDir), is_binary(ServiceId), is_binary(TarUrl) ->
|
||||||
case download(binary_to_list(TarUrl), ServiceRootDir) of
|
case download(binary_to_list(TarUrl), ServiceRootDir) of
|
||||||
{ok, TarFile, CostTs} ->
|
{ok, TarFile, CostTs} ->
|
||||||
Log = io_lib:format("download: ~p completed, cost time: ~p(ms)", [binary_to_list(TarUrl), CostTs]),
|
Log = io_lib:format("[efka_inetd_task] download: ~p completed, cost time: ~p(ms)", [binary_to_list(TarUrl), CostTs]),
|
||||||
efka_inetd_task_log:stash(TaskId, list_to_binary(Log)),
|
efka_inetd_task_log:stash(TaskId, list_to_binary(Log)),
|
||||||
|
|
||||||
%% 创建工作目录
|
%% 创建工作目录
|
||||||
@ -123,11 +123,9 @@ do_deploy(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is
|
|||||||
|
|
||||||
%% 清理目录下的文件
|
%% 清理目录下的文件
|
||||||
Result = delete_directory(WorkDir),
|
Result = delete_directory(WorkDir),
|
||||||
lager:debug("delete_directory result is: ~p", [Result]),
|
lager:debug("[efka_inetd_task] delete_directory result is: ~p", [Result]),
|
||||||
case tar_extract(TarFile, WorkDir) of
|
case tar_extract(TarFile, WorkDir) of
|
||||||
ok ->
|
ok ->
|
||||||
%% 创建lock文件
|
|
||||||
touch_lock(ServiceRootDir, TarUrl),
|
|
||||||
%% 更新数据
|
%% 更新数据
|
||||||
ok = service_model:insert(#service{
|
ok = service_model:insert(#service{
|
||||||
service_id = ServiceId,
|
service_id = ServiceId,
|
||||||
@ -172,17 +170,6 @@ delete_directory(Dir) when is_list(Dir) ->
|
|||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
-spec touch_lock(string(), binary()) -> boolean().
|
|
||||||
touch_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) ->
|
|
||||||
FileName = DirName ++ ".efka.lock",
|
|
||||||
filelib:is_file(FileName) andalso file:delete(FileName),
|
|
||||||
case file:write_file(FileName, TarUrl) of
|
|
||||||
ok ->
|
|
||||||
true;
|
|
||||||
{error, _} ->
|
|
||||||
false
|
|
||||||
end.
|
|
||||||
|
|
||||||
%% 解压文件到指定目录
|
%% 解压文件到指定目录
|
||||||
-spec tar_extract(string(), string()) -> ok | {error, term()}.
|
-spec tar_extract(string(), string()) -> ok | {error, term()}.
|
||||||
tar_extract(TarFile, TargetDir) when is_list(TarFile), is_list(TargetDir) ->
|
tar_extract(TarFile, TargetDir) when is_list(TarFile), is_list(TargetDir) ->
|
||||||
|
|||||||
@ -15,7 +15,7 @@
|
|||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/3]).
|
-export([start_link/3]).
|
||||||
-export([connect/1, auth_request/3, send/3, async_call_reply/3, stop/1]).
|
-export([connect/1, auth_request/2, send/3, async_call_reply/3, stop/1]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
@ -34,9 +34,9 @@
|
|||||||
%%% API
|
%%% API
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
-spec auth_request(Pid :: pid(), AuthBin :: binary(), Timeout :: integer()) -> no_return().
|
-spec auth_request(Pid :: pid(), AuthBin :: binary()) -> no_return().
|
||||||
auth_request(Pid, AuthBin, Timeout) when is_pid(Pid), is_binary(AuthBin), is_integer(Timeout) ->
|
auth_request(Pid, AuthBin) when is_pid(Pid), is_binary(AuthBin) ->
|
||||||
gen_server:cast(Pid, {auth_request, AuthBin, Timeout}).
|
gen_server:cast(Pid, {auth_request, AuthBin}).
|
||||||
|
|
||||||
-spec connect(Pid :: pid()) -> no_return().
|
-spec connect(Pid :: pid()) -> no_return().
|
||||||
connect(Pid) when is_pid(Pid) ->
|
connect(Pid) when is_pid(Pid) ->
|
||||||
@ -108,14 +108,18 @@ handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = Paren
|
|||||||
end;
|
end;
|
||||||
|
|
||||||
%% auth校验
|
%% auth校验
|
||||||
handle_cast({auth_request, AuthRequestBin, Timeout}, State = #state{parent_pid = ParentPid, socket = Socket, packet_id = PacketId}) ->
|
handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPid, socket = Socket, packet_id = PacketId}) ->
|
||||||
ok = ssl:send(Socket, <<?PACKET_REQUEST, PacketId:32, ?METHOD_AUTH, AuthRequestBin/binary>>),
|
ok = ssl:send(Socket, <<?PACKET_REQUEST, PacketId:32, ?METHOD_AUTH, AuthRequestBin/binary>>),
|
||||||
%% 需要等待auth返回的结果
|
%% 需要等待auth返回的结果
|
||||||
receive
|
receive
|
||||||
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH, ReplyBin/binary>>} ->
|
{ssl, Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH, ReplyBin/binary>>} ->
|
||||||
ParentPid ! {auth_reply, {ok, ReplyBin}},
|
ParentPid ! {auth_reply, {ok, ReplyBin}},
|
||||||
|
{noreply, State#state{packet_id = PacketId + 1}};
|
||||||
|
{ssl, Socket, Info} ->
|
||||||
|
lager:warning("[efka_transport] get invalid auth_reply: ~p", [Info]),
|
||||||
|
ParentPid ! {auth_reply, {error, invalid_auth_reply}},
|
||||||
{noreply, State#state{packet_id = PacketId + 1}}
|
{noreply, State#state{packet_id = PacketId + 1}}
|
||||||
after Timeout ->
|
after 5000 ->
|
||||||
ParentPid ! {auth_reply, {error, timeout}},
|
ParentPid ! {auth_reply, {error, timeout}},
|
||||||
{noreply, State#state{packet_id = PacketId + 1}}
|
{noreply, State#state{packet_id = PacketId + 1}}
|
||||||
end;
|
end;
|
||||||
|
|||||||
@ -1,6 +1,6 @@
|
|||||||
[
|
[
|
||||||
{efka, [
|
{efka, [
|
||||||
{root_dir, "/tmp/efka/"},
|
{root_dir, "/usr/local/code/efka/"},
|
||||||
|
|
||||||
{tcp_server, [
|
{tcp_server, [
|
||||||
{port, 18088}
|
{port, 18088}
|
||||||
@ -12,7 +12,7 @@
|
|||||||
]},
|
]},
|
||||||
|
|
||||||
{auth, [
|
{auth, [
|
||||||
{uuid, "144868a8964ed0259d02c335bddc82c2"},
|
{uuid, "qbxmjyzrkpntfgswaevodhluicqzxplkm"},
|
||||||
{username, "test"},
|
{username, "test"},
|
||||||
{salt, "salt2345"},
|
{salt, "salt2345"},
|
||||||
{token, "token124"}
|
{token, "token124"}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user