From 7c7fcf329bc7e21bfedfd380974fca68c9e20250 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 6 May 2025 23:58:05 +0800 Subject: [PATCH] fix --- apps/efka/src/client/efka_client.erl | 5 + apps/efka/src/efka_agent.erl | 4 +- apps/efka/src/efka_micro_service.erl | 152 ++++++++++++------- apps/efka/src/efka_tcp_channel.erl | 22 +-- apps/efka/src/mnesia/micro_service_model.erl | 65 +++++++- 5 files changed, 177 insertions(+), 71 deletions(-) diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl index 8ffb812..27787e2 100644 --- a/apps/efka/src/client/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -167,6 +167,7 @@ init([MicroServiceId, Host, Port]) -> handle_call({controller_process, ControllerPid}, _From, State) -> {reply, ok, State#state{controller_process = ControllerPid}}; +%% done handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> Packet = <>, ok = gen_tcp:send(Socket, Packet), @@ -174,6 +175,7 @@ handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket Ref = make_ref(), {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; +%% done handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> Packet = <>, ok = gen_tcp:send(Socket, Packet), @@ -187,6 +189,7 @@ handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). +%% done handle_cast({send_metric_data, Measurement, Tags, Fields}, State = #state{socket = Socket}) -> %% 基于Line Protocol实现数据的传输 Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()), @@ -197,12 +200,14 @@ handle_cast({send_metric_data, Measurement, Tags, Fields}, State = #state{socket {noreply, State}; +%% done handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> Packet = <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; +%% done handle_cast({send_ai_event, EventType, Params}, State = #state{socket = Socket}) -> Packet = <<0:32, ?PACKET_AI_EVENT:8, EventType:16, Params/binary>>, ok = gen_tcp:send(Socket, Packet), diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index b844620..3302fa1 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -304,7 +304,7 @@ handle_info({server_push_message, PacketId, < - case efka_micro_service:push_params(ServicePid, Params) of + case efka_micro_service:push_params(ServicePid, Params, 15000) of ok -> Reply = #efka_response{ code = 1, @@ -333,7 +333,7 @@ handle_info({server_push_message, PacketId, < - case efka_micro_service:push_metrics(ServicePid, Metrics) of + case efka_micro_service:push_metrics(ServicePid, Metrics, 15000) of ok -> Reply = #efka_response { code = 1, diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index bf3dcf3..29aefa3 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -20,7 +20,7 @@ %% API -export([start_link/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). --export([push_params/2, push_metrics/2, request_params/1, request_metrics/1]). +-export([push_params/3, push_metrics/3, request_params/1, request_metrics/1]). -export([metric_data/2, send_event/3, send_ai_event/3]). %% gen_server callbacks @@ -29,7 +29,7 @@ -export([test/0, test1/0]). -record(state, { - service :: #micro_service{}, + service_id :: binary(), %% 通道id信息 channel_pid :: pid() | undefined, @@ -65,11 +65,11 @@ get_name(ServiceId) when is_binary(ServiceId) -> get_pid(ServiceId) when is_binary(ServiceId) -> whereis(get_name(ServiceId)). -push_params(Pid, Args) when is_pid(Pid), is_binary(Args) -> - gen_server:call(Pid, {push_params, Args}). +push_params(Pid, Params, Timeout) when is_pid(Pid), is_binary(Params), is_integer(Timeout) -> + gen_server:call(Pid, {push_params, Params, Timeout - 1000}, Timeout). -push_metrics(Pid, Metrics) when is_pid(Pid), is_binary(Metrics) -> - gen_server:call(Pid, {push_metrics, Metrics}). +push_metrics(Pid, Metrics, Timeout) when is_pid(Pid), is_binary(Metrics), is_integer(Timeout) -> + gen_server:call(Pid, {push_metrics, Metrics, Timeout - 1000}, Timeout). request_params(Pid) when is_pid(Pid) -> gen_server:call(Pid, request_params). @@ -99,10 +99,10 @@ stop_service(Pid) when is_pid(Pid) -> gen_server:call(Pid, stop_service). %% @doc Spawns the server and registers the local name (unique) --spec(start_link(Name :: atom(), Service :: #micro_service{}) -> +-spec(start_link(Name :: atom(), Service :: binary()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Name, Service = #micro_service{}) when is_atom(Name) -> - gen_server:start_link({local, Name}, ?MODULE, [Service], []). +start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) -> + gen_server:start_link({local, Name}, ?MODULE, [ServiceId], []). %%%=================================================================== %%% gen_server callbacks @@ -113,29 +113,35 @@ start_link(Name, Service = #micro_service{}) when is_atom(Name) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([Service = #micro_service{service_id = ServiceId, work_dir = WorkDir0}]) -> - case efka_manifest:new(WorkDir0) of - {ok, Manifest} -> - init0(Service, Manifest); - {error, Reason} -> - lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]), - ignore +init([ServiceId]) -> + case micro_service_model:get_service(ServiceId) of + error -> + lager:notice("[efka_micro_service] service_id: ~p, not found", [ServiceId]), + ignore; + {ok, Service = #micro_service{work_dir = WorkDir0}} -> + case efka_manifest:new(WorkDir0) of + {ok, Manifest} -> + init0(Service, Manifest); + {error, Reason} -> + lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]), + ignore + end end. -init0(Service = #micro_service{service_id = ServiceId, status = 1}, Manifest) -> +init0(#micro_service{service_id = ServiceId, status = 1}, Manifest) -> %% 数据的状态和运行状态是2回事 case efka_manifest:startup(Manifest) of {ok, Port} -> {os_pid, OSPid} = erlang:port_info(Port, os_pid), lager:debug("[efka_micro_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]), - {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; + {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; {error, Reason} -> lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), - {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}} + {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}} end; -init0(Service, Manifest) -> +init0(#micro_service{service_id = ServiceId, status = 0}, Manifest) -> lager:debug("[efka_micro_service] service: ~p current status is 0, not boot"), - {ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}. + {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}. %% @private %% @doc Handling call messages @@ -148,7 +154,8 @@ init0(Service, Manifest) -> {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). %% 绑定channel -handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service = #micro_service{status = Status}}) -> +handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service_id = ServiceId}) -> + Status = micro_service_model:get_status(ServiceId), case {Status, is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid)} of {1, false} -> erlang:monitor(process, ChannelPid), @@ -159,21 +166,54 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol {reply, {error, <<"serivce stopped">>}, State} end; -%% 推送配置项目 -handle_call({push_params, Args}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) -> - case is_pid(ChannelPid) of - true -> - ok; - false -> - ok - end, +%% 请求参数项 done +handle_call(request_params, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) -> + Params = micro_service_model:get_params(ServiceId), + {reply, {ok, Params}, State}; - ok; +%% 请求采集项 done +handle_call(request_metrics, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) -> + Metrics = micro_service_model:get_metrics(ServiceId), + {reply, {ok, Metrics}, State}; + +%% 推送配置项目 +handle_call({push_params, Params, Timeout}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) -> + case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of + true -> + {ok, Ref} = efka_tcp_channel:push_params(ChannelPid, self(), Params), + receive + {channel_reply, Ref, {ok, Result}} -> + {reply, {ok, Result}, State}; + {channel_reply, Ref, {error, Error}} -> + {reply, {error, Error}, State} + after Timeout -> + {reply, {error, <<"push_params timeout">>}, State} + end; + false -> + {reply, {error, <<"channel is not alive">>}, State} + end; + +%% 推送采集项 +handle_call({push_metrics, Metrics, Timeout}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) -> + case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of + true -> + {ok, Ref} = efka_tcp_channel:push_metrics(ChannelPid, self(), Metrics), + receive + {channel_reply, Ref, {ok, Result}} -> + {reply, {ok, Result}, State}; + {channel_reply, Ref, {error, Error}} -> + {reply, {error, Error}, State} + after Timeout -> + {reply, {error, <<"push_metrics timeout">>}, State} + end; + false -> + {reply, {error, <<"channel is not alive">>}, State} + end; %% 启动服务: 当前服务如果正常运行,则不允许重启 handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) -> {reply, {error, <<"service is running">>}, State}; -handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, manifest = Manifest, service = Service = #micro_service{service_id = ServiceId}}) -> +handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, manifest = Manifest, service_id = ServiceId}) -> %% 异步启动服务 case efka_manifest:startup(Manifest) of {ok, Port} -> @@ -181,7 +221,8 @@ handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPE micro_service_model:start_service(ServiceId), {os_pid, OSPid} = erlang:port_info(Port, os_pid), lager:debug("start_service port: ~p, os_pid: ~p", [Port, OSPid]), - {reply, ok, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service = Service#micro_service{status = 1}}}; + ok = micro_service_model:change_status(ServiceId, 1), + {reply, ok, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; {error, Reason} -> %% 启动失败不能更新数据库里面的状态 {reply, {error, Reason}, State} @@ -192,7 +233,7 @@ handle_call(stop_service, _From, State = #state{running_status = ?STATUS_STOPPED lager:debug("stop service port: ~p, os_pid: ~p", [Port, OSPid]), {reply, {error, <<"service not running">>}, State}; -handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service = Service = #micro_service{service_id = ServiceId}}) when is_port(Port) -> +handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service_id = ServiceId}) when is_port(Port) -> %% 优先使用微服务提供的stop指令, 没有提供的情况下,使用kill指令 kill_os_pid(OSPid), @@ -200,7 +241,9 @@ handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING erlang:is_port(Port) andalso erlang:port_close(Port), lager:debug("port: ~p, os_pid: ~p, will closed", [Port, OSPid]), - {reply, ok, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED, service = Service#micro_service{status = 0}}}; + ok = micro_service_model:change_status(ServiceId, 0), + + {reply, ok, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}}; handle_call(_Request, _From, State = #state{}) -> {reply, ok, State}. @@ -211,15 +254,15 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({metric_data, LineProtocolData}, State = #state{service = #micro_service{service_id = ServiceId}}) -> +handle_cast({metric_data, LineProtocolData}, State = #state{service_id = ServiceId}) -> efka_agent:metric_data(ServiceId, LineProtocolData), {noreply, State}; -handle_cast({send_event, EventType, Params}, State = #state{service = #micro_service{service_id = ServiceId}}) -> +handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) -> efka_agent:event(ServiceId, EventType, Params), {noreply, State}; -handle_cast({send_ai_event, EventType, Params}, State = #state{service = #micro_service{service_id = ServiceId}}) -> +handle_cast({send_ai_event, EventType, Params}, State = #state{service_id = ServiceId}) -> efka_agent:ai_event(ServiceId, EventType, Params), {noreply, State}; @@ -233,32 +276,35 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 重启服务 -handle_info({timeout, _, reboot_service}, State = #state{service = #micro_service{service_id = ServiceId, status = 0}}) -> - lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]), - {noreply, State}; -handle_info({timeout, _, reboot_service}, State = #state{manifest = Manifest, service = #micro_service{service_id = ServiceId, status = 1}}) -> - case efka_manifest:startup(Manifest) of - {ok, Port} -> - {os_pid, OSPid} = erlang:port_info(Port, os_pid), - lager:debug("[efka_micro_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]), - {noreply, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; - {error, Reason} -> - lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), - {noreply, State#state{running_status = ?STATUS_STOPPED}} +handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId, manifest = Manifest}) -> + case micro_service_model:get_status(ServiceId) of + 0 -> + lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]), + {noreply, State}; + 1 -> + case efka_manifest:startup(Manifest) of + {ok, Port} -> + {os_pid, OSPid} = erlang:port_info(Port, os_pid), + lager:debug("[efka_micro_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]), + {noreply, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; + {error, Reason} -> + lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), + {noreply, State#state{running_status = ?STATUS_STOPPED}} + end end; -handle_info({Port, {data, Data}}, State = #state{port = Port, service = #micro_service{service_id = ServiceId}}) -> +handle_info({Port, {data, Data}}, State = #state{port = Port, service_id = ServiceId}) -> lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [ServiceId, Data]), {noreply, State}; %% 处理port的消息, Port的被动关闭会触发;因此这个时候的Port和State.port的值是相等的 -handle_info({Port, {exit_status, Code}}, State = #state{service = #micro_service{service_id = ServiceId}}) -> +handle_info({Port, {exit_status, Code}}, State = #state{service_id = ServiceId}) -> lager:debug("[efka_micro_service] service_id: ~p, port: ~p, exit with code: ~p", [ServiceId, Port, Code]), % erlang:start_timer(5000, self(), reboot_service), {noreply, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}}; %% 处理channel进程的退出 -handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service = #micro_service{service_id = ServiceId}}) -> +handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) -> lager:debug("[efka_micro_service] service: ~p, channel exited: ~p", [ServiceId, Reason]), {noreply, State#state{channel_pid = undefined}}. diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index d721c2b..153b7eb 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -13,7 +13,7 @@ %% API -export([start_link/1]). --export([push_metric/2, push_param/2]). +-export([push_metrics/3, push_params/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -56,16 +56,16 @@ %%% API %%%=================================================================== --spec push_param(ChannelPid :: pid(), Params :: binary()) -> {ok, Ref :: reference()}. -push_param(ChannelPid, Params) when is_pid(ChannelPid), is_binary(Params) -> +-spec push_params(ChannelPid :: pid(), ReceiverPid :: pid(), Params :: binary()) -> {ok, Ref :: reference()}. +push_params(ChannelPid, ReceiverPid, Params) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Params) -> Ref = make_ref(), - gen_server:cast(ChannelPid, {push_param, Ref, self(), Params}), + gen_server:cast(ChannelPid, {push_params, Ref, ReceiverPid, Params}), {ok, Ref}. --spec push_metric(ChannelPid :: pid(), Metrics :: binary()) -> {ok, Ref :: reference()}. -push_metric(ChannelPid, Metrics) when is_pid(ChannelPid), is_binary(Metrics) -> +-spec push_metrics(ChannelPid :: pid(), ReceiverPid :: pid(), Metrics :: binary()) -> {ok, Ref :: reference()}. +push_metrics(ChannelPid, ReceiverPid, Metrics) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Metrics) -> Ref = make_ref(), - gen_server:cast(ChannelPid, {push_metric, Ref, self(), Metrics}), + gen_server:cast(ChannelPid, {push_metrics, Ref, ReceiverPid, Metrics}), {ok, Ref}. %% @doc Spawns the server and registers the local name (unique) @@ -108,12 +108,12 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 推送参数项目 -handle_cast({push_param, Ref, ReceiverPid, Params}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> +handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> ok = gen_tcp:send(Socket, <>), {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; %% 推送采集项目 -handle_cast({push_metric, Ref, ReceiverPid, Metrics}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> +handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> ok = gen_tcp:send(Socket, <>), {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; @@ -144,8 +144,8 @@ handle_info({tcp, Socket, <>} %% 请求参数 handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - {ok, Args} = efka_micro_service:request_params(ServicePid), - ok = gen_tcp:send(Socket, <>), + {ok, Params} = efka_micro_service:request_params(ServicePid), + ok = gen_tcp:send(Socket, <>), {noreply, State}; %% 请求采集项目 diff --git a/apps/efka/src/mnesia/micro_service_model.erl b/apps/efka/src/mnesia/micro_service_model.erl index 1afc194..e200d15 100644 --- a/apps/efka/src/mnesia/micro_service_model.erl +++ b/apps/efka/src/mnesia/micro_service_model.erl @@ -15,7 +15,8 @@ %% API -export([create_table/0]). --export([insert/1, start_service/1, stop_service/1, get_all_services/0]). +-export([insert/1, get_all_services/0]). +-export([get_metrics/1, get_params/1, set_metrics/2, set_params/2, get_service/1, get_status/1, change_status/2]). create_table() -> %% id生成器 @@ -34,13 +35,13 @@ insert(MicroService = #micro_service{}) -> {error, Reason} end. -stop_service(ServiceId) when is_binary(ServiceId) -> +change_status(ServiceId, NewStatus) when is_binary(ServiceId), is_integer(NewStatus) -> Fun = fun() -> case mnesia:read(?TAB, ServiceId, write) of [] -> mnesia:abort(<<"service not found">>); [Service] -> - mnesia:write(?TAB, Service#micro_service{status = 0}, write) + mnesia:write(?TAB, Service#micro_service{status = NewStatus}, write) end end, case mnesia:transaction(Fun) of @@ -50,13 +51,14 @@ stop_service(ServiceId) when is_binary(ServiceId) -> {error, Reason} end. -start_service(ServiceId) when is_binary(ServiceId) -> +-spec set_params(ServiceId :: binary(), Params :: binary()) -> ok | {error, Reason :: any()}. +set_params(ServiceId, Params) when is_binary(ServiceId), is_binary(Params) -> Fun = fun() -> case mnesia:read(?TAB, ServiceId, write) of [] -> mnesia:abort(<<"service not found">>); [S] -> - mnesia:write(?TAB, S#micro_service{status = 1}, write) + mnesia:write(?TAB, S#micro_service{params = Params}, write) end end, case mnesia:transaction(Fun) of @@ -66,6 +68,59 @@ start_service(ServiceId) when is_binary(ServiceId) -> {error, Reason} end. +-spec set_metrics(ServiceId :: binary(), Metrics :: binary()) -> ok | {error, Reason :: any()}. +set_metrics(ServiceId, Metrics) when is_binary(ServiceId), is_binary(Metrics) -> + Fun = fun() -> + case mnesia:read(?TAB, ServiceId, write) of + [] -> + mnesia:abort(<<"service not found">>); + [S] -> + mnesia:write(?TAB, S#micro_service{metrics = Metrics}, write) + end + end, + case mnesia:transaction(Fun) of + {'atomic', ok} -> + ok; + {'aborted', Reason} -> + {error, Reason} + end. + +-spec get_params(ServiceId :: binary()) -> Params :: binary(). +get_params(ServiceId) when is_binary(ServiceId) -> + case mnesia:dirty_read(?TAB, ServiceId) of + [] -> + <<"">>; + [#micro_service{params = Params}] -> + Params + end. + +-spec get_metrics(ServiceId :: binary()) -> Params :: binary(). +get_metrics(ServiceId) when is_binary(ServiceId) -> + case mnesia:dirty_read(?TAB, ServiceId) of + [] -> + <<"">>; + [#micro_service{metrics = Metrics}] -> + Metrics + end. + +-spec get_status(ServiceId :: binary()) -> Status :: integer(). +get_status(ServiceId) when is_binary(ServiceId) -> + case mnesia:dirty_read(?TAB, ServiceId) of + [] -> + 0; + [#micro_service{status = Status}] -> + Status + end. + +-spec get_service(ServiceId :: binary()) -> error | {ok, Service :: #micro_service{}}. +get_service(ServiceId) when is_binary(ServiceId) -> + case mnesia:dirty_read(?TAB, ServiceId) of + [] -> + error; + [Service] -> + {ok, Service} + end. + -spec get_all_services() -> [#micro_service{}]. get_all_services() -> Fun = fun() ->