This commit is contained in:
anlicheng 2025-05-08 11:05:02 +08:00
parent d581929e67
commit f789e48d51
3 changed files with 32 additions and 112 deletions

View File

@ -42,7 +42,7 @@
%% API %% API
-export([start_link/3]). -export([start_link/3]).
-export([device_offline/1, device_online/1]). -export([device_offline/1, device_online/1]).
-export([send_metric_data/4, request_metric/0, request_param/0, send_event/2, controller_process/1]). -export([send_metric_data/4, request_config/0, send_event/2, controller_process/1]).
-export([test/0]). -export([test/0]).
@ -71,19 +71,9 @@ send_metric_data(DeviceUUID, Measurement, Tags, Fields) when is_binary(DeviceUUI
gen_server:cast(?MODULE, {send_metric_data, DeviceUUID, Measurement, Tags, Fields}). gen_server:cast(?MODULE, {send_metric_data, DeviceUUID, Measurement, Tags, Fields}).
%% efka_server为了统一r对象为字符串2json_decode %% efka_server为了统一r对象为字符串2json_decode
-spec request_metric() -> {ok, Result :: list()} | {error, Reason :: any()}. -spec request_config() -> {ok, Result :: list() | map()} | {error, Reason :: any()}.
request_metric() -> request_config() ->
{ok, Ref} = gen_server:call(?MODULE, {request_metric, self()}), {ok, Ref} = gen_server:call(?MODULE, {request_config, self()}),
case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of
{ok, Reply} ->
{ok, jiffy:decode(Reply, [return_maps])};
Error ->
Error
end.
-spec request_param() -> {ok, Result :: map()} | {error, Reason :: any()}.
request_param() ->
{ok, Ref} = gen_server:call(?MODULE, {request_param, self()}),
case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of
{ok, Reply} -> {ok, Reply} ->
{ok, jiffy:decode(Reply, [return_maps])}; {ok, jiffy:decode(Reply, [return_maps])};
@ -166,16 +156,8 @@ handle_call({controller_process, ControllerPid}, _From, State) ->
{reply, ok, State#state{controller_process = ControllerPid}}; {reply, ok, State#state{controller_process = ControllerPid}};
%% done %% done
handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = <<PacketId:32, ?PACKET_REQUEST_METRIC:8>>, Packet = <<PacketId:32, ?PACKET_REQUEST_CONFIG:8>>,
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
%% done
handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = <<PacketId:32, ?PACKET_REQUEST_PARAM:8>>,
ok = gen_tcp:send(Socket, Packet), ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(), Ref = make_ref(),
@ -239,15 +221,15 @@ handle_info({tcp, Socket, <<PacketId:32, ?PACKET_RESPONSE:8, Message/binary>>},
end; end;
%% efka推送的参数设置 %% efka推送的参数设置
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_PUSH_PARAM:8, Params/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> handle_info({tcp, Socket, <<PacketId:32, ?PACKET_PUSH_CONFIG:8, ConfigJson/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of
true -> true ->
Ref = make_ref(), Ref = make_ref(),
ControllerPid ! {push_param, Ref, Params}, ControllerPid ! {push_config, Ref, ConfigJson},
receive receive
{push_param_reply, Ref, ok} -> {push_config_reply, Ref, ok} ->
<<1:8>>; <<1:8>>;
{push_param_reply, Ref, {error, Reason}} when is_binary(Reason) -> {push_config_reply, Ref, {error, Reason}} when is_binary(Reason) ->
<<0:8, Reason/binary>> <<0:8, Reason/binary>>
after 5000 -> after 5000 ->
<<0:8, "服务执行超时"/utf8>> <<0:8, "服务执行超时"/utf8>>
@ -260,29 +242,6 @@ handle_info({tcp, Socket, <<PacketId:32, ?PACKET_PUSH_PARAM:8, Params/binary>>},
{noreply, State}; {noreply, State};
%% efka推送的采集项消息
handle_info({tcp, <<PacketId:32, ?PACKET_PUSH_METRIC:8, Metrics/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of
true ->
Ref = make_ref(),
ControllerPid ! {push_metric, Ref, Metrics},
receive
{push_metric_reply, Ref, ok} ->
<<1:8>>;
{push_metric_reply, Ref, {error, Reason}} when is_binary(Reason) ->
<<0:8, Reason/binary>>
after 5000 ->
<<0:8, "服务执行超时"/utf8>>
end;
false ->
<<0:8, "处理进程异常"/utf8>>
end,
Packet = <<PacketId:32, ?PACKET_RESPONSE:8, Message/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%% %%
handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) -> handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) ->
lager:debug("[efka_client] get unknown packet: ~p", [Packet]), lager:debug("[efka_client] get unknown packet: ~p", [Packet]),

View File

@ -20,7 +20,7 @@
%% API %% API
-export([start_link/2]). -export([start_link/2]).
-export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]).
-export([push_params/3, push_metrics/3, request_params/1, request_metrics/1]). -export([push_config/3, request_config/1]).
-export([metric_data/3, send_event/3, send_ai_event/3]). -export([metric_data/3, send_event/3, send_ai_event/3]).
%% gen_server callbacks %% gen_server callbacks
@ -67,17 +67,11 @@ get_name(ServiceId) when is_binary(ServiceId) ->
get_pid(ServiceId) when is_binary(ServiceId) -> get_pid(ServiceId) when is_binary(ServiceId) ->
whereis(get_name(ServiceId)). whereis(get_name(ServiceId)).
push_params(Pid, Ref, Params) when is_pid(Pid), is_binary(Params) -> push_config(Pid, Ref, ConfigJson) when is_pid(Pid), is_binary(ConfigJson) ->
gen_server:cast(Pid, {push_params, Ref, self(), Params}). gen_server:cast(Pid, {push_config, Ref, self(), ConfigJson}).
push_metrics(Pid, Ref, Metrics) when is_pid(Pid), is_binary(Metrics) -> request_config(Pid) when is_pid(Pid) ->
gen_server:cast(Pid, {push_metrics, Ref, self(), Metrics}). gen_server:call(Pid, request_config).
request_params(Pid) when is_pid(Pid) ->
gen_server:call(Pid, request_params).
request_metrics(Pid) when is_pid(Pid) ->
gen_server:call(Pid, request_metrics).
metric_data(Pid, DeviceUUID, Data) when is_pid(Pid), is_binary(DeviceUUID), is_binary(Data) -> metric_data(Pid, DeviceUUID, Data) when is_pid(Pid), is_binary(DeviceUUID), is_binary(Data) ->
gen_server:cast(Pid, {metric_data, DeviceUUID, Data}). gen_server:cast(Pid, {metric_data, DeviceUUID, Data}).
@ -169,15 +163,10 @@ handle_call({attach_channel, ChannelPid}, _From, State = #state{channel_pid = Ol
end; end;
%% done %% done
handle_call(request_params, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) -> handle_call(request_config, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) ->
Params = micro_service_model:get_params(ServiceId), Params = micro_service_model:get_params(ServiceId),
{reply, {ok, Params}, State}; {reply, {ok, Params}, State};
%% done
handle_call(request_metrics, _From, State = #state{service_id = ServiceId, running_status = ?STATUS_RUNNING}) ->
Metrics = micro_service_model:get_metrics(ServiceId),
{reply, {ok, Metrics}, State};
%% : %% :
handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) -> handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) ->
{reply, {error, <<"service is running">>}, State}; {reply, {error, <<"service is running">>}, State};
@ -232,27 +221,16 @@ handle_cast({send_ai_event, EventType, Params}, State = #state{service_id = Serv
{noreply, State}; {noreply, State};
%% %%
handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) -> handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) ->
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
true -> true ->
efka_tcp_channel:push_params(ChannelPid, Ref, self(), Params), efka_tcp_channel:push_config(ChannelPid, Ref, self(), ConfigJson),
{noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
false -> false ->
ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}}, ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}},
{reply, State} {reply, State}
end; end;
%%
handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) ->
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
true ->
efka_tcp_channel:push_metrics(ChannelPid, Ref, self(), Metrics),
{noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
false ->
ReceiverPid ! {channel_reply, Ref, {error, <<"channel is not alive">>}},
{noreply, State}
end;
handle_cast(_Request, State = #state{}) -> handle_cast(_Request, State = #state{}) ->
{noreply, State}. {noreply, State}.

View File

@ -13,7 +13,7 @@
%% API %% API
-export([start_link/1]). -export([start_link/1]).
-export([push_metrics/4, push_params/4]). -export([push_config/4]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -28,15 +28,13 @@
-define(PACKET_METRIC_DATA, 3). -define(PACKET_METRIC_DATA, 3).
%% %%
-define(PACKET_RESPONSE, 7). -define(PACKET_RESPONSE, 7).
%% efka下发给微服务参数
-define(PACKET_PUSH_PARAM, 5). %% efka下发给微服务配置
%% efka下发给微服务采集项 -define(PACKET_PUSH_CONFIG, 5).
-define(PACKET_PUSH_METRIC, 6).
%% efka获取自身的采集项 %% efka获取自身的采集项
-define(PACKET_REQUEST_METRIC, 10). -define(PACKET_REQUEST_CONFIG, 10).
%% efka获取自身的参数
-define(PACKET_REQUEST_PARAM, 12).
%% %%
-define(PACKET_EVENT, 15). -define(PACKET_EVENT, 15).
-define(PACKET_AI_EVENT, 16). -define(PACKET_AI_EVENT, 16).
@ -56,13 +54,9 @@
%%% API %%% API
%%%=================================================================== %%%===================================================================
-spec push_params(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Params :: binary()) -> no_return(). -spec push_config(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), ConfigJson :: binary()) -> no_return().
push_params(ChannelPid, Ref, ReceiverPid, Params) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Params), is_reference(Ref) -> push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(ConfigJson), is_reference(Ref) ->
gen_server:cast(ChannelPid, {push_params, Ref, ReceiverPid, Params}). gen_server:cast(ChannelPid, {push_config, Ref, ReceiverPid, ConfigJson}).
-spec push_metrics(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Metrics :: binary()) -> no_return().
push_metrics(ChannelPid, Ref, ReceiverPid, Metrics) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Metrics) ->
gen_server:cast(ChannelPid, {push_metrics, Ref, ReceiverPid, Metrics}).
%% @doc Spawns the server and registers the local name (unique) %% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Socket :: gen_tcp:socket()) -> -spec(start_link(Socket :: gen_tcp:socket()) ->
@ -104,13 +98,8 @@ handle_call(_Request, _From, State = #state{}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
%% %%
handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> handle_cast({push_params, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_PUSH_PARAM:8, Params/binary>>), ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_PUSH_CONFIG:8, ConfigJson/binary>>),
{noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
%%
handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_PUSH_METRIC:8, Metrics/binary>>),
{noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
handle_cast(_Request, State = #state{}) -> handle_cast(_Request, State = #state{}) ->
@ -139,15 +128,9 @@ handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REGISTER:8, ServiceId/binary>>}
end; end;
%% %%
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REQUEST_PARAM:8>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REQUEST_CONFIG:8>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
{ok, Params} = efka_micro_service:request_params(ServicePid), {ok, ConfigJson} = efka_micro_service:request_config(ServicePid),
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, Params/binary>>), ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, ConfigJson/binary>>),
{noreply, State};
%%
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REQUEST_METRIC:8>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
{ok, Metrics} = efka_micro_service:request_metrics(ServicePid),
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, Metrics/binary>>),
{noreply, State}; {noreply, State};
%% %%