diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 79cc08d..8eb771b 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -308,7 +308,7 @@ handle_info({server_push_message, PacketId, < Ref = make_ref(), - efka_micro_service:push_params(ServicePid, Ref, self(), Params), + efka_micro_service:push_params(ServicePid, Ref, Params), {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}} end, {noreply, State}; @@ -327,12 +327,12 @@ handle_info({server_push_message, PacketId, < Ref = make_ref(), - efka_micro_service:push_metrics(ServicePid, Ref, self(), Metrics), + efka_micro_service:push_metrics(ServicePid, Ref, Metrics), {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}} end; -%% 收到来自channel的回复 -handle_info({channel_reply, Ref, Reply}, State = #state{transport_pid = TransportPid, inflight = Inflight}) -> +%% 收到来自efka_micro_service的回复 +handle_info({ems_reply, Ref, Reply}, State = #state{transport_pid = TransportPid, inflight = Inflight}) -> case maps:take(Ref, Inflight) of error -> {noreply, State}; diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index 2c80733..d29968d 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -20,7 +20,7 @@ %% API -export([start_link/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). --export([push_params/4, push_metrics/4, request_params/1, request_metrics/1]). +-export([push_params/3, push_metrics/3, request_params/1, request_metrics/1]). -export([metric_data/2, send_event/3, send_ai_event/3]). %% gen_server callbacks @@ -41,6 +41,8 @@ %% 配置信息 manifest :: undefined | efka_manifest:manifest(), + inflight = #{}, + %% 当前服务的运行状态 running_status = ?STATUS_STOPPED }). @@ -65,11 +67,11 @@ get_name(ServiceId) when is_binary(ServiceId) -> get_pid(ServiceId) when is_binary(ServiceId) -> whereis(get_name(ServiceId)). -push_params(Pid, Ref, ReceiverPid, Params) when is_pid(Pid), is_binary(Params) -> - gen_server:cast(Pid, {push_params, Ref, ReceiverPid, Params}). +push_params(Pid, Ref, Params) when is_pid(Pid), is_binary(Params) -> + gen_server:cast(Pid, {push_params, Ref, self(), Params}). -push_metrics(Pid, Ref, ReceiverPid, Metrics) when is_pid(Pid), is_binary(Metrics) -> - gen_server:cast(Pid, {push_metrics, Ref, ReceiverPid, Metrics}). +push_metrics(Pid, Ref, Metrics) when is_pid(Pid), is_binary(Metrics) -> + gen_server:cast(Pid, {push_metrics, Ref, self(), Metrics}). request_params(Pid) when is_pid(Pid) -> gen_server:call(Pid, request_params). @@ -233,22 +235,22 @@ handle_cast({send_ai_event, EventType, Params}, State = #state{service_id = Serv {noreply, State}; %% 推送配置项目 -handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) -> +handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) -> case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of true -> - efka_tcp_channel:push_params(ChannelPid, Ref, ReceiverPid, Params), - {noreply, State}; + efka_tcp_channel:push_params(ChannelPid, Ref, self(), Params), + {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; false -> - ReceiverPid ! {channel_reply, Ref, {error, <<"channel is not alive">>}}, + ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}}, {reply, State} end; %% 推送采集项 -handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) -> +handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) -> case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of true -> - efka_tcp_channel:push_metrics(ChannelPid, Ref, ReceiverPid, Metrics), - {noreply, State}; + efka_tcp_channel:push_metrics(ChannelPid, Ref, self(), Metrics), + {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; false -> ReceiverPid ! {channel_reply, Ref, {error, <<"channel is not alive">>}}, {noreply, State} @@ -281,6 +283,16 @@ handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId, end end; +%% 处理channel的回复 +handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight}) -> + case maps:take(Ref, Inflight) of + error -> + {noreply, State}; + {ReceiverPid, NInflight} -> + ReceiverPid ! {ems_reply, Ref, Reply}, + {noreply, State#state{inflight = NInflight}} + end; + handle_info({Port, {data, Data}}, State = #state{port = Port, service_id = ServiceId}) -> lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [ServiceId, Data]), {noreply, State};