This commit is contained in:
anlicheng 2025-05-06 23:58:05 +08:00
parent 847f79e92a
commit 7c7fcf329b
5 changed files with 177 additions and 71 deletions

View File

@ -167,6 +167,7 @@ init([MicroServiceId, Host, Port]) ->
handle_call({controller_process, ControllerPid}, _From, State) -> handle_call({controller_process, ControllerPid}, _From, State) ->
{reply, ok, State#state{controller_process = ControllerPid}}; {reply, ok, State#state{controller_process = ControllerPid}};
%% done
handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = <<PacketId:32, ?PACKET_REQUEST_METRIC:8>>, Packet = <<PacketId:32, ?PACKET_REQUEST_METRIC:8>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
@ -174,6 +175,7 @@ handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket
Ref = make_ref(), Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
%% done
handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = <<PacketId:32, ?PACKET_REQUEST_PARAM:8>>, Packet = <<PacketId:32, ?PACKET_REQUEST_PARAM:8>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
@ -187,6 +189,7 @@ handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket,
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
%% done
handle_cast({send_metric_data, Measurement, Tags, Fields}, State = #state{socket = Socket}) -> handle_cast({send_metric_data, Measurement, Tags, Fields}, State = #state{socket = Socket}) ->
%% Line Protocol实现数据的传输 %% Line Protocol实现数据的传输
Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()), Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()),
@ -197,12 +200,14 @@ handle_cast({send_metric_data, Measurement, Tags, Fields}, State = #state{socket
{noreply, State}; {noreply, State};
%% done
handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) ->
Packet = <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>, Packet = <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
{noreply, State}; {noreply, State};
%% done
handle_cast({send_ai_event, EventType, Params}, State = #state{socket = Socket}) -> handle_cast({send_ai_event, EventType, Params}, State = #state{socket = Socket}) ->
Packet = <<0:32, ?PACKET_AI_EVENT:8, EventType:16, Params/binary>>, Packet = <<0:32, ?PACKET_AI_EVENT:8, EventType:16, Params/binary>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),

View File

@ -304,7 +304,7 @@ handle_info({server_push_message, PacketId, <<?METHOD_PRAMAS:8, ParamsBin/binary
}, },
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)); efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply));
ServicePid when is_pid(ServicePid) -> ServicePid when is_pid(ServicePid) ->
case efka_micro_service:push_params(ServicePid, Params) of case efka_micro_service:push_params(ServicePid, Params, 15000) of
ok -> ok ->
Reply = #efka_response{ Reply = #efka_response{
code = 1, code = 1,
@ -333,7 +333,7 @@ handle_info({server_push_message, PacketId, <<?METHOD_METRICS:8, MetricsBin/bina
}, },
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)); efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply));
ServicePid when is_pid(ServicePid) -> ServicePid when is_pid(ServicePid) ->
case efka_micro_service:push_metrics(ServicePid, Metrics) of case efka_micro_service:push_metrics(ServicePid, Metrics, 15000) of
ok -> ok ->
Reply = #efka_response { Reply = #efka_response {
code = 1, code = 1,

View File

