ekfa/apps/efka/src/efka_micro_service.erl
2025-05-05 22:37:53 +08:00

341 lines
14 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%% 1. 需要管理服务的整个生命周期,包括: 启动,停止
%%% 2. 需要监控服务的状态通过port的方式
%%% @end
%%% Created : 18. 4月 2025 16:50
%%%-------------------------------------------------------------------
-module(efka_micro_service).
-author("anlicheng").
-include("efka_tables.hrl").
-behaviour(gen_server).
%% 当前微服务的状态
-define(STATUS_STOPPED, stopped).
-define(STATUS_RUNNING, running).
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-export([test/0, test1/0]).
%% Server state of one managed micro service.
-record(state, {
%% Service definition record (presumably declared in efka_tables.hrl)
service :: #micro_service{},
%% Pid of the attached channel process, undefined when none is attached
channel_pid :: pid() | undefined,
%% Port of the running external program; OSPid = erlang:port_info(Port, os_pid)
port :: undefined | port(),
%% Operating-system pid of the external program
os_pid :: undefined | integer(),
%% Decoded manifest (configuration) of the service
manifest = #{},
%% Current runtime status of the service
running_status = ?STATUS_STOPPED
}).
%%%===================================================================
%%% API
%%%===================================================================
%% Manual helper: looks up the service registered for <<"test1234">> and
%% asks it to stop.
test() ->
    stop_service(get_pid(<<"test1234">>)).
%% Manual helper: looks up the service registered for <<"test1234">> and
%% asks it to start.
test1() ->
    start_service(get_pid(<<"test1234">>)).
%% @doc Builds the registered process name for a service id, i.e. the atom
%% 'efka_service:<ServiceId>'.
-spec get_name(ServiceId :: binary()) -> atom().
get_name(ServiceId) when is_binary(ServiceId) ->
    Suffix = binary_to_list(ServiceId),
    NameStr = "efka_service:" ++ Suffix,
    list_to_atom(NameStr).
%% @doc Returns the pid registered for the given service id, or `undefined'
%% when no process is registered under that name.
-spec get_pid(ServiceId :: binary()) -> undefined | pid().
get_pid(ServiceId) when is_binary(ServiceId) ->
    RegisteredName = get_name(ServiceId),
    whereis(RegisteredName).
%% @doc Synchronously asks the service process Pid to bind ChannelPid as its
%% channel.
-spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}.
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
    Request = {attach_channel, ChannelPid},
    gen_server:call(Pid, Request).
%% @doc Synchronously asks the service process Pid to boot its external
%% program.
-spec start_service(Pid :: pid()) -> ok | {error, Reason :: binary()}.
start_service(Pid) when is_pid(Pid) ->
    Request = start_service,
    gen_server:call(Pid, Request).
%% @doc Synchronously asks the service process Pid to stop its external
%% program.
-spec stop_service(Pid :: pid()) -> ok | {error, Reason :: binary()}.
stop_service(Pid) when is_pid(Pid) ->
    Request = stop_service,
    gen_server:call(Pid, Request).
%% @doc Starts the server linked to the caller and registers it locally
%% under Name (a local name can only be held by one process at a time).
-spec(start_link(Name :: atom(), Service :: #micro_service{}) ->
    {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Name, #micro_service{} = Service) when is_atom(Name) ->
    ServerName = {local, Name},
    gen_server:start_link(ServerName, ?MODULE, [Service], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server.
%% Reads `manifest.json' from the service work directory, decodes and
%% validates it, then hands over to init0/2. Returns `ignore' (after logging
%% a notice) when the manifest cannot be read or does not pass validation.
-spec(init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore).
init([Service = #micro_service{service_id = ServiceId, work_dir = WorkDir0}]) ->
    %% filename:join/2 inserts the path separator when the work dir has no
    %% trailing "/" and does not duplicate it when it has one, so both forms
    %% of work_dir resolve to the same manifest file.
    ManifestPath = filename:join(binary_to_list(WorkDir0), "manifest.json"),
    case file:read_file(ManifestPath) of
        {ok, ManifestInfo} ->
            %% A decoder exception is turned into a non-map term, which
            %% check_manifest/1 rejects.
            Manifest =
                try
                    jiffy:decode(ManifestInfo, [return_maps])
                catch
                    Class:DecodeError ->
                        {error, {Class, DecodeError}}
                end,
            case check_manifest(Manifest) of
                ok ->
                    init0(Service, Manifest);
                {error, Reason} ->
                    lager:notice("[efka_micro_service] service: ~p, check manifest.json get error: ~p", [ServiceId, Reason]),
                    ignore
            end;
        {error, Reason} ->
            lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]),
            ignore
    end.
%% Builds the initial server state.
%% The persisted `status' field of the service record and the runtime
%% `running_status' are two separate things: the external program is booted
%% here only when the persisted status is 1. A failed boot still yields a
%% (stopped) server so the service can be started later.
init0(Service = #micro_service{service_id = ServiceId, work_dir = WorkDir0, status = 1}, Manifest) ->
    case boot_service(WorkDir0, Manifest) of
        {ok, Port} ->
            {os_pid, OSPid} = erlang:port_info(Port, os_pid),
            lager:debug("[efka_micro_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]),
            {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
        {error, Reason} ->
            lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]),
            {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}
    end;
init0(Service = #micro_service{service_id = ServiceId, status = Status}, Manifest) ->
    %% Any status other than 1 ends up here; log the actual id and status
    %% (the format string previously had a ~p with no argument list).
    lager:debug("[efka_micro_service] service: ~p current status is ~p, not boot", [ServiceId, Status]),
    {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}.
%% @private
%% @doc Handling call messages
%% Supported requests:
%%   {attach_channel, ChannelPid} - bind a channel process to this service
%%   start_service                - boot the external program
%%   stop_service                 - kill the external program
%% Any other request is answered with `ok'.
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
    State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Bind a channel. Only allowed when the persisted status is 1 and no live
