diff --git a/apps/efka/include/efka.hrl b/apps/efka/include/efka.hrl index eaa6495..192544c 100644 --- a/apps/efka/include/efka.hrl +++ b/apps/efka/include/efka.hrl @@ -45,6 +45,9 @@ %%%% 主动推送的消息类型子分类, 需要返回值 -define(PUSH_DEPLOY, 16#01). --define(PUSH_SERVICE_CONFIG, 16#02). --define(PUSH_INVOKE, 16#03). --define(PUSH_TASK_LOG, 16#04). +-define(PUSH_START_SERVICE, 16#02). +-define(PUSH_STOP_SERVICE, 16#03). + +-define(PUSH_SERVICE_CONFIG, 16#04). +-define(PUSH_INVOKE, 16#05). +-define(PUSH_TASK_LOG, 16#05). diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 0cf1b1a..720c168 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -225,11 +225,37 @@ handle_info({server_push, PacketId, <>}, State %% 短暂的等待,efka_inetd收到消息后就立即返回了 Reply = case efka_inetd:deploy(TaskId, ServiceId, TarUrl) of ok -> - #async_call_reply{code = 1, message = <<"">>}; + #async_call_reply{code = 1, result = <<"ok">>}; {error, Reason} when is_binary(Reason) -> #async_call_reply{code = 0, message = Reason} end, - is_pid(TransportPid) andalso efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), + efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), + + {noreply, State}; + +%% 启动微服务 +handle_info({server_push, PacketId, <>}, State = #state{transport_pid = TransportPid}) -> + %% 短暂的等待,efka_inetd收到消息后就立即返回了 + Reply = case efka_inetd:start_service(ServiceId) of + ok -> + #async_call_reply{code = 1, result = <<"ok">>}; + {error, Reason} when is_binary(Reason) -> + #async_call_reply{code = 0, message = Reason} + end, + efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), + + {noreply, State}; + +%% 停止微服务 +handle_info({server_push, PacketId, <>}, State = #state{transport_pid = TransportPid}) -> + %% 短暂的等待,efka_inetd收到消息后就立即返回了 + Reply = case efka_inetd:stop_service(ServiceId) of + ok -> + #async_call_reply{code = 1, result = <<"ok">>}; + {error, Reason} when is_binary(Reason) -> + #async_call_reply{code = 0, message = Reason} + end, + efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), {noreply, State}; @@ -240,7 +266,7 @@ handle_info({server_push, PacketId, <> case efka_service:get_pid(ServiceId) of undefined -> Reply = #async_call_reply{code = 0, message = <<"service not run">>}, - is_pid(TransportPid) andalso efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), + efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), {noreply, State}; ServicePid when is_pid(ServicePid) -> Ref = make_ref(), diff --git a/apps/efka/src/efka_inetd_task.erl b/apps/efka/src/efka_inetd_task.erl index 12010e8..cc4176a 100644 --- a/apps/efka/src/efka_inetd_task.erl +++ b/apps/efka/src/efka_inetd_task.erl @@ -176,8 +176,8 @@ delete_directory(Dir) when is_list(Dir) -> %% 解压文件到指定目录 -spec tar_extract(string(), string()) -> ok | {error, term()}. tar_extract(TarFile, TargetDir) when is_list(TarFile), is_list(TargetDir) -> - %% 判断文件的后缀名来判断 - erl_tar:extract(TarFile, [compressed, {cwd, TargetDir}, verbose]). + %% 判断文件的后缀名来判断, options: verbose + erl_tar:extract(TarFile, [compressed, {cwd, TargetDir}]). %% 下载文件 -spec download(Url :: string(), TargetDir :: string()) -> diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index 65cc549..6d053fa 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -88,6 +88,8 @@ start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([ServiceId]) -> + %% supervisor进程通过exit(ChildPid, shutdown)调用的时候,确保terminate函数被调用 + erlang:process_flag(trap_exit, true), case service_model:get_service(ServiceId) of error -> lager:notice("[efka_service] service_id: ~p, not found", [ServiceId]), @@ -189,7 +191,7 @@ handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId, case efka_manifest:startup(Manifest) of {ok, Port} -> {os_pid, OSPid} = erlang:port_info(Port, os_pid), - lager:debug("[efka_service] service_id: ~p, reboot success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]), + lager:debug("[efka_service] service_id: ~p, reboot success, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]), {noreply, State#state{port = Port, os_pid = OSPid}}; {error, Reason} -> lager:debug("[efka_service] service_id: ~p, boot_service get error: ~p", [ServiceId, Reason]), @@ -217,6 +219,12 @@ handle_info({Port, {exit_status, Code}}, State = #state{service_id = ServiceId}) try_reboot(), {noreply, State#state{port = undefined, os_pid = undefined}}; +%% 处理port的退出消息 +handle_info({'EXIT', Port, Reason}, State = #state{port = Port, service_id = ServiceId}) -> + lager:debug("[efka_service] service_id: ~p, port: ~p, exit with reason: ~p", [ServiceId, Port, Reason]), + try_reboot(), + {noreply, State#state{port = undefined, os_pid = undefined}}; + %% 处理channel进程的退出 handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) -> lager:debug("[efka_service] service_id: ~p, channel exited: ~p", [ServiceId, Reason]), diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index 31957d8..33c62e3 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -46,7 +46,9 @@ connect(Pid) when is_pid(Pid) -> send(Pid, Method, Packet) when is_pid(Pid), is_integer(Method), is_binary(Packet) -> gen_server:cast(Pid, {send, Method, Packet}). --spec async_call_reply(Pid :: pid(), PacketId :: integer(), Response :: binary()) -> no_return(). +-spec async_call_reply(Pid :: pid() | undefined, PacketId :: integer(), Response :: binary()) -> no_return(). +async_call_reply(undefined, PacketId, Response) when is_integer(PacketId), is_binary(Response) -> + ok; async_call_reply(Pid, PacketId, Response) when is_pid(Pid), is_integer(PacketId), is_binary(Response) -> gen_server:cast(Pid, {async_call_reply, PacketId, Response}). diff --git a/apps/efka/src/mnesia/service_model.erl b/apps/efka/src/mnesia/service_model.erl index 0c2d79e..721d39b 100644 --- a/apps/efka/src/mnesia/service_model.erl +++ b/apps/efka/src/mnesia/service_model.erl @@ -155,7 +155,7 @@ get_running_services() -> display_services() -> F = fun() -> - Q = qlc:q([E || E <- mnesia:table(?TAB), E]), + Q = qlc:q([E || E <- mnesia:table(?TAB)]), qlc:e(Q) end, case mnesia:transaction(F) of