diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 3302fa1..79cc08d 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -35,7 +35,9 @@ -record(state, { transport_pid :: undefined | pid(), - status = ?STATE_DENIED + status = ?STATE_DENIED, + %% 映射关系 #{Ref => PacketId} + inflight = #{} }). %%%=================================================================== @@ -293,7 +295,7 @@ handle_info({server_push_message, PacketId, <>}, State = #state{transport_pid = TransportPid}) -> +handle_info({server_push_message, PacketId, <>}, State = #state{transport_pid = TransportPid, inflight = Inflight}) -> #service_params{service_id = ServiceId, params = Params} = message_pb:decode_msg(ParamsBin, service_params), case efka_micro_service:get_pid(ServiceId) of @@ -302,27 +304,17 @@ handle_info({server_push_message, PacketId, <> }, - efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)); + efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), + {noreply, State}; ServicePid when is_pid(ServicePid) -> - case efka_micro_service:push_params(ServicePid, Params, 15000) of - ok -> - Reply = #efka_response{ - code = 1, - message = <<"">> - }, - efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)); - {error, Reason} -> - Reply = #efka_response{ - code = 0, - message = Reason - }, - efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)) - end + Ref = make_ref(), + efka_micro_service:push_params(ServicePid, Ref, self(), Params), + {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}} end, {noreply, State}; %% 采集项目 -handle_info({server_push_message, PacketId, <>}, State = #state{transport_pid = TransportPid}) -> +handle_info({server_push_message, PacketId, <>}, State = #state{transport_pid = TransportPid, inflight = Inflight}) -> #service_metrics{service_id = ServiceId, metrics = Metrics} = message_pb:decode_msg(MetricsBin, service_metrics), case efka_micro_service:get_pid(ServiceId) of @@ -331,9 +323,21 @@ handle_info({server_push_message, PacketId, <> }, - efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)); + efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)), + {noreply, State}; ServicePid when is_pid(ServicePid) -> - case efka_micro_service:push_metrics(ServicePid, Metrics, 15000) of + Ref = make_ref(), + efka_micro_service:push_metrics(ServicePid, Ref, self(), Metrics), + {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}} + end; + +%% 收到来自channel的回复 +handle_info({channel_reply, Ref, Reply}, State = #state{transport_pid = TransportPid, inflight = Inflight}) -> + case maps:take(Ref, Inflight) of + error -> + {noreply, State}; + {PacketId, NInflight} -> + case Reply of ok -> Reply = #efka_response { code = 1, @@ -346,9 +350,9 @@ handle_info({server_push_message, PacketId, <>}, State = #state{transport_pid = TransportPid, status = Status}) -> diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index 29aefa3..2c80733 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -20,7 +20,7 @@ %% API -export([start_link/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). --export([push_params/3, push_metrics/3, request_params/1, request_metrics/1]). +-export([push_params/4, push_metrics/4, request_params/1, request_metrics/1]). -export([metric_data/2, send_event/3, send_ai_event/3]). %% gen_server callbacks @@ -65,11 +65,11 @@ get_name(ServiceId) when is_binary(ServiceId) -> get_pid(ServiceId) when is_binary(ServiceId) -> whereis(get_name(ServiceId)). -push_params(Pid, Params, Timeout) when is_pid(Pid), is_binary(Params), is_integer(Timeout) -> - gen_server:call(Pid, {push_params, Params, Timeout - 1000}, Timeout). +push_params(Pid, Ref, ReceiverPid, Params) when is_pid(Pid), is_binary(Params) -> + gen_server:cast(Pid, {push_params, Ref, ReceiverPid, Params}). -push_metrics(Pid, Metrics, Timeout) when is_pid(Pid), is_binary(Metrics), is_integer(Timeout) -> - gen_server:call(Pid, {push_metrics, Metrics, Timeout - 1000}, Timeout). +push_metrics(Pid, Ref, ReceiverPid, Metrics) when is_pid(Pid), is_binary(Metrics) -> + gen_server:cast(Pid, {push_metrics, Ref, ReceiverPid, Metrics}). request_params(Pid) when is_pid(Pid) -> gen_server:call(Pid, request_params). @@ -176,40 +176,6 @@ handle_call(request_metrics, _From, State = #state{service_id = ServiceId, runni Metrics = micro_service_model:get_metrics(ServiceId), {reply, {ok, Metrics}, State}; -%% 推送配置项目 -handle_call({push_params, Params, Timeout}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) -> - case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of - true -> - {ok, Ref} = efka_tcp_channel:push_params(ChannelPid, self(), Params), - receive - {channel_reply, Ref, {ok, Result}} -> - {reply, {ok, Result}, State}; - {channel_reply, Ref, {error, Error}} -> - {reply, {error, Error}, State} - after Timeout -> - {reply, {error, <<"push_params timeout">>}, State} - end; - false -> - {reply, {error, <<"channel is not alive">>}, State} - end; - -%% 推送采集项 -handle_call({push_metrics, Metrics, Timeout}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) -> - case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of - true -> - {ok, Ref} = efka_tcp_channel:push_metrics(ChannelPid, self(), Metrics), - receive - {channel_reply, Ref, {ok, Result}} -> - {reply, {ok, Result}, State}; - {channel_reply, Ref, {error, Error}} -> - {reply, {error, Error}, State} - after Timeout -> - {reply, {error, <<"push_metrics timeout">>}, State} - end; - false -> - {reply, {error, <<"channel is not alive">>}, State} - end; - %% 启动服务: 当前服务如果正常运行,则不允许重启 handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) -> {reply, {error, <<"service is running">>}, State}; @@ -266,6 +232,28 @@ handle_cast({send_ai_event, EventType, Params}, State = #state{service_id = Serv efka_agent:ai_event(ServiceId, EventType, Params), {noreply, State}; +%% 推送配置项目 +handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) -> + case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of + true -> + efka_tcp_channel:push_params(ChannelPid, Ref, ReceiverPid, Params), + {noreply, State}; + false -> + ReceiverPid ! {channel_reply, Ref, {error, <<"channel is not alive">>}}, + {reply, State} + end; + +%% 推送采集项 +handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) -> + case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of + true -> + efka_tcp_channel:push_metrics(ChannelPid, Ref, ReceiverPid, Metrics), + {noreply, State}; + false -> + ReceiverPid ! {channel_reply, Ref, {error, <<"channel is not alive">>}}, + {noreply, State} + end; + handle_cast(_Request, State = #state{}) -> {noreply, State}. diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index 153b7eb..58b1448 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -13,7 +13,7 @@ %% API -export([start_link/1]). --export([push_metrics/3, push_params/3]). +-export([push_metrics/4, push_params/4]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -56,17 +56,13 @@ %%% API %%%=================================================================== --spec push_params(ChannelPid :: pid(), ReceiverPid :: pid(), Params :: binary()) -> {ok, Ref :: reference()}. -push_params(ChannelPid, ReceiverPid, Params) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Params) -> - Ref = make_ref(), - gen_server:cast(ChannelPid, {push_params, Ref, ReceiverPid, Params}), - {ok, Ref}. +-spec push_params(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Params :: binary()) -> no_return(). +push_params(ChannelPid, Ref, ReceiverPid, Params) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Params), is_reference(Ref) -> + gen_server:cast(ChannelPid, {push_params, Ref, ReceiverPid, Params}). --spec push_metrics(ChannelPid :: pid(), ReceiverPid :: pid(), Metrics :: binary()) -> {ok, Ref :: reference()}. -push_metrics(ChannelPid, ReceiverPid, Metrics) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Metrics) -> - Ref = make_ref(), - gen_server:cast(ChannelPid, {push_metrics, Ref, ReceiverPid, Metrics}), - {ok, Ref}. +-spec push_metrics(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Metrics :: binary()) -> no_return(). +push_metrics(ChannelPid, Ref, ReceiverPid, Metrics) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Metrics) -> + gen_server:cast(ChannelPid, {push_metrics, Ref, ReceiverPid, Metrics}). %% @doc Spawns the server and registers the local name (unique) -spec(start_link(Socket :: gen_tcp:socket()) ->