diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl index b0e33e1..cf5c056 100644 --- a/apps/efka/src/client/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -42,7 +42,7 @@ %% API -export([start_link/3]). -export([device_offline/1, device_online/1]). --export([send_metric_data/4, request_metric/0, request_param/0, send_event/2, controller_process/1]). +-export([send_metric_data/4, request_config/0, send_event/2, controller_process/1]). -export([test/0]). @@ -71,19 +71,9 @@ send_metric_data(DeviceUUID, Measurement, Tags, Fields) when is_binary(DeviceUUI gen_server:cast(?MODULE, {send_metric_data, DeviceUUID, Measurement, Tags, Fields}). %% efka_server为了统一,r对象为字符串;需要2次json_decode --spec request_metric() -> {ok, Result :: list()} | {error, Reason :: any()}. -request_metric() -> - {ok, Ref} = gen_server:call(?MODULE, {request_metric, self()}), - case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of - {ok, Reply} -> - {ok, jiffy:decode(Reply, [return_maps])}; - Error -> - Error - end. - --spec request_param() -> {ok, Result :: map()} | {error, Reason :: any()}. -request_param() -> - {ok, Ref} = gen_server:call(?MODULE, {request_param, self()}), +-spec request_config() -> {ok, Result :: list() | map()} | {error, Reason :: any()}. +request_config() -> + {ok, Ref} = gen_server:call(?MODULE, {request_config, self()}), case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of {ok, Reply} -> {ok, jiffy:decode(Reply, [return_maps])}; @@ -166,16 +156,8 @@ handle_call({controller_process, ControllerPid}, _From, State) -> {reply, ok, State#state{controller_process = ControllerPid}}; %% done -handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Packet = <>, - ok = gen_tcp:send(Socket, Packet), - - Ref = make_ref(), - {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; - -%% done -handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Packet = <>, +handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + Packet = <>, ok = gen_tcp:send(Socket, Packet), Ref = make_ref(), @@ -239,15 +221,15 @@ handle_info({tcp, Socket, <>}, end; %% 收到efka推送的参数设置 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of true -> Ref = make_ref(), - ControllerPid ! {push_param, Ref, Params}, + ControllerPid ! {push_config, Ref, ConfigJson}, receive - {push_param_reply, Ref, ok} -> + {push_config_reply, Ref, ok} -> <<1:8>>; - {push_param_reply, Ref, {error, Reason}} when is_binary(Reason) -> + {push_config_reply, Ref, {error, Reason}} when is_binary(Reason) -> <<0:8, Reason/binary>> after 5000 -> <<0:8, "服务执行超时"/utf8>> @@ -260,29 +242,6 @@ handle_info({tcp, Socket, <>}, {noreply, State}; -%% 收到efka推送的采集项消息 -handle_info({tcp, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> - Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of - true -> - Ref = make_ref(), - ControllerPid ! {push_metric, Ref, Metrics}, - receive - {push_metric_reply, Ref, ok} -> - <<1:8>>; - {push_metric_reply, Ref, {error, Reason}} when is_binary(Reason) -> - <<0:8, Reason/binary>> - after 5000 -> - <<0:8, "服务执行超时"/utf8>> - end; - false -> - <<0:8, "处理进程异常"/utf8>> - end, - - Packet = <>, - ok = gen_tcp:send(Socket, Packet), - - {noreply, State}; - %% 其他消息为非法消息 handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) -> lager:debug("[efka_client] get unknown packet: ~p", [Packet]), diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index 7a02b42..817776d 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -20,7 +20,7 @@ %% API -export([start_link/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). --export([push_params/3, push_metrics/3, request_params/1, request_metrics/1]). +-export([push_config/3, request_config/1]). -export([metric_data/3, send_event/3, send_ai_event/3]). %% gen_server callbacks @@ -67,17 +67,11 @@ get_name(ServiceId) when is_binary(ServiceId) -> get_pid(ServiceId) when is_binary(ServiceId) -> whereis(get_name(ServiceId)). -push_params(Pid, Ref, Params) when is_pid(Pid), is_binary(Params) -> - gen_server:cast(Pid, {push_params, Ref, self(), Params}). +push_config(Pid, Ref, ConfigJson) when is_pid(Pid), is_binary(ConfigJson) -> + gen_server:cast(Pid, {push_config, Ref, self(), ConfigJson}). -push_metrics(Pid, Ref, Metrics) when is_pid(Pid), is_binary(Metrics) -> - gen_server:cast(Pid, {push_metrics, Ref, self(), Metrics}). - -request_params(Pid) when is_pid(Pid) -> - gen_server:call(Pid, request_params). - -request_metrics(Pid) when is_pid(Pid) -> - gen_server:call(Pid, request_metrics). +request_config(Pid) when is_pid(Pid) -> + gen_server:call(Pid, request_config). metric_data(Pid, DeviceUUID, Data) when is_pid(Pid), is_binary(DeviceUUID), is_binary(Data) -> gen_server:cast(Pid, {metric_data, DeviceUUID, Data}). @@ -169,15 +163,10 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol end; %% 请求参数项 done -handle_call(request_params, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) -> +handle_call(request_config, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) -> Params = micro_service_model:get_params(ServiceId), {reply, {ok, Params}, State}; -%% 请求采集项 done -handle_call(request_metrics, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) -> - Metrics = micro_service_model:get_metrics(ServiceId), - {reply, {ok, Metrics}, State}; - %% 启动服务: 当前服务如果正常运行,则不允许重启 handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) -> {reply, {error, <<"service is running">>}, State}; @@ -232,27 +221,16 @@ handle_cast({send_ai_event, EventType, Params}, State = #state{service_id = Serv {noreply, State}; %% 推送配置项目 -handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) -> +handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) -> case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of true -> - efka_tcp_channel:push_params(ChannelPid, Ref, self(), Params), + efka_tcp_channel:push_config(ChannelPid, Ref, self(), ConfigJson), {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; false -> ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}}, {reply, State} end; -%% 推送采集项 -handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) -> - case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of - true -> - efka_tcp_channel:push_metrics(ChannelPid, Ref, self(), Metrics), - {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; - false -> - ReceiverPid ! {channel_reply, Ref, {error, <<"channel is not alive">>}}, - {noreply, State} - end; - handle_cast(_Request, State = #state{}) -> {noreply, State}. diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index c153816..e6e0a97 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -13,7 +13,7 @@ %% API -export([start_link/1]). --export([push_metrics/4, push_params/4]). +-export([push_config/4]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -28,15 +28,13 @@ -define(PACKET_METRIC_DATA, 3). %% 消息响应 -define(PACKET_RESPONSE, 7). -%% efka下发给微服务参数 --define(PACKET_PUSH_PARAM, 5). -%% efka下发给微服务采集项 --define(PACKET_PUSH_METRIC, 6). + +%% efka下发给微服务配置 +-define(PACKET_PUSH_CONFIG, 5). %% 微服务从efka获取自身的采集项 --define(PACKET_REQUEST_METRIC, 10). -%% 微服务从efka获取自身的参数 --define(PACKET_REQUEST_PARAM, 12). +-define(PACKET_REQUEST_CONFIG, 10). + %% 微服务事件上报 -define(PACKET_EVENT, 15). -define(PACKET_AI_EVENT, 16). @@ -56,13 +54,9 @@ %%% API %%%=================================================================== --spec push_params(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Params :: binary()) -> no_return(). -push_params(ChannelPid, Ref, ReceiverPid, Params) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Params), is_reference(Ref) -> - gen_server:cast(ChannelPid, {push_params, Ref, ReceiverPid, Params}). - --spec push_metrics(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Metrics :: binary()) -> no_return(). -push_metrics(ChannelPid, Ref, ReceiverPid, Metrics) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Metrics) -> - gen_server:cast(ChannelPid, {push_metrics, Ref, ReceiverPid, Metrics}). +-spec push_config(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), ConfigJson :: binary()) -> no_return(). +push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(ConfigJson), is_reference(Ref) -> + gen_server:cast(ChannelPid, {push_config, Ref, ReceiverPid, ConfigJson}). %% @doc Spawns the server and registers the local name (unique) -spec(start_link(Socket :: gen_tcp:socket()) -> @@ -104,13 +98,8 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 推送参数项目 -handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - ok = gen_tcp:send(Socket, <>), - {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; - -%% 推送采集项目 -handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - ok = gen_tcp:send(Socket, <>), +handle_cast({push_params, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + ok = gen_tcp:send(Socket, <>), {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; handle_cast(_Request, State = #state{}) -> @@ -139,15 +128,9 @@ handle_info({tcp, Socket, <>} end; %% 请求参数 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - {ok, Params} = efka_micro_service:request_params(ServicePid), - ok = gen_tcp:send(Socket, <>), - {noreply, State}; - -%% 请求采集项目 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - {ok, Metrics} = efka_micro_service:request_metrics(ServicePid), - ok = gen_tcp:send(Socket, <>), +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> + {ok, ConfigJson} = efka_micro_service:request_config(ServicePid), + ok = gen_tcp:send(Socket, <>), {noreply, State}; %% 数据项