This commit is contained in:
anlicheng 2025-05-20 12:31:28 +08:00
parent b13122674e
commit 2aca3fff45
6 changed files with 50 additions and 11 deletions

View File

@ -45,6 +45,9 @@
%%%% ,
-define(PUSH_DEPLOY, 16#01).
-define(PUSH_SERVICE_CONFIG, 16#02).
-define(PUSH_INVOKE, 16#03).
-define(PUSH_TASK_LOG, 16#04).
-define(PUSH_START_SERVICE, 16#02).
-define(PUSH_STOP_SERVICE, 16#03).
-define(PUSH_SERVICE_CONFIG, 16#04).
-define(PUSH_INVOKE, 16#05).
-define(PUSH_TASK_LOG, 16#05).

View File

@ -225,11 +225,37 @@ handle_info({server_push, PacketId, <<?PUSH_DEPLOY:8, DeployBin/binary>>}, State
%% efka_inetd收到消息后就立即返回了
Reply = case efka_inetd:deploy(TaskId, ServiceId, TarUrl) of
ok ->
#async_call_reply{code = 1, message = <<"">>};
#async_call_reply{code = 1, result = <<"ok">>};
{error, Reason} when is_binary(Reason) ->
#async_call_reply{code = 0, message = Reason}
end,
is_pid(TransportPid) andalso efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{noreply, State};
%%
handle_info({server_push, PacketId, <<?PUSH_START_SERVICE:8, ServiceId/binary>>}, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了
Reply = case efka_inetd:start_service(ServiceId) of
ok ->
#async_call_reply{code = 1, result = <<"ok">>};
{error, Reason} when is_binary(Reason) ->
#async_call_reply{code = 0, message = Reason}
end,
efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{noreply, State};
%%
handle_info({server_push, PacketId, <<?PUSH_STOP_SERVICE:8, ServiceId/binary>>}, State = #state{transport_pid = TransportPid}) ->
%% efka_inetd收到消息后就立即返回了
Reply = case efka_inetd:stop_service(ServiceId) of
ok ->
#async_call_reply{code = 1, result = <<"ok">>};
{error, Reason} when is_binary(Reason) ->
#async_call_reply{code = 0, message = Reason}
end,
efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{noreply, State};
@ -240,7 +266,7 @@ handle_info({server_push, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>
case efka_service:get_pid(ServiceId) of
undefined ->
Reply = #async_call_reply{code = 0, message = <<"service not run">>},
is_pid(TransportPid) andalso efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{noreply, State};
ServicePid when is_pid(ServicePid) ->
Ref = make_ref(),

View File

@ -176,8 +176,8 @@ delete_directory(Dir) when is_list(Dir) ->
%%
-spec tar_extract(string(), string()) -> ok | {error, term()}.
tar_extract(TarFile, TargetDir) when is_list(TarFile), is_list(TargetDir) ->
%%
erl_tar:extract(TarFile, [compressed, {cwd, TargetDir}, verbose]).
%% , options: verbose
erl_tar:extract(TarFile, [compressed, {cwd, TargetDir}]).
%%
-spec download(Url :: string(), TargetDir :: string()) ->

View File

@ -88,6 +88,8 @@ start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ServiceId]) ->
%% supervisor进程通过exit(ChildPid, shutdown)terminate函数被调用
erlang:process_flag(trap_exit, true),
case service_model:get_service(ServiceId) of
error ->
lager:notice("[efka_service] service_id: ~p, not found", [ServiceId]),
@ -189,7 +191,7 @@ handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId,
case efka_manifest:startup(Manifest) of
{ok, Port} ->
{os_pid, OSPid} = erlang:port_info(Port, os_pid),
lager:debug("[efka_service] service_id: ~p, reboot success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]),
lager:debug("[efka_service] service_id: ~p, reboot success, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]),
{noreply, State#state{port = Port, os_pid = OSPid}};
{error, Reason} ->
lager:debug("[efka_service] service_id: ~p, boot_service get error: ~p", [ServiceId, Reason]),
@ -217,6 +219,12 @@ handle_info({Port, {exit_status, Code}}, State = #state{service_id = ServiceId})
try_reboot(),
{noreply, State#state{port = undefined, os_pid = undefined}};
%% port的退出消息
handle_info({'EXIT', Port, Reason}, State = #state{port = Port, service_id = ServiceId}) ->
lager:debug("[efka_service] service_id: ~p, port: ~p, exit with reason: ~p", [ServiceId, Port, Reason]),
try_reboot(),
{noreply, State#state{port = undefined, os_pid = undefined}};
%% channel进程的退出
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) ->
lager:debug("[efka_service] service_id: ~p, channel exited: ~p", [ServiceId, Reason]),

View File

@ -46,7 +46,9 @@ connect(Pid) when is_pid(Pid) ->
send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) ->
gen_server:cast(Pid, {send, Method, Packet}).
-spec async_call_reply(Pid :: pid(), PacketId :: integer(), Response :: binary()) -> no_return().
-spec async_call_reply(Pid :: pid() | undefined, PacketId :: integer(), Response :: binary()) -> no_return().
async_call_reply(undefined, PacketId, Response) when is_integer(PacketId), is_binary(Response) ->
ok;
async_call_reply(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) ->
gen_server:cast(Pid, {async_call_reply, PacketId, Response}).

View File

@ -155,7 +155,7 @@ get_running_services() ->
display_services() ->
F = fun() ->
Q = qlc:q([E || E <- mnesia:table(?TAB), E]),
Q = qlc:q([E || E <- mnesia:table(?TAB)]),
qlc:e(Q)
end,
case mnesia:transaction(F) of