@ -20,7 +20,7 @@
%% API %% API
-export([start_link/2]). -export([start_link/2]).
-export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]).
-export([push_params/2, push_metrics/2, request_params/1, request_metrics/1]). -export([push_params/3, push_metrics/3, request_params/1, request_metrics/1]).
-export([metric_data/2, send_event/3, send_ai_event/3]). -export([metric_data/2, send_event/3, send_ai_event/3]).
%% gen_server callbacks %% gen_server callbacks
@ -29,7 +29,7 @@
-export([test/0, test1/0]). -export([test/0, test1/0]).
-record(state, { -record(state, {
service :: #micro_service{}, service_id :: binary(),
%% id信息 %% id信息
channel_pid :: pid() | undefined, channel_pid :: pid() | undefined,
@ -65,11 +65,11 @@ get_name(ServiceId) when is_binary(ServiceId) ->
get_pid(ServiceId) when is_binary(ServiceId) -> get_pid(ServiceId) when is_binary(ServiceId) ->
whereis(get_name(ServiceId)). whereis(get_name(ServiceId)).
push_params(Pid, Args) when is_pid(Pid), is_binary(Args) -> push_params(Pid, Params, Timeout) when is_pid(Pid), is_binary(Params), is_integer(Timeout) ->
gen_server:call(Pid, {push_params, Args}). gen_server:call(Pid, {push_params, Params, Timeout - 1000}, Timeout).
push_metrics(Pid, Metrics) when is_pid(Pid), is_binary(Metrics) -> push_metrics(Pid, Metrics, Timeout) when is_pid(Pid), is_binary(Metrics), is_integer(Timeout) ->
gen_server:call(Pid, {push_metrics, Metrics}). gen_server:call(Pid, {push_metrics, Metrics, Timeout - 1000}, Timeout).
request_params(Pid) when is_pid(Pid) -> request_params(Pid) when is_pid(Pid) ->
gen_server:call(Pid, request_params). gen_server:call(Pid, request_params).
@ -99,10 +99,10 @@ stop_service(Pid) when is_pid(Pid) ->
gen_server:call(Pid, stop_service). gen_server:call(Pid, stop_service).
%% @doc Spawns the server and registers the local name (unique) %% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Name :: atom(), Service :: #micro_service{}) -> -spec(start_link(Name :: atom(), Service :: binary()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}). {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Name, Service = #micro_service{}) when is_atom(Name) -> start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) ->
gen_server:start_link({local, Name}, ?MODULE, [Service], []). gen_server:start_link({local, Name}, ?MODULE, [ServiceId], []).
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -113,29 +113,35 @@ start_link(Name, Service = #micro_service{}) when is_atom(Name) ->
-spec(init(Args :: term()) -> -spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore). {stop, Reason :: term()} | ignore).
init([Service = #micro_service{service_id = ServiceId, work_dir = WorkDir0}]) -> init([ServiceId]) ->
case efka_manifest:new(WorkDir0) of case micro_service_model:get_service(ServiceId) of
{ok, Manifest} -> error ->
init0(Service, Manifest); lager:notice("[efka_micro_service] service_id: ~p, not found", [ServiceId]),
{error, Reason} -> ignore;
lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]), {ok, Service = #micro_service{work_dir = WorkDir0}} ->
ignore case efka_manifest:new(WorkDir0) of
{ok, Manifest} ->
init0(Service, Manifest);
{error, Reason} ->
lager:notice("[efka_micro_service] service: ~p, read manifest.json get error: ~p", [ServiceId, Reason]),
ignore
end
end. end.
init0(Service = #micro_service{service_id = ServiceId, status = 1}, Manifest) -> init0(#micro_service{service_id = ServiceId, status = 1}, Manifest) ->
%% 2 %% 2
case efka_manifest:startup(Manifest) of case efka_manifest:startup(Manifest) of
{ok, Port} -> {ok, Port} ->
{os_pid, OSPid} = erlang:port_info(Port, os_pid), {os_pid, OSPid} = erlang:port_info(Port, os_pid),
lager:debug("[efka_micro_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]), lager:debug("[efka_micro_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ServiceId, Port, OSPid]),
{ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
{error, Reason} -> {error, Reason} ->
lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]),
{ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}} {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}
end; end;
init0(Service, Manifest) -> init0(#micro_service{service_id = ServiceId, status = 0}, Manifest) ->
lager:debug("[efka_micro_service] service: ~p current status is 0, not boot"), lager:debug("[efka_micro_service] service: ~p current status is 0, not boot"),
{ok, #state{service = Service, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}. {ok, #state{service_id = ServiceId, manifest = Manifest, running_status = ?STATUS_STOPPED, port = undefined, os_pid = undefined}}.
%% @private %% @private
%% @doc Handling call messages %% @doc Handling call messages
@ -148,7 +154,8 @@ init0(Service, Manifest) ->
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
%% channel %% channel
handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service = #micro_service{status = Status}}) -> handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = OldChannelPid, service_id = ServiceId}) ->
Status = micro_service_model:get_status(ServiceId),
case {Status, is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid)} of case {Status, is_pid(OldChannelPid) andalso is_process_alive(OldChannelPid)} of
{1, false} -> {1, false} ->
erlang:monitor(process, ChannelPid), erlang:monitor(process, ChannelPid),
@ -159,21 +166,54 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol
{reply, {error, <<"serivce stopped">>}, State} {reply, {error, <<"serivce stopped">>}, State}
end; end;
%% %% done
handle_call({push_params, Args}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) -> handle_call(request_params, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) ->
case is_pid(ChannelPid) of Params = micro_service_model:get_params(ServiceId),
true -> {reply, {ok, Params}, State};
ok;
false ->
ok
end,
ok; %% done
handle_call(request_metrics, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) ->
Metrics = micro_service_model:get_metrics(ServiceId),
{reply, {ok, Metrics}, State};
%%
handle_call({push_params, Params, Timeout}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) ->
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
true ->
{ok, Ref} = efka_tcp_channel:push_params(ChannelPid, self(), Params),
receive
{channel_reply, Ref, {ok, Result}} ->
{reply, {ok, Result}, State};
{channel_reply, Ref, {error, Error}} ->
{reply, {error, Error}, State}
after Timeout ->
{reply, {error, <<"push_params timeout">>}, State}
end;
false ->
{reply, {error, <<"channel is not alive">>}, State}
end;
%%
handle_call({push_metrics, Metrics, Timeout}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) ->
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
true ->
{ok, Ref} = efka_tcp_channel:push_metrics(ChannelPid, self(), Metrics),
receive
{channel_reply, Ref, {ok, Result}} ->
{reply, {ok, Result}, State};
{channel_reply, Ref, {error, Error}} ->
{reply, {error, Error}, State}
after Timeout ->
{reply, {error, <<"push_metrics timeout">>}, State}
end;
false ->
{reply, {error, <<"channel is not alive">>}, State}
end;
%% : %% :
handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) -> handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) ->
{reply, {error, <<"service is running">>}, State}; {reply, {error, <<"service is running">>}, State};
handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, manifest = Manifest, service = Service = #micro_service{service_id = ServiceId}}) -> handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPED, manifest = Manifest, service_id = ServiceId}) ->
%% %%
case efka_manifest:startup(Manifest) of case efka_manifest:startup(Manifest) of
{ok, Port} -> {ok, Port} ->
@ -181,7 +221,8 @@ handle_call(start_service, _From, State = #state{running_status = ?STATUS_STOPPE
micro_service_model:start_service(ServiceId), micro_service_model:start_service(ServiceId),
{os_pid, OSPid} = erlang:port_info(Port, os_pid), {os_pid, OSPid} = erlang:port_info(Port, os_pid),
lager:debug("start_service port: ~p, os_pid: ~p", [Port, OSPid]), lager:debug("start_service port: ~p, os_pid: ~p", [Port, OSPid]),
{reply, ok, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service = Service#micro_service{status = 1}}}; ok = micro_service_model:change_status(ServiceId, 1),
{reply, ok, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
{error, Reason} -> {error, Reason} ->
%% %%
{reply, {error, Reason}, State} {reply, {error, Reason}, State}
@ -192,7 +233,7 @@ handle_call(stop_service, _From, State = #state{running_status = ?STATUS_STOPPED
lager:debug("stop service port: ~p, os_pid: ~p", [Port, OSPid]), lager:debug("stop service port: ~p, os_pid: ~p", [Port, OSPid]),
{reply, {error, <<"service not running">>}, State}; {reply, {error, <<"service not running">>}, State};
handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service = Service = #micro_service{service_id = ServiceId}}) when is_port(Port) -> handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid, service_id = ServiceId}) when is_port(Port) ->
%% 使stop指令, 使kill指令 %% 使stop指令, 使kill指令
kill_os_pid(OSPid), kill_os_pid(OSPid),
@ -200,7 +241,9 @@ handle_call(stop_service, _From, State = #state{running_status = ?STATUS_RUNNING
erlang:is_port(Port) andalso erlang:port_close(Port), erlang:is_port(Port) andalso erlang:port_close(Port),
lager:debug("port: ~p, os_pid: ~p, will closed", [Port, OSPid]), lager:debug("port: ~p, os_pid: ~p, will closed", [Port, OSPid]),
{reply, ok, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED, service = Service#micro_service{status = 0}}}; ok = micro_service_model:change_status(ServiceId, 0),
{reply, ok, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}};
handle_call(_Request, _From, State = #state{}) -> handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}. {reply, ok, State}.
@ -211,15 +254,15 @@ handle_call(_Request, _From, State = #state{}) ->
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({metric_data, LineProtocolData}, State = #state{service = #micro_service{service_id = ServiceId}}) -> handle_cast({metric_data, LineProtocolData}, State = #state{service_id = ServiceId}) ->
efka_agent:metric_data(ServiceId, LineProtocolData), efka_agent:metric_data(ServiceId, LineProtocolData),
{noreply, State}; {noreply, State};
handle_cast({send_event, EventType, Params}, State = #state{service = #micro_service{service_id = ServiceId}}) -> handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
efka_agent:event(ServiceId, EventType, Params), efka_agent:event(ServiceId, EventType, Params),
{noreply, State}; {noreply, State};
handle_cast({send_ai_event, EventType, Params}, State = #state{service = #micro_service{service_id = ServiceId}}) -> handle_cast({send_ai_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
efka_agent:ai_event(ServiceId, EventType, Params), efka_agent:ai_event(ServiceId, EventType, Params),
{noreply, State}; {noreply, State};
@ -233,32 +276,35 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
%% %%
handle_info({timeout, _, reboot_service}, State = #state{service = #micro_service{service_id = ServiceId, status = 0}}) -> handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId, manifest = Manifest}) ->
lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]), case micro_service_model:get_status(ServiceId) of
{noreply, State}; 0 ->
handle_info({timeout, _, reboot_service}, State = #state{manifest = Manifest, service = #micro_service{service_id = ServiceId, status = 1}}) -> lager:debug("[efka_micro_service] service_id: ~p, is stopped, ignore boot_service", [ServiceId]),
case efka_manifest:startup(Manifest) of {noreply, State};
{ok, Port} -> 1 ->
{os_pid, OSPid} = erlang:port_info(Port, os_pid), case efka_manifest:startup(Manifest) of
lager:debug("[efka_micro_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]), {ok, Port} ->
{noreply, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}}; {os_pid, OSPid} = erlang:port_info(Port, os_pid),
{error, Reason} -> lager:debug("[efka_micro_service] reboot service success: ~p, port: ~p, os_pid: ~p", [ServiceId, Port, OSPid]),
lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]), {noreply, State#state{running_status = ?STATUS_RUNNING, port = Port, os_pid = OSPid}};
{noreply, State#state{running_status = ?STATUS_STOPPED}} {error, Reason} ->
lager:debug("[efka_micro_service] service: ~p, boot_service get error: ~p", [ServiceId, Reason]),
{noreply, State#state{running_status = ?STATUS_STOPPED}}
end
end; end;
handle_info({Port, {data, Data}}, State = #state{port = Port, service = #micro_service{service_id = ServiceId}}) -> handle_info({Port, {data, Data}}, State = #state{port = Port, service_id = ServiceId}) ->
lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [ServiceId, Data]), lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [ServiceId, Data]),
{noreply, State}; {noreply, State};
%% port的消息, Port的被动关闭会触发Port和State.port的值是相等的 %% port的消息, Port的被动关闭会触发Port和State.port的值是相等的
handle_info({Port, {exit_status, Code}}, State = #state{service = #micro_service{service_id = ServiceId}}) -> handle_info({Port, {exit_status, Code}}, State = #state{service_id = ServiceId}) ->
lager:debug("[efka_micro_service] service_id: ~p, port: ~p, exit with code: ~p", [ServiceId, Port, Code]), lager:debug("[efka_micro_service] service_id: ~p, port: ~p, exit with code: ~p", [ServiceId, Port, Code]),
% erlang:start_timer(5000, self(), reboot_service), % erlang:start_timer(5000, self(), reboot_service),
{noreply, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}}; {noreply, State#state{port = undefined, os_pid = undefined, running_status = ?STATUS_STOPPED}};
%% channel进程的退出 %% channel进程的退出
handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service = #micro_service{service_id = ServiceId}}) -> handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) ->
lager:debug("[efka_micro_service] service: ~p, channel exited: ~p", [ServiceId, Reason]), lager:debug("[efka_micro_service] service: ~p, channel exited: ~p", [ServiceId, Reason]),
{noreply, State#state{channel_pid = undefined}}. {noreply, State#state{channel_pid = undefined}}.

View File

@ -13,7 +13,7 @@
%% API %% API
-export([start_link/1]). -export([start_link/1]).
-export([push_metric/2, push_param/2]). -export([push_metrics/3, push_params/3]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -56,16 +56,16 @@
%%% API %%% API
%%%=================================================================== %%%===================================================================
-spec push_param(ChannelPid :: pid(), Params :: binary()) -> {ok, Ref :: reference()}. -spec push_params(ChannelPid :: pid(), ReceiverPid :: pid(), Params :: binary()) -> {ok, Ref :: reference()}.
push_param(ChannelPid, Params) when is_pid(ChannelPid), is_binary(Params) -> push_params(ChannelPid, ReceiverPid, Params) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Params) ->
Ref = make_ref(), Ref = make_ref(),
gen_server:cast(ChannelPid, {push_param, Ref, self(), Params}), gen_server:cast(ChannelPid, {push_params, Ref, ReceiverPid, Params}),
{ok, Ref}. {ok, Ref}.
-spec push_metric(ChannelPid :: pid(), Metrics :: binary()) -> {ok, Ref :: reference()}. -spec push_metrics(ChannelPid :: pid(), ReceiverPid :: pid(), Metrics :: binary()) -> {ok, Ref :: reference()}.
push_metric(ChannelPid, Metrics) when is_pid(ChannelPid), is_binary(Metrics) -> push_metrics(ChannelPid, ReceiverPid, Metrics) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Metrics) ->
Ref = make_ref(), Ref = make_ref(),
gen_server:cast(ChannelPid, {push_metric, Ref, self(), Metrics}), gen_server:cast(ChannelPid, {push_metrics, Ref, ReceiverPid, Metrics}),
{ok, Ref}. {ok, Ref}.
%% @doc Spawns the server and registers the local name (unique) %% @doc Spawns the server and registers the local name (unique)
@ -108,12 +108,12 @@ handle_call(_Request, _From, State = #state{}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
%% %%
handle_cast({push_param, Ref, ReceiverPid, Params}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_PUSH_PARAM:8, Params/binary>>), ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_PUSH_PARAM:8, Params/binary>>),
{noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
%% %%
handle_cast({push_metric, Ref, ReceiverPid, Metrics}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_PUSH_METRIC:8, Metrics/binary>>), ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_PUSH_METRIC:8, Metrics/binary>>),
{noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
@ -144,8 +144,8 @@ handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REGISTER:8, ServiceId/binary>>}
%% %%
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REQUEST_PARAM:8>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REQUEST_PARAM:8>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
{ok, Args} = efka_micro_service:request_params(ServicePid), {ok, Params} = efka_micro_service:request_params(ServicePid),
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, Args/binary>>), ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, Params/binary>>),
{noreply, State}; {noreply, State};
%% %%

View File

@ -15,7 +15,8 @@
%% API %% API
-export([create_table/0]). -export([create_table/0]).
-export([insert/1, start_service/1, stop_service/1, get_all_services/0]). -export([insert/1, get_all_services/0]).
-export([get_metrics/1, get_params/1, set_metrics/2, set_params/2, get_service/1, get_status/1, change_status/2]).
create_table() -> create_table() ->
%% id生成器 %% id生成器
@ -34,13 +35,13 @@ insert(MicroService = #micro_service{}) ->
{error, Reason} {error, Reason}
end. end.
stop_service(ServiceId) when is_binary(ServiceId) -> change_status(ServiceId, NewStatus) when is_binary(ServiceId), is_integer(NewStatus) ->
Fun = fun() -> Fun = fun() ->
case mnesia:read(?TAB, ServiceId, write) of case mnesia:read(?TAB, ServiceId, write) of
[] -> [] ->
mnesia:abort(<<"service not found">>); mnesia:abort(<<"service not found">>);
[Service] -> [Service] ->
mnesia:write(?TAB, Service#micro_service{status = 0}, write) mnesia:write(?TAB, Service#micro_service{status = NewStatus}, write)
end end
end, end,
case mnesia:transaction(Fun) of case mnesia:transaction(Fun) of
@ -50,13 +51,14 @@ stop_service(ServiceId) when is_binary(ServiceId) ->
{error, Reason} {error, Reason}
end. end.
start_service(ServiceId) when is_binary(ServiceId) -> -spec set_params(ServiceId :: binary(), Params :: binary()) -> ok | {error, Reason :: any()}.
set_params(ServiceId, Params) when is_binary(ServiceId), is_binary(Params) ->
Fun = fun() -> Fun = fun() ->
case mnesia:read(?TAB, ServiceId, write) of case mnesia:read(?TAB, ServiceId, write) of
[] -> [] ->
mnesia:abort(<<"service not found">>); mnesia:abort(<<"service not found">>);
[S] -> [S] ->
mnesia:write(?TAB, S#micro_service{status = 1}, write) mnesia:write(?TAB, S#micro_service{params = Params}, write)
end end
end, end,
case mnesia:transaction(Fun) of case mnesia:transaction(Fun) of
@ -66,6 +68,59 @@ start_service(ServiceId) when is_binary(ServiceId) ->
{error, Reason} {error, Reason}
end. end.
-spec set_metrics(ServiceId :: binary(), Metrics :: binary()) -> ok | {error, Reason :: any()}.
set_metrics(ServiceId, Metrics) when is_binary(ServiceId), is_binary(Metrics) ->
Fun = fun() ->
case mnesia:read(?TAB, ServiceId, write) of
[] ->
mnesia:abort(<<"service not found">>);
[S] ->
mnesia:write(?TAB, S#micro_service{metrics = Metrics}, write)
end
end,
case mnesia:transaction(Fun) of
{'atomic', ok} ->
ok;
{'aborted', Reason} ->
{error, Reason}
end.
-spec get_params(ServiceId :: binary()) -> Params :: binary().
get_params(ServiceId) when is_binary(ServiceId) ->
case mnesia:dirty_read(?TAB, ServiceId) of
[] ->
<<"">>;
[#micro_service{params = Params}] ->
Params
end.
-spec get_metrics(ServiceId :: binary()) -> Params :: binary().
get_metrics(ServiceId) when is_binary(ServiceId) ->
case mnesia:dirty_read(?TAB, ServiceId) of
[] ->
<<"">>;
[#micro_service{metrics = Metrics}] ->
Metrics
end.
-spec get_status(ServiceId :: binary()) -> Status :: integer().
get_status(ServiceId) when is_binary(ServiceId) ->
case mnesia:dirty_read(?TAB, ServiceId) of
[] ->
0;
[#micro_service{status = Status}] ->
Status
end.
-spec get_service(ServiceId :: binary()) -> error | {ok, Service :: #micro_service{}}.
get_service(ServiceId) when is_binary(ServiceId) ->
case mnesia:dirty_read(?TAB, ServiceId) of
[] ->
error;
[Service] ->
{ok, Service}
end.
-spec get_all_services() -> [#micro_service{}]. -spec get_all_services() -> [#micro_service{}].
get_all_services() -> get_all_services() ->
Fun = fun() -> Fun = fun() ->