From cceec7c11cd2a3ac9acbf9bb9928450bef00cb52 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 6 May 2025 22:54:30 +0800 Subject: [PATCH] fix data --- README.md | 2 + apps/efka/include/message_pb.hrl | 5 +- apps/efka/src/{ => client}/efka_client.erl | 9 +- apps/efka/src/client/efka_point.erl | 49 ++++ apps/efka/src/efka_agent.erl | 13 +- apps/efka/src/efka_micro_service.erl | 10 +- apps/efka/src/efka_tcp_channel.erl | 7 +- apps/efka/src/proto/message_pb.erl | 282 ++++----------------- message_pb.proto | 7 +- 9 files changed, 115 insertions(+), 269 deletions(-) rename apps/efka/src/{ => client}/efka_client.erl (97%) create mode 100644 apps/efka/src/client/efka_point.erl diff --git a/README.md b/README.md index f677666..1dbf49f 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,8 @@ efka An OTP application +1. 先解决数据的上行问题 + Build ----- diff --git a/apps/efka/include/message_pb.hrl b/apps/efka/include/message_pb.hrl index fda4903..30cd5ce 100644 --- a/apps/efka/include/message_pb.hrl +++ b/apps/efka/include/message_pb.hrl @@ -81,10 +81,7 @@ -define('DATA_PB_H', true). -record(data, {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional - device_uuid = <<>> :: unicode:chardata() | undefined, % = 2, optional - timestamp = 0 :: integer() | undefined, % = 3, optional, 32 bits - tags = [] :: [{unicode:chardata(), unicode:chardata()}] | undefined, % = 4 - fields = <<>> :: unicode:chardata() | undefined % = 5, optional + metric = <<>> :: unicode:chardata() | undefined % = 2, optional }). -endif. diff --git a/apps/efka/src/efka_client.erl b/apps/efka/src/client/efka_client.erl similarity index 97% rename from apps/efka/src/efka_client.erl rename to apps/efka/src/client/efka_client.erl index 6406052..16b2bfc 100644 --- a/apps/efka/src/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -195,8 +195,11 @@ handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({send_metric_data, Fields, Tags}, State = #state{socket = Socket}) -> - Body = jiffy:encode(#{<<"fields">> => Fields, <<"tags">> => Tags}, [force_utf8]), +handle_cast({send_metric_data, Measurement, Tags, Fields}, State = #state{socket = Socket}) -> + %% 基于Line Protocol实现数据的传输 + Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()), + Body = efka_point:normalized(Point), + Packet = <<0:32, ?PACKET_METRIC_DATA, Body/binary>>, ok = gen_tcp:send(Socket, Packet), @@ -326,7 +329,7 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %% 采用32位编码 -spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). next_packet_id(PacketId) when PacketId >= 4294967295 -> - 0; + 1; next_packet_id(PacketId) -> PacketId + 1. diff --git a/apps/efka/src/client/efka_point.erl b/apps/efka/src/client/efka_point.erl new file mode 100644 index 0000000..11947a0 --- /dev/null +++ b/apps/efka/src/client/efka_point.erl @@ -0,0 +1,49 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 30. 5月 2023 11:28 +%%%------------------------------------------------------------------- +-module(efka_point). +-author("aresei"). + +-record(point, { + measurement, + tags = [], + fields = [], + time = 0 :: integer() +}). + +%% API +-export([new/4, normalized/1]). + +-spec new(Measurement :: binary(), Tags :: map(), Fields :: map(), Timestamp :: integer()) -> #point{}. +new(Measurement, Tags, Fields, Timestamp) when is_binary(Measurement), is_map(Tags), is_map(Fields), is_integer(Timestamp) -> + #point{measurement = Measurement, tags = Tags, fields = Fields, time = Timestamp}. + +-spec normalized(Point :: #point{}) -> binary(). +normalized(#point{measurement = Measurement, tags = Tags, fields = Fields, time = Time}) -> + NTags = lists:map(fun({N, V}) -> <> end, maps:to_list(Tags)), + NFields = lists:map(fun({K, V}) -> <> end, maps:to_list(Fields)), + + TagItems = lists:join(<<",">>, [Measurement | NTags]), + FieldItems = lists:join(<<",">>, NFields), + + erlang:iolist_to_binary([TagItems, <<" ">>, FieldItems, <<" ">>, integer_to_binary(Time)]). + +field_val(V) when is_integer(V) -> + integer_to_binary(V); +field_val(V) when is_float(V) -> + %% 默认按照浮点数表示 + S = float_to_list(V, [{decimals, 6}, compact]), + list_to_binary(S); +field_val(V) when is_binary(V) -> + <<$", V/binary, $">>; +field_val(true) -> + <<"true">>; +field_val(false) -> + <<"false">>; +field_val(_Other) -> + erlang:error(unsupported_type). \ No newline at end of file diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index a44bb8a..b844620 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -16,7 +16,7 @@ %% API -export([start_link/0]). --export([metric_data/5, event/3, ai_event/3, ping/13]). +-export([metric_data/2, event/3, ai_event/3, ping/13]). -export([feedback_phase/4, feedback_phase/5]). %% gen_server callbacks @@ -43,8 +43,8 @@ %%%=================================================================== %% 发送数据 -metric_data(ServiceId, DeviceUUID, Timestamp, Tags, Fields) when is_binary(ServiceId), is_binary(DeviceUUID), is_integer(Timestamp), is_map(Tags), is_binary(Fields) -> - gen_server:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, Timestamp, Tags, Fields}). +metric_data(ServiceId, LineProtocolData) when is_binary(ServiceId), is_binary(LineProtocolData) -> + gen_server:cast(?SERVER, {metric_data, ServiceId, LineProtocolData}). ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_server:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). @@ -104,13 +104,10 @@ handle_call(_Request, _From, State = #state{}) -> {stop, Reason :: term(), NewState :: #state{}}). %% 发送数据 -handle_cast({metric_data, ServiceId, DeviceUUID, Timestamp, Tags, Fields}, State = #state{status = Status, transport_pid = TransportPid}) -> +handle_cast({metric_data, ServiceId, LineProtocolData}, State = #state{status = Status, transport_pid = TransportPid}) -> Packet = message_pb:encode_msg(#data{ service_id = ServiceId, - device_uuid = DeviceUUID, - timestamp = Timestamp, - tags = Tags, - fields = Fields + metric = LineProtocolData }), case Status =:= ?STATE_ACTIVATED of diff --git a/apps/efka/src/efka_micro_service.erl b/apps/efka/src/efka_micro_service.erl index 90660f8..baa7e57 100644 --- a/apps/efka/src/efka_micro_service.erl +++ b/apps/efka/src/efka_micro_service.erl @@ -21,7 +21,7 @@ -export([start_link/2]). -export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]). -export([push_params/2, push_metrics/2, request_params/1, request_metrics/1]). --export([metric_data/4, send_event/3, send_ai_event/3]). +-export([metric_data/2, send_event/3, send_ai_event/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -77,8 +77,8 @@ request_params(Pid) when is_pid(Pid) -> request_metrics(Pid) when is_pid(Pid) -> gen_server:call(Pid, request_metrics). -metric_data(Pid, DeviceUUID, Tags, Fields) -> - gen_server:cast(Pid, {DeviceUUID, Tags, Fields}). +metric_data(Pid, Data) when is_pid(Pid), is_binary(Data) -> + gen_server:cast(Pid, {metric_data, Data}). send_event(Pid, EventType, Params) -> gen_server:cast(Pid, {send_event, EventType, Params}). @@ -211,8 +211,8 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({metric_data, DeviceUUID, Tags, Fields}, State = #state{service = #micro_service{service_id = ServiceId}}) -> - efka_agent:metric_data(ServiceId, DeviceUUID, efka_util:timestamp(), Tags, Fields), +handle_cast({metric_data, LineProtocolData}, State = #state{service = #micro_service{service_id = ServiceId}}) -> + efka_agent:metric_data(ServiceId, LineProtocolData), {noreply, State}; handle_cast({send_event, EventType, Params}, State = #state{service = #micro_service{service_id = ServiceId}}) -> diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index 0a141dc..2d82d1b 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -159,12 +159,7 @@ handle_info({tcp, Socket, <>}, State = #s %% 数据项 handle_info({tcp, Socket, <<0:32, ?PACKET_METRIC_DATA:8, Data/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> - case catch jiffy:decode(Data, [return_maps]) of - #{<<"fields">> := Fields, <<"tag">> := Tags} -> - efka_micro_service:metric_data(ServicePid, Fields, Tags); - _ -> - ok - end, + efka_micro_service:metric_data(ServicePid, Data), {noreply, State}; %% 远程日志 diff --git a/apps/efka/src/proto/message_pb.erl b/apps/efka/src/proto/message_pb.erl index d9a740d..8c4ed85 100644 --- a/apps/efka/src/proto/message_pb.erl +++ b/apps/efka/src/proto/message_pb.erl @@ -88,7 +88,6 @@ -type '$msg'() :: auth_request() | auth_reply() | activate_push() | deploy() | efka_response() | topic_message() | service_params() | service_metrics() | data() | ping() | service_inform() | feedback_phase() | event() | ai_event(). -export_type(['$msg_name'/0, '$msg'/0]). --record('map',{key, value}). -if(?OTP_RELEASE >= 24). -dialyzer({no_underspecs, encode_msg/1}). -endif. @@ -378,7 +377,7 @@ encode_msg_service_metrics(#service_metrics{service_id = F1, metrics = F2}, Bin, encode_msg_data(Msg, TrUserData) -> encode_msg_data(Msg, <<>>, TrUserData). -encode_msg_data(#data{service_id = F1, device_uuid = F2, timestamp = F3, tags = F4, fields = F5}, Bin, TrUserData) -> +encode_msg_data(#data{service_id = F1, metric = F2}, Bin, TrUserData) -> B1 = if F1 == undefined -> Bin; true -> begin @@ -389,38 +388,13 @@ encode_msg_data(#data{service_id = F1, device_uuid = F2, timestamp = F3, tags = end end end, - B2 = if F2 == undefined -> B1; - true -> - begin - TrF2 = id(F2, TrUserData), - case is_empty_string(TrF2) of - true -> B1; - false -> e_type_string(TrF2, <>, TrUserData) - end - end - end, - B3 = if F3 == undefined -> B2; - true -> - begin - TrF3 = id(F3, TrUserData), - if TrF3 =:= 0 -> B2; - true -> e_type_int32(TrF3, <>, TrUserData) - end - end - end, - B4 = begin - TrF4 = id(F4, TrUserData), - if TrF4 == [] -> B3; - true -> e_field_data_tags(TrF4, B3, TrUserData) - end - end, - if F5 == undefined -> B4; + if F2 == undefined -> B1; true -> begin - TrF5 = id(F5, TrUserData), - case is_empty_string(TrF5) of - true -> B4; - false -> e_type_string(TrF5, <>, TrUserData) + TrF2 = id(F2, TrUserData), + case is_empty_string(TrF2) of + true -> B1; + false -> e_type_string(TrF2, <>, TrUserData) end end end. @@ -708,17 +682,6 @@ encode_msg_ai_event(#ai_event{service_id = F1, event_type = F2, params = F3}, Bi end end. -e_mfield_data_tags(Msg, Bin, TrUserData) -> - SubBin = 'encode_msg_map'(Msg, <<>>, TrUserData), - Bin2 = e_varint(byte_size(SubBin), Bin), - <>. - -e_field_data_tags([Elem | Rest], Bin, TrUserData) -> - Bin2 = <>, - Bin3 = e_mfield_data_tags('tr_encode_data.tags[x]'(Elem, TrUserData), Bin2, TrUserData), - e_field_data_tags(Rest, Bin3, TrUserData); -e_field_data_tags([], Bin, _TrUserData) -> Bin. - e_field_ping_ips([Elem | Rest], Bin, TrUserData) -> Bin2 = <>, Bin3 = e_type_string(id(Elem, TrUserData), Bin2, TrUserData), @@ -749,10 +712,6 @@ e_pfield_ping_memory([Value | Rest], Bin, TrUserData) -> e_pfield_ping_memory(Rest, Bin2, TrUserData); e_pfield_ping_memory([], Bin, _TrUserData) -> Bin. -'encode_msg_map'(#'map'{key = F1, value = F2}, Bin, TrUserData) -> - B1 = begin TrF1 = id(F1, TrUserData), e_type_string(TrF1, <>, TrUserData) end, - begin TrF2 = id(F2, TrUserData), e_type_string(TrF2, <>, TrUserData) end. - -compile({nowarn_unused_function,e_type_sint/3}). e_type_sint(Value, Bin, _TrUserData) when Value >= 0 -> e_varint(Value * 2, Bin); e_type_sint(Value, Bin, _TrUserData) -> e_varint(Value * -2 - 1, Bin). @@ -1351,77 +1310,56 @@ skip_32_service_metrics(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData skip_64_service_metrics(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_service_metrics(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). -decode_msg_data(Bin, TrUserData) -> dfp_read_field_def_data(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(0, TrUserData), 'tr_decode_init_default_data.tags'([], TrUserData), id(<<>>, TrUserData), TrUserData). +decode_msg_data(Bin, TrUserData) -> dfp_read_field_def_data(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData). -dfp_read_field_def_data(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> d_field_data_service_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dfp_read_field_def_data(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> d_field_data_device_uuid(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dfp_read_field_def_data(<<24, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> d_field_data_timestamp(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dfp_read_field_def_data(<<34, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> d_field_data_tags(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dfp_read_field_def_data(<<42, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> d_field_data_fields(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dfp_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, R1, F@_5, TrUserData) -> #data{service_id = F@_1, device_uuid = F@_2, timestamp = F@_3, tags = 'tr_decode_repeated_finalize_data.tags'(R1, TrUserData), fields = F@_5}; -dfp_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> dg_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). +dfp_read_field_def_data(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_data_service_id(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); +dfp_read_field_def_data(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_data_metric(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); +dfp_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, _) -> #data{service_id = F@_1, metric = F@_2}; +dfp_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dg_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, TrUserData). -dg_read_field_def_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 32 - 7 -> dg_read_field_def_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dg_read_field_def_data(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> +dg_read_field_def_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 32 - 7 -> dg_read_field_def_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +dg_read_field_def_data(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, TrUserData) -> Key = X bsl N + Acc, case Key of - 10 -> d_field_data_service_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 18 -> d_field_data_device_uuid(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 24 -> d_field_data_timestamp(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 34 -> d_field_data_tags(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 42 -> d_field_data_fields(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); + 10 -> d_field_data_service_id(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); + 18 -> d_field_data_metric(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); _ -> case Key band 7 of - 0 -> skip_varint_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 1 -> skip_64_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 2 -> skip_length_delimited_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 3 -> skip_group_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 5 -> skip_32_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) + 0 -> skip_varint_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 1 -> skip_64_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 2 -> skip_length_delimited_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 3 -> skip_group_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 5 -> skip_32_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData) end end; -dg_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, R1, F@_5, TrUserData) -> #data{service_id = F@_1, device_uuid = F@_2, timestamp = F@_3, tags = 'tr_decode_repeated_finalize_data.tags'(R1, TrUserData), fields = F@_5}. +dg_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, _) -> #data{service_id = F@_1, metric = F@_2}. -d_field_data_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> d_field_data_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -d_field_data_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, F@_4, F@_5, TrUserData) -> +d_field_data_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_data_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +d_field_data_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_data(RestF, 0, 0, F, NewFValue, F@_2, F@_3, F@_4, F@_5, TrUserData). + dfp_read_field_def_data(RestF, 0, 0, F, NewFValue, F@_2, TrUserData). -d_field_data_device_uuid(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> d_field_data_device_uuid(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -d_field_data_device_uuid(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, F@_3, F@_4, F@_5, TrUserData) -> +d_field_data_metric(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_data_metric(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +d_field_data_metric(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_data(RestF, 0, 0, F, F@_1, NewFValue, F@_3, F@_4, F@_5, TrUserData). + dfp_read_field_def_data(RestF, 0, 0, F, F@_1, NewFValue, TrUserData). -d_field_data_timestamp(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> d_field_data_timestamp(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -d_field_data_timestamp(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, _, F@_4, F@_5, TrUserData) -> - {NewFValue, RestF} = {begin <> = <<(X bsl N + Acc):32/unsigned-native>>, id(Res, TrUserData) end, Rest}, - dfp_read_field_def_data(RestF, 0, 0, F, F@_1, F@_2, NewFValue, F@_4, F@_5, TrUserData). +skip_varint_data(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> skip_varint_data(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); +skip_varint_data(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). -d_field_data_tags(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> d_field_data_tags(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -d_field_data_tags(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, Prev, F@_5, TrUserData) -> - {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, {id('decode_msg_map'(Bs, TrUserData), TrUserData), Rest2} end, - dfp_read_field_def_data(RestF, 0, 0, F, F@_1, F@_2, F@_3, 'tr_decode_repeated_add_elem_data.tags'(NewFValue, Prev, TrUserData), F@_5, TrUserData). - -d_field_data_fields(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> d_field_data_fields(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -d_field_data_fields(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, _, TrUserData) -> - {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_data(RestF, 0, 0, F, F@_1, F@_2, F@_3, F@_4, NewFValue, TrUserData). - -skip_varint_data(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> skip_varint_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -skip_varint_data(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). - -skip_length_delimited_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> skip_length_delimited_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -skip_length_delimited_data(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> +skip_length_delimited_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> skip_length_delimited_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +skip_length_delimited_data(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) -> Length = X bsl N + Acc, <<_:Length/binary, Rest2/binary>> = Rest, - dfp_read_field_def_data(Rest2, 0, 0, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). + dfp_read_field_def_data(Rest2, 0, 0, F, F@_1, F@_2, TrUserData). -skip_group_data(Bin, _, Z2, FNum, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> +skip_group_data(Bin, _, Z2, FNum, F@_1, F@_2, TrUserData) -> {_, Rest} = read_group(Bin, FNum), - dfp_read_field_def_data(Rest, 0, Z2, FNum, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). + dfp_read_field_def_data(Rest, 0, Z2, FNum, F@_1, F@_2, TrUserData). -skip_32_data(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). +skip_32_data(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). -skip_64_data(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). +skip_64_data(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). decode_msg_ping(Bin, TrUserData) -> dfp_read_field_def_ping(Bin, @@ -1896,57 +1834,6 @@ skip_32_ai_event(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) skip_64_ai_event(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_ai_event(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). -'decode_msg_map'(Bin, TrUserData) -> 'dfp_read_field_def_map'(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData). - -'dfp_read_field_def_map'(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> 'd_field_map_key'(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); -'dfp_read_field_def_map'(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> 'd_field_map_value'(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); -'dfp_read_field_def_map'(<<>>, 0, 0, _, F@_1, F@_2, _) -> #'map'{key = F@_1, value = F@_2}; -'dfp_read_field_def_map'(Other, Z1, Z2, F, F@_1, F@_2, TrUserData) -> 'dg_read_field_def_map'(Other, Z1, Z2, F, F@_1, F@_2, TrUserData). - -'dg_read_field_def_map'(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 32 - 7 -> 'dg_read_field_def_map'(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -'dg_read_field_def_map'(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, TrUserData) -> - Key = X bsl N + Acc, - case Key of - 10 -> 'd_field_map_key'(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); - 18 -> 'd_field_map_value'(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); - _ -> - case Key band 7 of - 0 -> 'skip_varint_map'(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 1 -> 'skip_64_map'(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 2 -> 'skip_length_delimited_map'(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 3 -> 'skip_group_map'(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 5 -> 'skip_32_map'(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData) - end - end; -'dg_read_field_def_map'(<<>>, 0, 0, _, F@_1, F@_2, _) -> #'map'{key = F@_1, value = F@_2}. - -'d_field_map_key'(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> 'd_field_map_key'(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -'d_field_map_key'(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, TrUserData) -> - {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - 'dfp_read_field_def_map'(RestF, 0, 0, F, NewFValue, F@_2, TrUserData). - -'d_field_map_value'(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> 'd_field_map_value'(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -'d_field_map_value'(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, TrUserData) -> - {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - 'dfp_read_field_def_map'(RestF, 0, 0, F, F@_1, NewFValue, TrUserData). - -'skip_varint_map'(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> 'skip_varint_map'(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); -'skip_varint_map'(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> 'dfp_read_field_def_map'(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). - -'skip_length_delimited_map'(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> 'skip_length_delimited_map'(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -'skip_length_delimited_map'(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) -> - Length = X bsl N + Acc, - <<_:Length/binary, Rest2/binary>> = Rest, - 'dfp_read_field_def_map'(Rest2, 0, 0, F, F@_1, F@_2, TrUserData). - -'skip_group_map'(Bin, _, Z2, FNum, F@_1, F@_2, TrUserData) -> - {_, Rest} = read_group(Bin, FNum), - 'dfp_read_field_def_map'(Rest, 0, Z2, FNum, F@_1, F@_2, TrUserData). - -'skip_32_map'(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> 'dfp_read_field_def_map'(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). - -'skip_64_map'(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> 'dfp_read_field_def_map'(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). - read_group(Bin, FieldNum) -> {NumBytes, EndTagLen} = read_gr_b(Bin, 0, 0, 0, 0, FieldNum), <> = Bin, @@ -2138,28 +2025,14 @@ merge_msg_service_metrics(#service_metrics{service_id = PFservice_id, metrics = end}. -compile({nowarn_unused_function,merge_msg_data/3}). -merge_msg_data(#data{service_id = PFservice_id, device_uuid = PFdevice_uuid, timestamp = PFtimestamp, tags = PFtags, fields = PFfields}, - #data{service_id = NFservice_id, device_uuid = NFdevice_uuid, timestamp = NFtimestamp, tags = NFtags, fields = NFfields}, TrUserData) -> +merge_msg_data(#data{service_id = PFservice_id, metric = PFmetric}, #data{service_id = NFservice_id, metric = NFmetric}, _) -> #data{service_id = if NFservice_id =:= undefined -> PFservice_id; true -> NFservice_id end, - device_uuid = - if NFdevice_uuid =:= undefined -> PFdevice_uuid; - true -> NFdevice_uuid - end, - timestamp = - if NFtimestamp =:= undefined -> PFtimestamp; - true -> NFtimestamp - end, - tags = - if PFtags /= undefined, NFtags /= undefined -> 'tr_merge_data.tags'(PFtags, NFtags, TrUserData); - PFtags == undefined -> NFtags; - NFtags == undefined -> PFtags - end, - fields = - if NFfields =:= undefined -> PFfields; - true -> NFfields + metric = + if NFmetric =:= undefined -> PFmetric; + true -> NFmetric end}. -compile({nowarn_unused_function,merge_msg_ping/3}). @@ -2439,19 +2312,12 @@ v_msg_service_metrics(X, Path, _TrUserData) -> mk_type_error({expected_msg, serv -compile({nowarn_unused_function,v_msg_data/3}). -dialyzer({nowarn_function,v_msg_data/3}). -v_msg_data(#data{service_id = F1, device_uuid = F2, timestamp = F3, tags = F4, fields = F5}, Path, TrUserData) -> +v_msg_data(#data{service_id = F1, metric = F2}, Path, TrUserData) -> if F1 == undefined -> ok; true -> v_type_string(F1, [service_id | Path], TrUserData) end, if F2 == undefined -> ok; - true -> v_type_string(F2, [device_uuid | Path], TrUserData) - end, - if F3 == undefined -> ok; - true -> v_type_int32(F3, [timestamp | Path], TrUserData) - end, - 'v_map'(F4, [tags | Path], TrUserData), - if F5 == undefined -> ok; - true -> v_type_string(F5, [fields | Path], TrUserData) + true -> v_type_string(F2, [metric | Path], TrUserData) end, ok; v_msg_data(X, Path, _TrUserData) -> mk_type_error({expected_msg, data}, X, Path). @@ -2616,19 +2482,6 @@ v_type_string(S, Path, _TrUserData) when is_list(S); is_binary(S) -> end; v_type_string(X, Path, _TrUserData) -> mk_type_error(bad_unicode_string, X, Path). --compile({nowarn_unused_function,'v_map'/3}). --dialyzer({nowarn_function,'v_map'/3}). -'v_map'(KVs, Path, TrUserData) when is_list(KVs) -> - [case X of - {Key, Value} -> - v_type_string(Key, [key | Path], TrUserData), - v_type_string(Value, [value | Path], TrUserData); - _ -> mk_type_error(invalid_key_value_tuple, X, Path) - end - || X <- KVs], - ok; -'v_map'(X, Path, _TrUserData) -> mk_type_error(invalid_list_of_key_value_tuples, X, Path). - -compile({nowarn_unused_function,mk_type_error/3}). -spec mk_type_error(_, _, list()) -> no_return(). mk_type_error(Error, ValueSeen, Path) -> @@ -2664,43 +2517,6 @@ cons(Elem, Acc, _TrUserData) -> [Elem | Acc]. -compile({nowarn_unused_function,'erlang_++'/3}). -compile({inline,'erlang_++'/3}). 'erlang_++'(A, B, _TrUserData) -> A ++ B. --compile({inline,'tr_decode_init_default_data.tags'/2}). -'tr_decode_init_default_data.tags'(_, _) -> mt_empty_map_r(). - --compile({inline,'tr_merge_data.tags'/3}). -'tr_merge_data.tags'(X1, X2, _) -> mt_merge_maptuples_r(X1, X2). - --compile({inline,'tr_decode_repeated_finalize_data.tags'/2}). -'tr_decode_repeated_finalize_data.tags'(L, _) -> mt_finalize_items_r(L). - --compile({inline,'tr_decode_repeated_add_elem_data.tags'/3}). -'tr_decode_repeated_add_elem_data.tags'(Elem, L, _) -> mt_add_item_r(Elem, L). - --compile({inline,'tr_encode_data.tags[x]'/2}). -'tr_encode_data.tags[x]'(X, _) -> mt_maptuple_to_pseudomsg_r(X, 'map'). - --compile({inline,mt_maptuple_to_pseudomsg_r/2}). -mt_maptuple_to_pseudomsg_r({K, V}, RName) -> {RName, K, V}. - - --compile({inline,mt_empty_map_r/0}). -mt_empty_map_r() -> []. - --compile({inline,mt_add_item_r/2}). -mt_add_item_r({_RName, K, V}, Acc) -> [{K, V} | Acc]. - - --compile({inline,mt_finalize_items_r/1}). -mt_finalize_items_r(Acc) -> mt_finalize_items_r_aux(lists:reverse(Acc), dict:new()). - -mt_finalize_items_r_aux([{K, V} | Tl], D) -> mt_finalize_items_r_aux(Tl, dict:store(K, V, D)); -mt_finalize_items_r_aux([], D) -> dict:to_list(D). - - --compile({inline,mt_merge_maptuples_r/2}). -mt_merge_maptuples_r(L1, L2) -> dict:to_list(dict:merge(fun (_Key, _V1, V2) -> V2 end, dict:from_list(L1), dict:from_list(L2))). - - get_msg_defs() -> @@ -2726,12 +2542,7 @@ get_msg_defs() -> {{msg, topic_message}, [#field{name = topic, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = content, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]}, {{msg, service_params}, [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = params, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]}, {{msg, service_metrics}, [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = metrics, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]}, - {{msg, data}, - [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, - #field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, - #field{name = timestamp, fnum = 3, rnum = 4, type = int32, occurrence = optional, opts = []}, - #field{name = tags, fnum = 4, rnum = 5, type = {map, string, string}, occurrence = repeated, opts = []}, - #field{name = fields, fnum = 5, rnum = 6, type = string, occurrence = optional, opts = []}]}, + {{msg, data}, [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = metric, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]}, {{msg, ping}, [#field{name = adcode, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = boot_time, fnum = 2, rnum = 3, type = uint32, occurrence = optional, opts = []}, @@ -2812,12 +2623,7 @@ find_msg_def(efka_response) -> find_msg_def(topic_message) -> [#field{name = topic, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = content, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]; find_msg_def(service_params) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = params, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]; find_msg_def(service_metrics) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = metrics, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]; -find_msg_def(data) -> - [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, - #field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, - #field{name = timestamp, fnum = 3, rnum = 4, type = int32, occurrence = optional, opts = []}, - #field{name = tags, fnum = 4, rnum = 5, type = {map, string, string}, occurrence = repeated, opts = []}, - #field{name = fields, fnum = 5, rnum = 6, type = string, occurrence = optional, opts = []}]; +find_msg_def(data) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = metric, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]; find_msg_def(ping) -> [#field{name = adcode, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = boot_time, fnum = 2, rnum = 3, type = uint32, occurrence = optional, opts = []}, diff --git a/message_pb.proto b/message_pb.proto index 904b31c..ae5dca8 100644 --- a/message_pb.proto +++ b/message_pb.proto @@ -56,11 +56,8 @@ message ServiceMetrics { // 数据传输 message Data { string service_id = 1; - string device_uuid = 2; - int32 timestamp = 3; - map tags = 4; - // 为了数据的灵活性,存储类型无关的二进制流; 兼容更多的业务逻辑情况 - string fields = 5; + // measurement[,tag_key=tag_value...] field_key=field_value[,field_key2=field_value2...] [timestamp] + string metric = 2; } //#{<<"adcode">> => 0,<<"boot_time">> => 18256077,<<"city">> => <<>>,