From 82325ac44063b42d69158c90676fd128de71ae7b Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 6 May 2025 16:48:09 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_client.erl | 62 +++++++++++------------------------ 1 file changed, 19 insertions(+), 43 deletions(-) diff --git a/apps/efka/src/efka_client.erl b/apps/efka/src/efka_client.erl index b1cbf1a..ef85625 100644 --- a/apps/efka/src/efka_client.erl +++ b/apps/efka/src/efka_client.erl @@ -42,6 +42,7 @@ -define(PACKET_TYPE_REQUEST_PARAM, 12). %% 微服务事件上报 -define(PACKET_TYPE_EVENT, 15). +-define(PACKET_TYPE_AI_EVENT, 16). %% API -export([start_link/3]). @@ -75,8 +76,8 @@ send_metric_data(Fields, Tags) when is_list(Fields), is_map(Tags) -> {ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}), await_reply(Ref, ?EFKA_REQUEST_TIMEOUT). --spec send_log(Message :: binary() | map()) -> no_return(). -send_log(Message) when is_binary(Message); is_map(Message) -> +-spec send_log(Message :: binary()) -> no_return(). +send_log(Message) when is_binary(Message) -> gen_server:cast(?MODULE, {send_log, Message}). %% efka_server为了统一,r对象为字符串;需要2次json_decode @@ -174,14 +175,6 @@ init([MicroServiceId, Host, Port]) -> handle_call({controller_process, ControllerPid}, _From, State) -> {reply, ok, State#state{controller_process = ControllerPid}}; -handle_call({send_metric_data, ReceiverPid, Fields, Tags}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Body = jiffy:encode(#{<<"data">> => Fields, <<"tag">> => Tags}, [force_utf8]), - Packet = <>, - - ok = gen_tcp:send(Socket, Packet), - Ref = make_ref(), - {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; - handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> Packet = <>, ok = gen_tcp:send(Socket, Packet), @@ -202,9 +195,15 @@ handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({send_log, Message}, State = #state{socket = Socket}) -> - Body = jiffy:encode(#{<<"l">> => Message}, [force_utf8]), - Packet = <<0:32, ?PACKET_TYPE_LOG:8, Body/binary>>, +handle_cast({send_metric_data, Fields, Tags}, State = #state{socket = Socket}) -> + Body = jiffy:encode(#{<<"data">> => Fields, <<"tag">> => Tags}, [force_utf8]), + Packet = <<0:32, ?PACKET_TYPE_METRIC_DATA, Body/binary>>, + ok = gen_tcp:send(Socket, Packet), + + {noreply, State}; + +handle_cast({send_log, LogMessage}, State = #state{socket = Socket}) -> + Packet = <<0:32, ?PACKET_TYPE_LOG:8, LogMessage/binary>>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; @@ -216,6 +215,13 @@ handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> {noreply, State}; +handle_cast({send_ai_event, EventType, Params}, State = #state{socket = Socket}) -> + Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]), + Packet = <<0:32, ?PACKET_TYPE_AI_EVENT:8, Body/binary>>, + ok = gen_tcp:send(Socket, Packet), + + {noreply, State}; + handle_cast(_Info, State = #state{}) -> {noreply, State}. @@ -286,36 +292,6 @@ handle_info({tcp, <>}, {noreply, State}; -%% 收到设备状态的轮询请求 -%% #{ -%% <<"c">> => 1, -%% <<"r">> => #{ -%% <<"edge_status">> => Status, -%% <<"message">> => maps:get(Status, StatusMap) -%% } -%% } - -handle_info({tcp, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> - Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of - true -> - Ref = make_ref(), - ControllerPid ! {poll, Ref, Command}, - receive - {poll_reply, Ref, {ok, Reply}} when is_binary(Reply) -> - <<1:8, Reply/binary>>; - {poll_reply, Ref, {error, Reason}} when is_binary(Reason) -> - <<0:8, Reason/binary>> - after 5000 -> - <<0:8, "服务执行超时"/utf8>> - end; - false -> - <<0:8, "处理进程异常"/utf8>> - end, - Packet = <>, - ok = gen_tcp:send(Socket, Packet), - - {noreply, State}; - %% 其他消息为非法消息 handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) -> lager:debug("[efka_client] get unknown packet: ~p", [Packet]),