fix inetd

This commit is contained in:
anlicheng 2025-05-09 16:40:15 +08:00
parent 41c1a7cf78
commit ef56328f70
5 changed files with 99 additions and 105 deletions

View File

@ -75,11 +75,13 @@ request_config() ->
-spec device_offline(DeviceUUID :: binary()) -> no_return(). -spec device_offline(DeviceUUID :: binary()) -> no_return().
device_offline(DeviceUUID) when is_binary(DeviceUUID) -> device_offline(DeviceUUID) when is_binary(DeviceUUID) ->
send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 0}). EventBody = jiffy:encode(#{<<"device_uuid">> => DeviceUUID, <<"status">> => 0}, [force_utf8]),
send_event(1, EventBody).
-spec device_online(DeviceUUID :: binary()) -> no_return(). -spec device_online(DeviceUUID :: binary()) -> no_return().
device_online(DeviceUUID) when is_binary(DeviceUUID) -> device_online(DeviceUUID) when is_binary(DeviceUUID) ->
send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 1}). EventBody = jiffy:encode(#{<<"device_uuid">> => DeviceUUID, <<"status">> => 1}, [force_utf8]),
send_event(1, EventBody).
-spec send_event(EventType :: integer(), Params :: binary()) -> no_return(). -spec send_event(EventType :: integer(), Params :: binary()) -> no_return().
send_event(EventType, Params) when is_integer(EventType), is_binary(Params) -> send_event(EventType, Params) when is_integer(EventType), is_binary(Params) ->

View File

@ -243,7 +243,7 @@ handle_info({server_push, PacketId, <<?PUSH_DEPLOY:8, DeployBin/binary>>}, State
{error, Reason} when is_binary(Reason) -> {error, Reason} when is_binary(Reason) ->
#push_reply{code = 0, message = Reason} #push_reply{code = 0, message = Reason}
end, end,
is_pid(TransportPid) andalso efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), is_pid(TransportPid) andalso efka_transport:push_response(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{noreply, State}; {noreply, State};
@ -254,7 +254,7 @@ handle_info({server_push, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>
case efka_service:get_pid(ServiceId) of case efka_service:get_pid(ServiceId) of
undefined -> undefined ->
Reply = #push_reply{code = 0, message = <<"service not run">>}, Reply = #push_reply{code = 0, message = <<"service not run">>},
is_pid(TransportPid) andalso efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), is_pid(TransportPid) andalso efka_transport:push_response(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{noreply, State}; {noreply, State};
ServicePid when is_pid(ServicePid) -> ServicePid when is_pid(ServicePid) ->
Ref = make_ref(), Ref = make_ref(),
@ -264,8 +264,7 @@ handle_info({server_push, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>
erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}), erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}),
{noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}} {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
end, end;
{noreply, State};
%% %%
handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, payload = Payload, timeout = Timeout}}, handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, payload = Payload, timeout = Timeout}},
@ -276,7 +275,8 @@ handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, pay
case efka_service:get_pid(ServiceId) of case efka_service:get_pid(ServiceId) of
undefined -> undefined ->
Reply = #push_reply{code = 0, message = <<"micro_service not run">>, result = <<>>}, Reply = #push_reply{code = 0, message = <<"micro_service not run">>, result = <<>>},
safe_push_response(PacketId, message_pb:encode_msg(Reply), State); safe_push_response(PacketId, message_pb:encode_msg(Reply), State),
{noreply, State};
ServicePid when is_pid(ServicePid) -> ServicePid when is_pid(ServicePid) ->
Ref = make_ref(), Ref = make_ref(),
efka_service:invoke(ServicePid, Ref, Payload), efka_service:invoke(ServicePid, Ref, Payload),
@ -284,9 +284,7 @@ handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, pay
erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}), erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}),
{noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}} {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
end, end;
{noreply, State};
%% %%
handle_info({server_command, ?COMMAND_AUTH, <<Auth:8>>}, State = #state{transport_pid = TransportPid, status = Status}) -> handle_info({server_command, ?COMMAND_AUTH, <<Auth:8>>}, State = #state{transport_pid = TransportPid, status = Status}) ->

View File

@ -71,10 +71,23 @@ init([]) ->
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_call({deploy, TaskId, ServiceId, TarUrl}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) -> handle_call({deploy, TaskId, ServiceId, TarUrl}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) ->
{ok, TaskPid} = efka_inetd_task:start_link(TaskId, RootDir, ServiceId, TarUrl), %%
efka_inetd_task:deploy(TaskPid), {ok, ServiceRootDir} = ensure_dirs(RootDir, ServiceId),
case check_lock(ServiceRootDir, TarUrl) of
true ->
{reply, ok, State};
false ->
case check_download_url(TarUrl) of
ok ->
{ok, TaskPid} = efka_inetd_task:start_link(TaskId, ServiceRootDir, ServiceId, TarUrl),
efka_inetd_task:deploy(TaskPid),
{reply, ok, State#state{task_map = maps:put(TaskPid, {TaskId, ServiceId}, TaskMap)}}; {reply, ok, State#state{task_map = maps:put(TaskPid, {TaskId, ServiceId}, TaskMap)}};
{error, Reason} ->
lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]),
{reply, {error, <<"download url error">>}, State}
end
end;
handle_call(_Request, _From, State = #state{}) -> handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}. {reply, ok, State}.
@ -136,3 +149,38 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
-spec ensure_dirs(RootDir :: string(), ServerId :: binary()) -> {ok, ServerRootDir :: string()}.
ensure_dirs(RootDir, ServerId) when is_list(RootDir), is_binary(ServerId) ->
%%
ServiceRootDir = RootDir ++ "/" ++ binary_to_list(ServerId) ++ "/",
ok = filelib:ensure_dir(ServiceRootDir),
{ok, ServiceRootDir}.
-spec check_lock(DirName :: string(), TarUrl :: binary()) -> boolean().
check_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) ->
FileName = DirName ++ ".efka.lock",
case filelib:is_file(FileName) of
true ->
{ok, Content} = file:read_file(FileName),
Content =:= TarUrl;
false ->
false
end.
%% head请求先判定下载地址是否正确
-spec check_download_url(Url :: string() | binary()) -> ok | {error, Reason :: term()}.
check_download_url(Url) when is_binary(Url) ->
check_download_url(binary_to_list(Url));
check_download_url(Url) when is_list(Url) ->
SslOpts = [
{ssl, [
%
{verify, verify_none}
]}
],
case httpc:request(head, {Url, []}, SslOpts, [{sync, true}]) of
{ok, {{_, 200, "OK"}, _Headers, _}} ->
ok;
{error, Reason} ->
{error, Reason}
end.

View File

@ -22,7 +22,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-record(state, { -record(state, {
root_dir :: string(), service_root_dir :: string(),
task_id :: integer(), task_id :: integer(),
service_id :: binary(), service_id :: binary(),
tar_url :: binary() tar_url :: binary()
@ -37,10 +37,10 @@ deploy(Pid) when is_pid(Pid) ->
gen_server:cast(Pid, deploy). gen_server:cast(Pid, deploy).
%% @doc Spawns the server and registers the local name (unique) %% @doc Spawns the server and registers the local name (unique)
-spec(start_link(TaskId :: integer(), RootDir :: string(), ServiceId :: binary(), TarUrl :: binary()) -> -spec(start_link(TaskId :: integer(), ServiceRootDir :: string(), ServiceId :: binary(), TarUrl :: binary()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}). {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(TaskId, RootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(RootDir), is_binary(ServiceId), is_binary(TarUrl) -> start_link(TaskId, ServiceRootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(ServiceRootDir), is_binary(ServiceId), is_binary(TarUrl) ->
gen_server:start_link(?MODULE, [TaskId, RootDir, ServiceId, TarUrl], []). gen_server:start_link(?MODULE, [TaskId, ServiceRootDir, ServiceId, TarUrl], []).
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -51,8 +51,8 @@ start_link(TaskId, RootDir, ServiceId, TarUrl) when is_integer(TaskId), is_list(
-spec(init(Args :: term()) -> -spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore). {stop, Reason :: term()} | ignore).
init([TaskId, RootDir, ServiceId, TarUrl]) -> init([TaskId, ServiceRootDir, ServiceId, TarUrl]) ->
{ok, #state{task_id = TaskId, root_dir = RootDir, service_id = ServiceId, tar_url = TarUrl}}. {ok, #state{task_id = TaskId, service_root_dir = ServiceRootDir, service_id = ServiceId, tar_url = TarUrl}}.
%% @private %% @private
%% @doc Handling call messages %% @doc Handling call messages
@ -73,53 +73,38 @@ handle_call(_Request, _From, State = #state{}) ->
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_cast(deploy, State = #state{task_id = TaskId, root_dir = RootDir, service_id = ServiceId, tar_url = TarUrl}) -> handle_cast(deploy, State = #state{task_id = TaskId, service_root_dir = ServiceRootDir, service_id = ServiceId, tar_url = TarUrl}) ->
%% case download(binary_to_list(TarUrl), ServiceRootDir) of
{ok, ServiceRootDir} = ensure_dirs(RootDir, ServiceId), {ok, TarFile} ->
case check_lock(ServiceRootDir, TarUrl) of efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download completed">>),
true -> {ok, WorkDir} = make_work_dir(ServiceRootDir),
{stop, normal, State}; %%
false -> Result = delete_directory(WorkDir),
case check_download_url(TarUrl) of lager:debug("delete_directory result is: ~p", [Result]),
case tar_extract(TarFile, WorkDir) of
ok -> ok ->
case download(binary_to_list(TarUrl), ServiceRootDir) of %% lock文件
{ok, TarFile} -> touch_lock(ServiceRootDir, TarUrl),
efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download completed">>), %%
{ok, WorkDir} = make_work_dir(ServiceRootDir), ok = service_model:insert(#service{
%% service_id = ServiceId,
Result = delete_directory(WorkDir), tar_url = TarUrl,
lager:debug("delete_directory result is: ~p", [Result]), %%
root_dir = ServiceRootDir,
case tar_extract(TarFile, WorkDir) of params = <<"">>,
ok -> metrics = <<"">>,
%% lock文件 %% 0: , 1:
touch_lock(ServiceRootDir, TarUrl), status = 0
%% }),
ok = service_model:insert(#service{ efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"deploy success">>),
service_id = ServiceId, {stop, normal, State};
tar_url = TarUrl,
%%
root_dir = ServiceRootDir,
params = <<"">>,
metrics = <<"">>,
%% 0: , 1:
status = 0
}),
efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"deploy success">>),
{stop, normal, State};
{error, Reason} ->
efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"tar decompression error">>),
{stop, {error, Reason}, State}
end;
{error, Reason} ->
efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download error">>),
{stop, {error, Reason}, State}
end;
{error, Reason} -> {error, Reason} ->
lager:debug("[efka_inetd] check_download_url: ~p, get error: ~p", [TarUrl, Reason]), efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"tar decompression error">>),
efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download url error">>), {stop, {error, Reason}, State}
{stop, {error, <<"download url error">>}, State} end;
end {error, Reason} ->
efka_agent:feedback_phase(TaskId, efka_util:timestamp(), <<"download error">>),
{stop, {error, Reason}, State}
end. end.
%% @private %% @private
@ -153,13 +138,6 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
-spec ensure_dirs(RootDir :: string(), ServerId :: binary()) -> {ok, ServerRootDir :: string()}.
ensure_dirs(RootDir, ServerId) when is_list(RootDir), is_binary(ServerId) ->
%%
ServiceRootDir = RootDir ++ "/" ++ binary_to_list(ServerId) ++ "/",
ok = filelib:ensure_dir(ServiceRootDir),
{ok, ServiceRootDir}.
%% %%
-spec make_work_dir(ServiceRootDir :: string()) -> {ok, WorkDir :: string()}. -spec make_work_dir(ServiceRootDir :: string()) -> {ok, WorkDir :: string()}.
make_work_dir(ServiceRootDir) when is_list(ServiceRootDir) -> make_work_dir(ServiceRootDir) when is_list(ServiceRootDir) ->
@ -191,17 +169,6 @@ delete_directory(Dir) when is_list(Dir) ->
{error, Reason} {error, Reason}
end. end.
-spec check_lock(DirName :: string(), TarUrl :: binary()) -> boolean().
check_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) ->
FileName = DirName ++ ".efka.lock",
case filelib:is_file(FileName) of
true ->
{ok, Content} = file:read_file(FileName),
Content =:= TarUrl;
false ->
false
end.
-spec touch_lock(DirName :: string(), TarUrl :: binary()) -> boolean(). -spec touch_lock(DirName :: string(), TarUrl :: binary()) -> boolean().
touch_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) -> touch_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) ->
FileName = DirName ++ ".efka.lock", FileName = DirName ++ ".efka.lock",
@ -213,23 +180,6 @@ touch_lock(DirName, TarUrl) when is_list(DirName), is_binary(TarUrl) ->
false false
end. end.
%% head请求先判定下载地址是否正确
check_download_url(Url) when is_binary(Url) ->
check_download_url(binary_to_list(Url));
check_download_url(Url) when is_list(Url) ->
SslOpts = [
{ssl, [
%
{verify, verify_none}
]}
],
case httpc:request(head, {Url, []}, SslOpts, [{sync, true}]) of
{ok, {{_, 200, "OK"}, _Headers, _}} ->
ok;
{error, Reason} ->
{error, Reason}
end.
%% %%
-spec tar_extract(TarFile :: string(), TargetDir :: string()) -> ok | {error, Reason :: term()}. -spec tar_extract(TarFile :: string(), TargetDir :: string()) -> ok | {error, Reason :: term()}.
tar_extract(TarFile, TargetDir) when is_list(TarFile), is_list(TargetDir) -> tar_extract(TarFile, TargetDir) when is_list(TarFile), is_list(TargetDir) ->
@ -243,7 +193,7 @@ tar_extract(TarFile, TargetDir) when is_list(TarFile), is_list(TargetDir) ->
end. end.
%% %%
-spec download(Url :: string(), TargetDir :: string()) -> {ok, TarFile :: string()} | {error, Reason :: any()}. -spec download(Url :: string(), TargetDir :: string()) -> {ok, TarFile :: string()} | {error, Reason :: term()}.
download(Url, TargetDir) when is_list(Url), is_list(TargetDir) -> download(Url, TargetDir) when is_list(Url), is_list(TargetDir) ->
SslOpts = [ SslOpts = [
{ssl, [ {ssl, [

View File

@ -20,7 +20,7 @@
%% API %% API
-export([start_link/2]). -export([start_link/2]).
-export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]).
-export([push_config/3, request_config/1]). -export([push_config/3, request_config/1, invoke/3]).
-export([metric_data/3, send_event/3]). -export([metric_data/3, send_event/3]).
%% gen_server callbacks %% gen_server callbacks
@ -76,10 +76,6 @@ invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload
request_config(Pid) when is_pid(Pid) -> request_config(Pid) when is_pid(Pid) ->
gen_server:call(Pid, request_config). gen_server:call(Pid, request_config).
%% todo
async_request(Pid, Method, Params) when is_binary(Method), is_binary(Params) ->
ok.
metric_data(Pid, DeviceUUID, Data) when is_pid(Pid), is_binary(DeviceUUID), is_binary(Data) -> metric_data(Pid, DeviceUUID, Data) when is_pid(Pid), is_binary(DeviceUUID), is_binary(Data) ->
gen_server:cast(Pid, {metric_data, DeviceUUID, Data}). gen_server:cast(Pid, {metric_data, DeviceUUID, Data}).