diff --git a/apps/efka/include/efka_tables.hrl b/apps/efka/include/efka_tables.hrl index 5ca2f10..f31ce01 100644 --- a/apps/efka/include/efka_tables.hrl +++ b/apps/efka/include/efka_tables.hrl @@ -13,6 +13,8 @@ service_id :: binary(), service_name :: binary(), from :: binary(), + %% 工作目录 + work_dir :: binary(), params :: binary(), metrics :: binary(), %% 状态: 0: 停止, 1: 运行中 diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 7a14921..c9d9875 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -158,7 +158,7 @@ handle_event(info, {server_push_message, PacketId, <<16:8, Directive>>}, ?STATE_ %% transport进程退出 handle_event(info, {'EXIT', TransportPid, Reason}, _StateName, State = #state{transport_pid = TransportPid}) -> - efka_logger:warning("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]), + efka_logger:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]), erlang:start_timer(500000, self(), create_transport), {next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}; diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index b4ba9fd..fb5426f 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -9,22 +9,25 @@ %%%------------------------------------------------------------------- -module(efka_micro_service). -author("anlicheng"). +-include("efka_tables.hrl"). -behaviour(gen_server). %% API -export([start_link/2]). --export([get_name/1, start_service/1, stop_service/1]). +-export([get_name/1, start_service/1, stop_service/1, attach_channel/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { - server_id :: binary(), + service :: #micro_service{}, %% 通道id信息 channel_pid :: pid() | undefined, %% 当前服务的状态 - status = 0 + status = 0, + %% 当前进程的port信息, OSPid = erlang:port_info(Port, os_pid) + port :: undefined | port() }). %%%=================================================================== @@ -33,7 +36,11 @@ -spec get_name(ServerId :: binary()) -> atom(). get_name(ServerId) when is_binary(ServerId) -> - list_to_atom("efka_server:" ++ binary_to_list(ServerId)). + list_to_atom("efka_service:" ++ binary_to_list(ServerId)). + +-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}. +attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> + gen_server:call(Pid, {attach_channel, ChannelPid}). start_service(Pid) when is_pid(Pid) -> gen_server:call(Pid, start_service). @@ -42,10 +49,10 @@ stop_service(Pid) when is_pid(Pid) -> gen_server:call(Pid, stop_service). %% @doc Spawns the server and registers the local name (unique) --spec(start_link(Name :: atom(), ServerId :: binary()) -> +-spec(start_link(Name :: atom(), Service :: #micro_service{}) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Name, ServerId) when is_atom(Name), is_binary(ServerId) -> - gen_server:start_link({local, Name}, ?MODULE, [ServerId], []). +start_link(Name, Service = #micro_service{}) when is_atom(Name) -> + gen_server:start_link({local, Name}, ?MODULE, [Service], []). %%%=================================================================== %%% gen_server callbacks @@ -56,8 +63,9 @@ start_link(Name, ServerId) when is_atom(Name), is_binary(ServerId) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([ServerId]) -> - {ok, #state{server_id = ServerId}}. +init([Service]) -> + erlang:start_timer(0, self(), boot_service), + {ok, #state{service = Service}}. %% @private %% @doc Handling call messages @@ -69,6 +77,25 @@ init([ServerId]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). +%% 绑定channel +handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = undefined}) -> + erlang:monitor(process, ChannelPid), + {reply, ok, State#state{channel_pid = ChannelPid}}; + +handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service = Service}) when is_pid(OldChannelPid) -> + lager:notice("[efka_micro_service] service_id: ~p, attach_channel old channel exists: ~p", [ Service#micro_service.service_id, OldChannelPid]), + {reply, {error, <<"channel exists">>}, State#state{channel_pid = ChannelPid}}; + +%% 启动服务 +handle_call(start_service, _From, State = #state{service = Service = #micro_service{service_id = ServiceId}}) -> + ok = micro_service_model:start_service(ServiceId), + {reply, ok, State#state{service = Service#micro_service{status = 0}}}; + +%% 停止服务 +handle_call(stop_service, _From, State = #state{service = Service = #micro_service{service_id = ServiceId}}) -> + ok = micro_service_model:stop_service(ServiceId), + {reply, ok, State#state{service = Service#micro_service{status = 1}}}; + handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. @@ -87,8 +114,47 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info(_Info, State = #state{}) -> - {noreply, State}. +handle_info({timeout, _, boot_service}, State = #state{service = #micro_service{work_dir = WorkDir0}}) -> + WorkDir = binary_to_list(WorkDir0), + {ok, ManifestInfo} = file:read_file(WorkDir ++ "manifest.json"), + Manifest = jiffy:decode(ManifestInfo, [return_maps]), + + lager:debug("Manifest: ~p", [Manifest]), + case maps:find(<<"exec">>, Manifest) of + error -> + lager:debug("not start command"), + {noreply, State}; + {ok, ExecCmd0} -> + PortSettings = [ + {cd, WorkDir}, + exit_status + ], + PortSettings1 = case maps:find(<<"args">>, Manifest) of + error -> + PortSettings; + {ok, Args} -> + [{args, [binary_to_list(A) || A <- Args]}|PortSettings] + end, + + ExecCmd = binary_to_list(ExecCmd0), + RealExecCmd = filename:absname_join(WorkDir, ExecCmd), + lager:debug("real: ~p", [RealExecCmd]), + + Port = erlang:open_port({spawn_executable, RealExecCmd}, PortSettings1), + lager:debug("port is: ~p", [Port]), + {noreply, State#state{port = Port}} + end; + +handle_info({Port, {data, Data}}, State = #state{port = Port, service = Service}) -> + lager:debug("[efka_micro_service] service_id: ~p, port exit with code: ~p", [Service#micro_service.service_id, Data]), + {noreply, State}; +%% 处理port的消息 +handle_info({Port, {exit_status, Code}}, State = #state{port = Port, service = Service}) -> + lager:debug("[efka_micro_service] service_id: ~p, port exit with code: ~p", [Service#micro_service.service_id, Code]), + %erlang:start_timer(5000, self(), boot_service), + %erlang:port_close(Port), + + {noreply, State#state{port = undefined}}. %% @private %% @doc This function is called by a gen_server when it is about to @@ -97,7 +163,17 @@ handle_info(_Info, State = #state{}) -> %% with Reason. The return value is ignored. -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> +terminate(Reason, _State = #state{port = Port}) -> + lager:debug("[efka_micro_service] terminate with reason: ~p", [Reason]), + case is_port(Port) of + true -> + case erlang:port_info(Port, os_pid) of + undefined -> + ok; + {os_pid, OsPid} -> + os:cmd(io_lib:format("kill -9 ~p", [OsPid])) + end + end, ok. %% @private diff --git a/apps/efka/src/efka_micro_service_sup.erl b/apps/efka/src/efka_micro_service_sup.erl index 9f4222a..361e53e 100644 --- a/apps/efka/src/efka_micro_service_sup.erl +++ b/apps/efka/src/efka_micro_service_sup.erl @@ -50,7 +50,19 @@ init([]) -> MicroServices = micro_service_model:get_running_services(), Specs = lists:map(fun(Service) -> child_spec(Service) end, MicroServices), - {ok, {SupFlags, Specs}}. + Spec1 = child_spec(#micro_service{ + service_id = <<"test1234">>, + service_name = <<"测试服务">>, + from = <<"master">>, + %% 工作目录 + work_dir = <<"/tmp/test/">>, + params = <<"">>, + metrics = <<"">>, + %% 状态: 0: 停止, 1: 运行中 + status = 1 + }), + + {ok, {SupFlags, [Spec1|Specs]}}. %%%=================================================================== %%% Internal functions diff --git a/apps/efka/src/mnesia/micro_service_model.erl b/apps/efka/src/mnesia/micro_service_model.erl index 1d17b13..f44cdbe 100644 --- a/apps/efka/src/mnesia/micro_service_model.erl +++ b/apps/efka/src/mnesia/micro_service_model.erl @@ -35,10 +35,36 @@ insert(MicroService = #micro_service{}) -> end. stop_service(ServiceId) when is_binary(ServiceId) -> - ok. + Fun = fun() -> + case mnesia:read(?TAB, ServiceId, write) of + [] -> + mnesia:abort(<<"service not found">>); + [Service] -> + mnesia:write(?TAB, Service#micro_service{status = 0}, write) + end + end, + case mnesia:transaction(Fun) of + {'atomic', ok} -> + ok; + {'aborted', Reason} -> + {error, Reason} + end. start_service(ServiceId) when is_binary(ServiceId) -> - ok. + Fun = fun() -> + case mnesia:read(?TAB, ServiceId, write) of + [] -> + mnesia:abort(<<"service not found">>); + [S] -> + mnesia:write(?TAB, S#micro_service{status = 1}, write) + end + end, + case mnesia:transaction(Fun) of + {'atomic', ok} -> + ok; + {'aborted', Reason} -> + {error, Reason} + end. -spec get_running_services() -> [#micro_service{}]. get_running_services() ->