This commit is contained in:
anlicheng 2025-05-06 22:57:55 +08:00
parent cceec7c11c
commit c0d46c69c1

View File

@ -33,9 +33,6 @@
%% efka下发给微服务采集项 %% efka下发给微服务采集项
-define(PACKET_PUSH_METRIC, 6). -define(PACKET_PUSH_METRIC, 6).
%% efka发送log消息
-define(PACKET_LOG, 9).
%% efka获取自身的采集项 %% efka获取自身的采集项
-define(PACKET_REQUEST_METRIC, 10). -define(PACKET_REQUEST_METRIC, 10).
%% efka获取自身的参数 %% efka获取自身的参数
@ -47,7 +44,7 @@
%% API %% API
-export([start_link/3]). -export([start_link/3]).
-export([device_offline/1, device_online/1]). -export([device_offline/1, device_online/1]).
-export([send_metric_data/2, send_log/1, request_metric/0, request_param/0, send_event/2, controller_process/1]). -export([send_metric_data/3, request_metric/0, request_param/0, send_event/2, controller_process/1]).
-export([test/0]). -export([test/0]).
@ -71,14 +68,9 @@ test() ->
controller_process(ControllerPid) when is_pid(ControllerPid) -> controller_process(ControllerPid) when is_pid(ControllerPid) ->
gen_server:call(?MODULE, {controller_process, ControllerPid}). gen_server:call(?MODULE, {controller_process, ControllerPid}).
-spec send_metric_data(Fields :: list(), Tags :: #{}) -> {ok, Result :: any()} | {error, Reason :: any()}. -spec send_metric_data(Measurement :: binary(), Tags :: map(), Fields :: map()) -> no_return().
send_metric_data(Fields, Tags) when is_list(Fields), is_map(Tags) -> send_metric_data(Measurement, Tags, Fields) when is_binary(Measurement), is_map(Fields), is_map(Tags) ->
{ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}), gen_server:cast(?MODULE, {send_metric_data, Measurement, Tags, Fields}).
await_reply(Ref, ?EFKA_REQUEST_TIMEOUT).
-spec send_log(Message :: binary()) -> no_return().
send_log(Message) when is_binary(Message) ->
gen_server:cast(?MODULE, {send_log, Message}).
%% efka_server为了统一r对象为字符串2json_decode %% efka_server为了统一r对象为字符串2json_decode
-spec request_metric() -> {ok, Result :: list()} | {error, Reason :: any()}. -spec request_metric() -> {ok, Result :: list()} | {error, Reason :: any()}.
@ -205,12 +197,6 @@ handle_cast({send_metric_data, Measurement, Tags, Fields}, State = #state{socket
{noreply, State}; {noreply, State};
handle_cast({send_log, LogMessage}, State = #state{socket = Socket}) ->
Packet = <<0:32, ?PACKET_LOG:8, LogMessage/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) ->
Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]), Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]),
Packet = <<0:32, ?PACKET_EVENT:8, Body/binary>>, Packet = <<0:32, ?PACKET_EVENT:8, Body/binary>>,