diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index 71affba..326b8da 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -36,6 +36,8 @@ %% 系统对应的pid os_pid :: undefined | integer(), + %% 配置信息 + manifest = #{}, %% 当前服务的运行状态 running_status = ?STATUS_STOPPED }). @@ -87,22 +89,37 @@ start_link(Name, Service = #micro_service{}) when is_atom(Name) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([Service = #micro_service{service_id = ServiceId, status = Status}]) -> - %% 数据的状态和运行状态是2回事 - case Status == 1 of - true -> - case boot_service(Service) of - {ok, Port} -> - {os_pid, OSPid} = erlang:port_info(Port, os_pid), - lager:debug("[efka_micro_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]), - {ok, #state{service = Service, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; +init([Service = #micro_service{service_id = ServiceId, work_dir = WorkDir0, status = Status}]) -> + WorkDir = binary_to_list(WorkDir0), + case file:read_file(WorkDir ++ "manifest.json") of + {ok, ManifestInfo} -> + Manifest = jiffy:decode(ManifestInfo, [return_maps]), + case check_manifest(Manifest) of + ok -> + %% 数据的状态和运行状态是2回事 + case Status == 1 of + true -> + StartCmd = maps:get(<<"start">>, Manifest), + case boot_service(WorkDir0, StartCmd) of + {ok, Port} -> + {os_pid, OSPid} = erlang:port_info(Port, os_pid), + lager:debug("[efka_micro_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]), + {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; + {error, Reason} -> + lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), + {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}} + end; + false -> + lager:debug("[efka_micro_service] service: ~p current status is 0, not boot"), + {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}} + end; {error, Reason} -> - lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), - {ok, #state{service = Service, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}} + lager:notice("[efka_micro_service] service: ~p, check manifest.json get error: ~p", [ServiceId, Reason]), + ignore end; - false -> - lager:debug("[efka_micro_service] service: ~p current status is 0, not boot"), - {ok, #state{service = Service, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}} + {error, Reason} -> + lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]), + ignore end. %% @private @@ -130,9 +147,10 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol %% 启动服务: 当前服务如果正常运行,则不允许重启 handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) -> {reply, {error, <<"service is running">>}, State}; -handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, service = Service = #micro_service{service_id = ServiceId}}) -> +handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, manifest = Manifest, service = Service = #micro_service{work_dir = WorkDir0, service_id = ServiceId}}) -> %% 异步启动服务 - case boot_service(Service) of + StartCmd = maps:get(<<"start">>, Manifest), + case boot_service(WorkDir0, StartCmd) of {ok, Port} -> %% 更新数据库状态 micro_service_model:start_service(ServiceId), @@ -149,9 +167,18 @@ handle_call(stop_service, _From, State = #state{running_status = ?STATUS_STOPPED lager:debug("stop service port: ~p, os_pid: ~p", [Port, OSPid]), {reply, {error, <<"service not running">>}, State}; -handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service = Service = #micro_service{service_id = ServiceId}}) when is_port(Port) -> +handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, manifest = Manifest, + service = Service = #micro_service{service_id = ServiceId, work_dir = WorkDir0}}) when is_port(Port) -> + + %% 优先使用微服务提供的stop指令, 没有提供的情况下,使用kill指令 + case maps:find(<<"stop">>, Manifest) of + error -> + kill_os_pid(OSPid); + {ok, StopCmd} -> + kill_service(WorkDir0, StopCmd, OSPid) + end, + micro_service_model:stop_service(ServiceId), - kill_os_pid(OSPid), erlang:is_port(Port) andalso erlang:port_close(Port), lager:debug("port: ~p, os_pid: ~p, will closed", [Port, OSPid]), @@ -180,8 +207,9 @@ handle_cast(_Request, State = #state{}) -> handle_info({timeout, _, reboot_service}, State = #state{service = #micro_service{service_id = ServiceId, status = 0}}) -> lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]), {noreply, State}; -handle_info({timeout, _, reboot_service}, State = #state{service = Service = #micro_service{service_id = ServiceId, status = 1}}) -> - case boot_service(Service) of +handle_info({timeout, _, reboot_service}, State = #state{manifest = Manifest, service = #micro_service{work_dir = WorkDir0, service_id = ServiceId, status = 1}}) -> + StartCmd = maps:get(<<"start">>, Manifest), + case boot_service(WorkDir0, StartCmd) of {ok, Port} -> {os_pid, OSPid} = erlang:port_info(Port, os_pid), lager:debug("[efka_micro_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]), @@ -230,6 +258,86 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +%% 检查配置是否合法 +-spec check_manifest(Manifest :: map()) -> ok | {error, Reason :: binary()}. +check_manifest(Manifest) when is_map(Manifest) -> + RequiredKeys = [<<"serivce_id">>, <<"start">>, <<"health_check">>, <<"stop">>], + check_manifest0(RequiredKeys, Manifest). + +check_manifest0([], _Manifest) -> + true; +check_manifest0([<<"serivce_id">>|T], Manifest) -> + case maps:find(<<"serivce_id">>, Manifest) of + error -> + {error, <<"miss serivce_id">>}; + {ok, Val} when is_binary(Val) -> + check_manifest0(T, Manifest); + {ok, _} -> + {error, <<"serivce_id is not string">>} + end; +check_manifest0([<<"health_check">>|T], Manifest) -> + case maps:find(<<"health_check">>, Manifest) of + error -> + {error, <<"miss health_check">>}; + {ok, Url} when is_binary(Url) -> + case is_url(Url) of + true -> + check_manifest0(T, Manifest); + false -> + {error, <<"health_check is not a invalid url">>} + end; + {ok, _} -> + {error, <<"health_check is not string">>} + end; +check_manifest0([<<"start">>|T], Manifest) -> + case maps:find(<<"start">>, Manifest) of + error -> + {error, <<"miss start">>}; + {ok, Cmd} when is_binary(Cmd) -> + %% 不能包含空格 + case binary:match(Cmd, <<" ">>) of + nomatch -> + check_manifest0(T, Manifest); + _ -> + {error, <<"start cmd cannot contain args">>} + end; + %% 对参数项目不进行检查 + {ok, [Cmd|_Args]} when is_binary(Cmd) -> + %% 不能包含空格 + case binary:match(Cmd, <<" ">>) of + nomatch -> + check_manifest0(T, Manifest); + _ -> + {error, <<"start cmd cannot contain args">>} + end; + {ok, _} -> + {error, <<"start is not string">>} + end; +check_manifest0([<<"stop">>|T], Manifest) -> + case maps:find(<<"stop">>, Manifest) of + error -> + check_manifest0(T, Manifest); + {ok, Cmd} when is_binary(Cmd) -> + %% 不能包含空格 + case binary:match(Cmd, <<" ">>) of + nomatch -> + check_manifest0(T, Manifest); + _ -> + {error, <<"stop cmd cannot contain args">>} + end; + %% 对参数项目不进行检查 + {ok, [Cmd|_Args]} when is_binary(Cmd) -> + %% 不能包含空格 + case binary:match(Cmd, <<" ">>) of + nomatch -> + check_manifest0(T, Manifest); + _ -> + {error, <<"stop cmd cannot contain args">>} + end; + {ok, _} -> + {error, <<"stop is not string">>} + end. + %% 关闭系统进程 -spec kill_os_pid(port() | undefined) -> no_return(). kill_os_pid(undefined) -> @@ -239,38 +347,75 @@ kill_os_pid(OSPid) when is_integer(OSPid) -> lager:debug("kill cmd is: ~p", [Cmd]), os:cmd(Cmd). -%% 启动微服务 --spec boot_service(MicroService :: #micro_service{}) -> {ok, Port :: port()} | {error, Reason :: binary()}. -boot_service(#micro_service{work_dir = WorkDir0, service_id = ServiceId}) -> - lager:debug("who call me here: ~p", ["funck"]), +%% 执行命令 +-spec kill_service(WorkDir0 :: binary(), ExecCmd0 :: binary() | [binary()], OSPid :: integer()) -> no_return(). +kill_service(WorkDir0, ExecCmd0, OSPid) when is_binary(WorkDir0), is_binary(ExecCmd0) -> WorkDir = binary_to_list(WorkDir0), - case file:read_file(WorkDir ++ "manifest.json") of - {ok, ManifestInfo} -> - Manifest = jiffy:decode(ManifestInfo, [return_maps]), - case maps:find(<<"exec">>, Manifest) of - error -> - {error, <<"manifest `exec` not found">>}; - {ok, ExecCmd0} -> - PortSettings = [ - {cd, WorkDir}, - exit_status - ], - PortSettings1 = case maps:find(<<"args">>, Manifest) of - error -> - PortSettings; - {ok, Args} -> - [{args, [binary_to_list(A) || A <- Args]}|PortSettings] - end, + PortSettings = [ + {cd, WorkDir}, + exit_status + ], - ExecCmd = binary_to_list(ExecCmd0), - RealExecCmd = filename:absname_join(WorkDir, ExecCmd), - lager:debug("[efka_micro_service] service: ~p, real_exec: ~p", [ServiceId, RealExecCmd]), + ExecCmd = binary_to_list(ExecCmd0), + RealExecCmd = filename:absname_join(WorkDir, ExecCmd), + Port = erlang:open_port({spawn_executable, RealExecCmd}, PortSettings), + receive + {Port, {exit_status, Code}} -> + lager:debug("[service] exit with code: ~p", [Code]) + after 5000 -> + kill_os_pid(OSPid) + end, + is_port(Port) andalso erlang:port_close(Port); +kill_service(WorkDir0, [ExecCmd0|Args], OSPid) when is_binary(WorkDir0), is_binary(ExecCmd0) -> + WorkDir = binary_to_list(WorkDir0), + PortSettings = [ + {cd, WorkDir}, + {args, [binary_to_list(A) || A <- Args]}, + exit_status + ], + ExecCmd = binary_to_list(ExecCmd0), + RealExecCmd = filename:absname_join(WorkDir, ExecCmd), - Port = erlang:open_port({spawn_executable, RealExecCmd}, PortSettings1), - {ok, Port} + Port = erlang:open_port({spawn_executable, RealExecCmd}, PortSettings), + receive + {Port, {exit_status, Code}} -> + lager:debug("[service] exit with code: ~p", [Code]) + after 5000 -> + kill_os_pid(OSPid) + end, + is_port(Port) andalso erlang:port_close(Port). - end; - {error, Reason} -> - lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]), - {error, <<"manifest.json not found">>} +%% 启动微服务 +-spec boot_service(WorkDir :: binary(), ExecCmd :: binary() | [binary()]) -> {ok, Port :: port()} | {error, Reason :: binary()}. +boot_service(WorkDir0, ExecCmd0) when is_binary(WorkDir0), is_binary(ExecCmd0) -> + WorkDir = binary_to_list(WorkDir0), + PortSettings = [ + {cd, WorkDir}, + exit_status + ], + + ExecCmd = binary_to_list(ExecCmd0), + RealExecCmd = filename:absname_join(WorkDir, ExecCmd), + Port = erlang:open_port({spawn_executable, RealExecCmd}, PortSettings), + {ok, Port}; +boot_service(WorkDir0, [ExecCmd0|Args]) when is_binary(WorkDir0), is_binary(ExecCmd0) -> + WorkDir = binary_to_list(WorkDir0), + PortSettings = [ + {cd, WorkDir}, + {args, [binary_to_list(A) || A <- Args]}, + exit_status + ], + ExecCmd = binary_to_list(ExecCmd0), + RealExecCmd = filename:absname_join(WorkDir, ExecCmd), + + Port = erlang:open_port({spawn_executable, RealExecCmd}, PortSettings), + {ok, Port}. + +-spec is_url(binary()) -> boolean(). +is_url(Input) when is_binary(Input) -> + try + uri_string:parse(Input), + true + catch + _:_ -> false end. \ No newline at end of file