diff --git a/apps/efka/src/efka_manifest.erl b/apps/efka/src/efka_manifest.erl new file mode 100644 index 0000000..61a2746 --- /dev/null +++ b/apps/efka/src/efka_manifest.erl @@ -0,0 +1,120 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 05. 5月 2025 22:39 +%%%------------------------------------------------------------------- +-module(efka_manifest). +-author("anlicheng"). + +-record(manifest, { + work_dir = "" :: string(), + service_id = <<"">> :: binary(), + exec = <<"">>:: binary(), + args = [], + health_check = <<"">> +}). + +-type manifest() :: #manifest{}. + +-export_type([manifest/0]). + +%% API +-export([new/1, startup/1, kill_os_pid/1]). + +-spec new(WorkDir0 :: binary()) -> {ok, #manifest{}} | {error, Reason :: binary()}. +new(WorkDir0) when is_binary(WorkDir0) -> + WorkDir = binary_to_list(WorkDir0), + case file:read_file(WorkDir ++ "manifest.json") of + {ok, ManifestInfo} -> + Manifest = catch jiffy:decode(ManifestInfo, [return_maps]), + case check_manifest(Manifest) of + {ok, Manifest} -> + {ok, Manifest#manifest{work_dir = WorkDir}}; + {error, Reason} -> + {error, Reason} + end; + {error, Reason} -> + {error, Reason} + end. + +-spec startup(Manifest :: #manifest{}) -> {ok, Port :: port()} | {error, Reason :: binary()}. +startup(#manifest{work_dir = WorkDir, exec = ExecCmd0, args = Args0}) -> + PortSettings = [ + {cd, WorkDir}, + {args, [binary_to_list(A) || A <- Args0]}, + exit_status + ], + ExecCmd = binary_to_list(ExecCmd0), + RealExecCmd = filename:absname_join(WorkDir, ExecCmd), + + Port = erlang:open_port({spawn_executable, RealExecCmd}, PortSettings), + {ok, Port}. + +%% 检查配置是否合法 +-spec check_manifest(Manifest :: map()) -> ok | {error, Reason :: binary()}. +check_manifest(Manifest) when is_map(Manifest) -> + RequiredKeys = [<<"serivce_id">>, <<"exec">>, <<"args">>, <<"health_check">>], + check_manifest0(RequiredKeys, Manifest, #manifest{}); +check_manifest(_Manifest) -> + {error, <<"invalid manifest json">>}. + +check_manifest0([], _Settings, Manifest) -> + {ok, Manifest}; +check_manifest0([<<"serivce_id">>|T], Settings, Manifest) -> + case maps:find(<<"serivce_id">>, Settings) of + error -> + {error, <<"miss serivce_id">>}; + {ok, ServiceId} when is_binary(ServiceId) -> + check_manifest0(T, Settings, Manifest#manifest{service_id = ServiceId}); + {ok, _} -> + {error, <<"serivce_id is not string">>} + end; +check_manifest0([<<"health_check">>|T], Settings, Manifest) -> + case maps:find(<<"health_check">>, Settings) of + error -> + {error, <<"miss health_check">>}; + {ok, Url} when is_binary(Url) -> + case is_url(Url) of + true -> + check_manifest0(T, Settings, Manifest#manifest{health_check = Url}); + false -> + {error, <<"health_check is not a invalid url">>} + end; + {ok, _} -> + {error, <<"health_check is not string">>} + end; +check_manifest0([<<"exec">>|T], Settings, Manifest) -> + case maps:find(<<"exec">>, Settings) of + error -> + {error, <<"miss start">>}; + {ok, Exec} when is_binary(Exec) -> + %% 不能包含空格 + case binary:match(Exec, <<" ">>) of + nomatch -> + check_manifest0(T, Settings, Manifest#manifest{exec = Exec}); + _ -> + {error, <<"start cmd cannot contain args">>} + end + end; +check_manifest0([<<"args">>|T], Settings, Manifest) -> + case maps:find(<<"args">>, Settings) of + error -> + check_manifest0(T, Settings, Manifest#manifest{args = []}); + %% 对参数项目不进行检查 + {ok, Args} when is_list(Args) -> + check_manifest0(T, Settings, Manifest#manifest{args = Args}); + {ok, _} -> + {error, <<"args must be list">>} + end. + +-spec is_url(binary()) -> boolean(). +is_url(Input) when is_binary(Input) -> + try + uri_string:parse(Input), + true + catch + _:_ -> false + end. \ No newline at end of file diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index 04c4a13..d9e5bf1 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -37,7 +37,8 @@ os_pid :: undefined | integer(), %% 配置信息 - manifest = #{}, + manifest :: undefined | efka_manifest:manifest(), + %% 当前服务的运行状态 running_status = ?STATUS_STOPPED }). @@ -90,29 +91,21 @@ start_link(Name, Service = #micro_service{}) when is_atom(Name) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([Service = #micro_service{service_id = ServiceId, work_dir = WorkDir0}]) -> - WorkDir = binary_to_list(WorkDir0), - case file:read_file(WorkDir ++ "manifest.json") of - {ok, ManifestInfo} -> - Manifest = catch jiffy:decode(ManifestInfo, [return_maps]), - case check_manifest(Manifest) of - ok -> - init0(Service, Manifest); - {error, Reason} -> - lager:notice("[efka_micro_service] service: ~p, check manifest.json get error: ~p", [ServiceId, Reason]), - ignore - end; + case efka_manifest:new(WorkDir0) of + {ok, Manifest} -> + init0(Service, Manifest); {error, Reason} -> lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]), ignore end. -init0(Service = #micro_service{service_id = ServiceId, work_dir = WorkDir0, status = 1}, Manifest) -> +init0(Service = #micro_service{service_id = ServiceId, status = 1}, Manifest) -> %% 数据的状态和运行状态是2回事 - case boot_service(WorkDir0, Manifest) of + case efka_manifest:startup(Manifest) of {ok, Port} -> {os_pid, OSPid} = erlang:port_info(Port, os_pid), lager:debug("[efka_micro_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]), - {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; + {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; {error, Reason} -> lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}} @@ -146,9 +139,9 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol %% 启动服务: 当前服务如果正常运行,则不允许重启 handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) -> {reply, {error, <<"service is running">>}, State}; -handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, manifest = Manifest, service = Service = #micro_service{work_dir = WorkDir0, service_id = ServiceId}}) -> +handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, manifest = Manifest, service = Service = #micro_service{service_id = ServiceId}}) -> %% 异步启动服务 - case boot_service(WorkDir0, Manifest) of + case efka_manifest:startup(Manifest) of {ok, Port} -> %% 更新数据库状态 micro_service_model:start_service(ServiceId), @@ -197,8 +190,8 @@ handle_cast(_Request, State = #state{}) -> handle_info({timeout, _, reboot_service}, State = #state{service = #micro_service{service_id = ServiceId, status = 0}}) -> lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]), {noreply, State}; -handle_info({timeout, _, reboot_service}, State = #state{manifest = Manifest, service = #micro_service{work_dir = WorkDir0, service_id = ServiceId, status = 1}}) -> - case boot_service(WorkDir0, Manifest) of +handle_info({timeout, _, reboot_service}, State = #state{manifest = Manifest, service = #micro_service{service_id = ServiceId, status = 1}}) -> + case efka_manifest:startup(Manifest) of {ok, Port} -> {os_pid, OSPid} = erlang:port_info(Port, os_pid), lager:debug("[efka_micro_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]), @@ -247,63 +240,6 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== -%% 检查配置是否合法 --spec check_manifest(Manifest :: map()) -> ok | {error, Reason :: binary()}. -check_manifest(Manifest) when is_map(Manifest) -> - RequiredKeys = [<<"serivce_id">>, <<"exec">>, <<"args">>, <<"health_check">>], - check_manifest0(RequiredKeys, Manifest); -check_manifest(_Manifest) -> - {error, <<"invalid manifest json">>}. - -check_manifest0([], _Manifest) -> - ok; -check_manifest0([<<"serivce_id">>|T], Manifest) -> - case maps:find(<<"serivce_id">>, Manifest) of - error -> - {error, <<"miss serivce_id">>}; - {ok, Val} when is_binary(Val) -> - check_manifest0(T, Manifest); - {ok, _} -> - {error, <<"serivce_id is not string">>} - end; -check_manifest0([<<"health_check">>|T], Manifest) -> - case maps:find(<<"health_check">>, Manifest) of - error -> - {error, <<"miss health_check">>}; - {ok, Url} when is_binary(Url) -> - case is_url(Url) of - true -> - check_manifest0(T, Manifest); - false -> - {error, <<"health_check is not a invalid url">>} - end; - {ok, _} -> - {error, <<"health_check is not string">>} - end; -check_manifest0([<<"exec">>|T], Manifest) -> - case maps:find(<<"exec">>, Manifest) of - error -> - {error, <<"miss start">>}; - {ok, Cmd} when is_binary(Cmd) -> - %% 不能包含空格 - case binary:match(Cmd, <<" ">>) of - nomatch -> - check_manifest0(T, Manifest); - _ -> - {error, <<"start cmd cannot contain args">>} - end - end; -check_manifest0([<<"args">>|T], Manifest) -> - case maps:find(<<"args">>, Manifest) of - error -> - check_manifest0(T, Manifest); - %% 对参数项目不进行检查 - {ok, Args} when is_list(Args) -> - check_manifest0(T, Manifest); - {ok, _} -> - {error, <<"args must be list">>} - end. - %% 关闭系统进程 -spec kill_os_pid(port() | undefined) -> no_return(). kill_os_pid(undefined) -> @@ -311,31 +247,4 @@ kill_os_pid(undefined) -> kill_os_pid(OSPid) when is_integer(OSPid) -> Cmd = lists:flatten(io_lib:format("kill -9 ~p", [OSPid])), lager:debug("kill cmd is: ~p", [Cmd]), - os:cmd(Cmd). - -%% 启动微服务 --spec boot_service(WorkDir :: binary(), Manifest :: map()) -> {ok, Port :: port()} | {error, Reason :: binary()}. -boot_service(WorkDir0, Manifest) when is_binary(WorkDir0), is_map(Manifest) -> - ExecCmd0 = maps:get(<<"exec">>, Manifest), - Args0 = maps:get(<<"args">>, Manifest, []), - - WorkDir = binary_to_list(WorkDir0), - PortSettings = [ - {cd, WorkDir}, - {args, [binary_to_list(A) || A <- Args0]}, - exit_status - ], - ExecCmd = binary_to_list(ExecCmd0), - RealExecCmd = filename:absname_join(WorkDir, ExecCmd), - - Port = erlang:open_port({spawn_executable, RealExecCmd}, PortSettings), - {ok, Port}. - --spec is_url(binary()) -> boolean(). -is_url(Input) when is_binary(Input) -> - try - uri_string:parse(Input), - true - catch - _:_ -> false - end. \ No newline at end of file + os:cmd(Cmd). \ No newline at end of file