diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl index df0d2a1..8ffb812 100644 --- a/apps/efka/src/client/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -198,15 +198,13 @@ handle_cast({send_metric_data, Measurement, Tags, Fields}, State = #state{socket {noreply, State}; handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> - Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]), - Packet = <<0:32, ?PACKET_EVENT:8, Body/binary>>, + Packet = <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; handle_cast({send_ai_event, EventType, Params}, State = #state{socket = Socket}) -> - Body = jiffy:encode(#{<<"event_type">> => EventType, <<"params">> => Params}, [force_utf8]), - Packet = <<0:32, ?PACKET_AI_EVENT:8, Body/binary>>, + Packet = <<0:32, ?PACKET_AI_EVENT:8, EventType:16, Params/binary>>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index baa7e57..bf3dcf3 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -80,10 +80,10 @@ request_metrics(Pid) when is_pid(Pid) -> metric_data(Pid, Data) when is_pid(Pid), is_binary(Data) -> gen_server:cast(Pid, {metric_data, Data}). -send_event(Pid, EventType, Params) -> +send_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) -> gen_server:cast(Pid, {send_event, EventType, Params}). -send_ai_event(Pid, EventType, Params) -> +send_ai_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) -> gen_server:cast(Pid, {send_ai_event, EventType, Params}). -spec attach_channel(pid(), pid()) -> ok | {error, Reason :: binary()}. @@ -216,7 +216,11 @@ handle_cast({metric_data, LineProtocolData}, State = #state{service = #micro_ser {noreply, State}; handle_cast({send_event, EventType, Params}, State = #state{service = #micro_service{service_id = ServiceId}}) -> - efka_agent:metric_data(ServiceId, DeviceUUID, efka_util:timestamp(), Tags, Fields), + efka_agent:event(ServiceId, EventType, Params), + {noreply, State}; + +handle_cast({send_ai_event, EventType, Params}, State = #state{service = #micro_service{service_id = ServiceId}}) -> + efka_agent:ai_event(ServiceId, EventType, Params), {noreply, State}; handle_cast(_Request, State = #state{}) -> diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index 2d82d1b..d721c2b 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -33,9 +33,6 @@ %% efka下发给微服务采集项 -define(PACKET_PUSH_METRIC, 6). -%% 微服务给efka发送log消息 --define(PACKET_LOG, 9). - %% 微服务从efka获取自身的采集项 -define(PACKET_REQUEST_METRIC, 10). %% 微服务从efka获取自身的参数 @@ -162,29 +159,14 @@ handle_info({tcp, Socket, <<0:32, ?PACKET_METRIC_DATA:8, Data/binary>>}, State = efka_micro_service:metric_data(ServicePid, Data), {noreply, State}; -%% 远程日志 -handle_info({tcp, Socket, <<0:32, ?PACKET_LOG:8, Log/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - efka_micro_service:send_log(ServicePid, Log), - {noreply, State}; - %% Event事件 -handle_info({tcp, Socket, <<0:32, ?PACKET_EVENT:8, EventData/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - case catch jiffy:decode(EventData, [return_maps]) of - #{<<"event_type">> := EventType, <<"params">> := Params} -> - efka_micro_service:send_event(ServicePid, EventType, Params); - _ -> - ok - end, +handle_info({tcp, Socket, <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> + efka_micro_service:send_event(ServicePid, EventType, Params), {noreply, State}; %% AIEvent事件 -handle_info({tcp, Socket, <<0:32, ?PACKET_AI_EVENT:8, EventData/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - case catch jiffy:decode(EventData, [return_maps]) of - #{<<"event_type">> := EventType, <<"params">> := Params} -> - efka_micro_service:send_ai_event(ServicePid, EventType, Params); - _ -> - ok - end, +handle_info({tcp, Socket, <<0:32, ?PACKET_AI_EVENT:8, EventType:16, Params/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> + efka_micro_service:send_ai_event(ServicePid, EventType, Params), {noreply, State}; %% 收到端上的响应