%% channel is currently attached; the new channel is monitored.
handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service = #micro_service{status = Status}}) ->
    case {Status, is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid)} of
        {1, false} ->
            erlang:monitor(process, ChannelPid),
            {reply, ok, State#state{channel_pid = ChannelPid}};
        {1, true} ->
            {reply, {error, <<"channel exists">>}, State};
        {0, _} ->
            {reply, {error, <<"serivce stopped">>}, State}
    end;
%% Start the service: a service that is already running must not be started again.
handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) ->
    {reply, {error, <<"service is running">>}, State};
handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, manifest = Manifest, service = Service = #micro_service{work_dir = WorkDir0, service_id = ServiceId}}) ->
    case boot_service(WorkDir0, Manifest) of
        {ok, Port} ->
            %% Persist the new status only after a successful boot.
            micro_service_model:start_service(ServiceId),
            {os_pid, OSPid} = erlang:port_info(Port, os_pid),
            lager:debug("start_service port: ~p, os_pid: ~p", [Port, OSPid]),
            {reply, ok, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service = Service#micro_service{status = 1}}};
        {error, Reason} ->
            %% A failed boot must not change the persisted status.
            {reply, {error, Reason}, State}
    end;
%% Stop the service. An explicit stop also changes the persisted `status' field.
handle_call(stop_service, _From, State = #state{running_status = ?STATUS_STOPPED, port = Port, os_pid = OSPid}) ->
    lager:debug("stop service port: ~p, os_pid: ~p", [Port, OSPid]),
    {reply, {error, <<"service not running">>}, State};
handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service = Service = #micro_service{service_id = ServiceId}}) when is_port(Port) ->
    %% TODO(review): the original note says a stop command provided by the
    %% micro service should be preferred over kill; only the kill path is
    %% implemented here.
    kill_os_pid(OSPid),
    micro_service_model:stop_service(ServiceId),
    %% The kill above can make the port close itself before we get here;
    %% port_close/1 raises badarg on a port that is no longer open, which
    %% must not take the server down.
    try
        erlang:port_close(Port)
    catch
        error:badarg ->
            true
    end,
    lager:debug("port: ~p, os_pid: ~p, will closed", [Port, OSPid]),
    {reply, ok, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED, service = Service#micro_service{status = 0}}};
handle_call(_Request, _From, State = #state{}) ->
    {reply, ok, State}.
%% @private
%% @doc Handling cast messages. No cast is supported; every request is
%% dropped and the state is returned unchanged.
-spec(handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, #state{} = State) ->
    {noreply, State}.
%% @private
%% @doc Handling all non call/cast messages:
%% reboot timers, messages from the port of the external program and the
%% 'DOWN' message of the attached channel. Anything else (including messages
%% from a port or channel that is no longer the current one) is logged and
%% ignored.
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}).
%% Reboot timer fired, but the persisted status is 0: do not boot.
handle_info({timeout, _, reboot_service}, State = #state{service = #micro_service{service_id = ServiceId, status = 0}}) ->
    lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]),
    {noreply, State};
%% Reboot timer fired and the persisted status is 1: boot the service again.
handle_info({timeout, _, reboot_service}, State = #state{manifest = Manifest, service = #micro_service{work_dir = WorkDir0, service_id = ServiceId, status = 1}}) ->
    case boot_service(WorkDir0, Manifest) of
        {ok, Port} ->
            {os_pid, OSPid} = erlang:port_info(Port, os_pid),
            lager:debug("[efka_micro_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]),
            {noreply, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
        {error, Reason} ->
            lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]),
            {noreply, State#state{running_status = ?STATUS_STOPPED}}
    end;
%% Output of the external program: only logged.
handle_info({Port, {data, Data}}, State = #state{port = Port, service = #micro_service{service_id = ServiceId}}) ->
    lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [ServiceId, Data]),
    {noreply, State};
%% The external program exited on its own. The message is matched against
%% the current port, so a late exit_status of a previously stopped port
%% cannot reset the state of a program that was started afterwards.
handle_info({Port, {exit_status, Code}}, State = #state{port = Port, service = #micro_service{service_id = ServiceId}}) ->
    lager:debug("[efka_micro_service] service_id: ~p, port: ~p, exit with code: ~p", [ServiceId, Port, Code]),
    %% NOTE: no reboot timer is armed here; the former
    %% erlang:start_timer(5000, self(), reboot_service) call is disabled.
    {noreply, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}};
%% The attached channel process went down.
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service = #micro_service{service_id = ServiceId}}) ->
    lager:debug("[efka_micro_service] service: ~p, channel exited: ~p", [ServiceId, Reason]),
    {noreply, State#state{channel_pid = undefined}};
%% Drain everything else instead of crashing with function_clause, e.g. the
%% 'DOWN' of an earlier channel or a message from a port that is no longer
%% the current one.
handle_info(Info, State = #state{}) ->
    lager:debug("[efka_micro_service] ignore unexpected message: ~p", [Info]),
    {noreply, State}.
%% @private
%% @doc Called by gen_server right before the process terminates.
%% Logs the reason and kills the external program through the OS pid kept
%% in the state (nothing is killed when that pid is `undefined').
%% The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
    State :: #state{}) -> term()).
terminate(Reason, #state{os_pid = OSPid}) ->
    lager:debug("[efka_micro_service] terminate with reason: ~p", [Reason]),
    kill_os_pid(OSPid),
    ok.
%% @private
%% @doc Converts the process state on a code upgrade/downgrade.
%% The state layout has not changed, so it is passed through as is.
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
    Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, #state{} = State, _Extra) ->
    {ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Checks whether the decoded manifest is valid: it must be a map, and each
%% of the keys listed below is then checked by check_manifest0/2.
-spec check_manifest(Manifest :: map()) -> ok | {error, Reason :: binary()}.
check_manifest(Manifest) ->
    case is_map(Manifest) of
        true ->
            KeysToCheck = [<<"serivce_id">>, <<"exec">>, <<"args">>, <<"health_check">>],
            check_manifest0(KeysToCheck, Manifest);
        false ->
            {error, <<"invalid manifest json">>}
    end.
%% Validates the manifest key by key, in the order of the given key list,
%% and stops at the first failure.
%% Returns `ok' when every listed key passes, `{error, Reason}' otherwise.
%% NOTE(review): the key is spelled "serivce_id" - it looks like a typo of
%% "service_id", but it is the key the manifest is validated against;
%% confirm with the manifest producers before renaming.
check_manifest0([], _Manifest) ->
    ok;
%% "serivce_id": required, must be a string.
check_manifest0([<<"serivce_id">>|T], Manifest) ->
    case maps:find(<<"serivce_id">>, Manifest) of
        error ->
            {error, <<"miss serivce_id">>};
        {ok, Val} when is_binary(Val) ->
            check_manifest0(T, Manifest);
        {ok, _} ->
            {error, <<"serivce_id is not string">>}
    end;
%% "health_check": required, must be a string accepted by is_url/1.
check_manifest0([<<"health_check">>|T], Manifest) ->
    case maps:find(<<"health_check">>, Manifest) of
        error ->
            {error, <<"miss health_check">>};
        {ok, Url} when is_binary(Url) ->
            case is_url(Url) of
                true ->
                    check_manifest0(T, Manifest);
                false ->
                    {error, <<"health_check is not a valid url">>}
            end;
        {ok, _} ->
            {error, <<"health_check is not string">>}
    end;
%% "exec": required, must be a string without spaces (arguments belong in "args").
check_manifest0([<<"exec">>|T], Manifest) ->
    case maps:find(<<"exec">>, Manifest) of
        error ->
            {error, <<"miss start">>};
        {ok, Cmd} when is_binary(Cmd) ->
            case binary:match(Cmd, <<" ">>) of
                nomatch ->
                    check_manifest0(T, Manifest);
                _ ->
                    {error, <<"start cmd cannot contain args">>}
            end;
        {ok, _} ->
            %% Without this branch a non-string value raised case_clause.
            {error, <<"exec is not string">>}
    end;
%% "args": optional; when present it must be a list. The individual
%% arguments are deliberately not checked here.
check_manifest0([<<"args">>|T], Manifest) ->
    case maps:find(<<"args">>, Manifest) of
        error ->
            check_manifest0(T, Manifest);
        {ok, Args} when is_list(Args) ->
            check_manifest0(T, Manifest);
        {ok, _} ->
            {error, <<"args must be list">>}
    end.
%% Force-kills an operating-system process with `kill -9'.
%% `undefined' means there is no process to kill and yields `ok'; for an
%% integer pid the output of the kill command is returned.
%% The pid is guarded to be an integer, so nothing but digits (and a sign)
%% can end up in the shell command.
-spec kill_os_pid(OSPid :: undefined | integer()) -> ok | string().
kill_os_pid(undefined) ->
    ok;
kill_os_pid(OSPid) when is_integer(OSPid) ->
    Cmd = "kill -9 " ++ integer_to_list(OSPid),
    lager:debug("kill cmd is: ~p", [Cmd]),
    os:cmd(Cmd).
%% Boots the micro service as an external program attached to a port.
%% The executable named by the manifest key "exec" is resolved against the
%% work directory and spawned with the optional "args" list, the work
%% directory as its cwd and exit_status reporting enabled.
%% Returns `{ok, Port}' on success. open_port/2 raises on failure (e.g. the
%% executable does not exist); that exception, as well as a non-binary
%% entry in "args", is converted into `{error, Reason}' so the callers'
%% error branches are actually reachable.
-spec boot_service(WorkDir :: binary(), Manifest :: map()) -> {ok, Port :: port()} | {error, Reason :: binary()}.
boot_service(WorkDir0, Manifest) when is_binary(WorkDir0), is_map(Manifest) ->
    ExecCmd = binary_to_list(maps:get(<<"exec">>, Manifest)),
    Args0 = maps:get(<<"args">>, Manifest, []),
    WorkDir = binary_to_list(WorkDir0),
    RealExecCmd = filename:absname_join(WorkDir, ExecCmd),
    try
        PortSettings = [
            {cd, WorkDir},
            {args, [binary_to_list(A) || A <- Args0]},
            exit_status
        ],
        Port = erlang:open_port({spawn_executable, RealExecCmd}, PortSettings),
        {ok, Port}
    catch
        error:Reason ->
            {error, iolist_to_binary(io_lib:format("boot ~p failed: ~p", [RealExecCmd, Reason]))}
    end.
%% Tells whether Input can be parsed as a URI.
%% uri_string:parse/1 reports invalid input by returning an
%% `{error, Reason, Term}' tuple rather than by raising, so the result has
%% to be inspected; wrapping the call in try/catch accepted every input.
%% NOTE(review): anything that parses counts as valid, which presumably
%% includes relative references without scheme or host - tighten if the
%% health check requires an absolute URL.
-spec is_url(binary()) -> boolean().
is_url(Input) when is_binary(Input) ->
    case uri_string:parse(Input) of
        {error, _Reason, _Term} ->
            false;
        Parsed when is_map(Parsed) ->
            true
    end.