From 46b782d91228eec96367b8dc8b00cc0f84edf564 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 18 Aug 2025 14:07:06 +0800 Subject: [PATCH] fix --- apps/efka/include/message_pb.hrl | 7 +- apps/efka/src/efka_remote_agent.erl | 11 +- apps/efka/src/efka_service.erl | 12 +-- apps/efka/src/proto/message_pb.erl | 159 ++++++++++++---------------- 4 files changed, 83 insertions(+), 106 deletions(-) diff --git a/apps/efka/include/message_pb.hrl b/apps/efka/include/message_pb.hrl index 461a809..ffb4ec8 100644 --- a/apps/efka/include/message_pb.hrl +++ b/apps/efka/include/message_pb.hrl @@ -31,7 +31,7 @@ -define('PUB_PB_H', true). -record(pub, {topic = <<>> :: unicode:chardata() | undefined, % = 1, optional - content = <<>> :: unicode:chardata() | undefined % = 2, optional + content = <<>> :: iodata() | undefined % = 2, optional }). -endif. @@ -64,7 +64,7 @@ -define('INVOKE_PB_H', true). -record(invoke, {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional - payload = <<>> :: unicode:chardata() | undefined, % = 2, optional + payload = <<>> :: iodata() | undefined, % = 2, optional timeout = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits }). -endif. @@ -84,8 +84,7 @@ {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional device_uuid = <<>> :: unicode:chardata() | undefined, % = 2, optional route_key = <<>> :: unicode:chardata() | undefined, % = 3, optional - format = <<>> :: unicode:chardata() | undefined, % = 4, optional - metric = <<>> :: unicode:chardata() | undefined % = 5, optional + metric = <<>> :: iodata() | undefined % = 4, optional }). -endif. diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index d35eb80..493b77b 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -16,7 +16,7 @@ %% API -export([start_link/0]). --export([metric_data/5, event/3, ping/13, request_service_config/2, await_reply/2]). +-export([metric_data/4, event/3, ping/13, request_service_config/2, await_reply/2]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -46,9 +46,9 @@ %%%=================================================================== %% 发送数据 --spec metric_data(ServiceId :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Format :: binary(), Metric :: binary()) -> no_return(). -metric_data(ServiceId, DeviceUUID, RouteKey, Format, Metric) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Format), is_binary(Metric) -> - gen_statem:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, RouteKey, Format, Metric}). +-spec metric_data(ServiceId :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). +metric_data(ServiceId, DeviceUUID, RouteKey, Metric) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> + gen_statem:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}). -spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> no_return(). event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) -> @@ -108,12 +108,11 @@ handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _ {keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]}; %% 异步发送数据, 连接存在时候直接发送;否则缓存到mnesia -handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Format, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> Packet = message_pb:encode_msg(#data{ service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, - format = Format, metric = Metric }), efka_transport:send(TransportPid, ?METHOD_DATA, Packet), diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index a76a52b..7a5c0b2 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -18,7 +18,7 @@ -export([start_link/2]). -export([get_name/1, get_pid/1, attach_channel/2]). -export([push_config/3, request_config/1, invoke/3]). --export([metric_data/5, send_event/3]). +-export([metric_data/4, send_event/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -63,9 +63,9 @@ invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload request_config(Pid) when is_pid(Pid) -> gen_server:call(Pid, request_config). --spec metric_data(Pid :: pid(), DeviceUUID :: binary(), RouteKey :: binary(), Format :: binary(), Metric :: binary()) -> no_return(). -metric_data(Pid, DeviceUUID, RouteKey, Format, Metric) when is_pid(Pid), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Format), is_binary(Metric) -> - gen_server:cast(Pid, {metric_data, DeviceUUID, RouteKey, Format, Metric}). +-spec metric_data(Pid :: pid(), DeviceUUID :: binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). +metric_data(Pid, DeviceUUID, RouteKey, Metric) when is_pid(Pid), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> + gen_server:cast(Pid, {metric_data, DeviceUUID, RouteKey, Metric}). -spec send_event(Pid :: pid(), EventType :: integer(), Params :: binary()) -> no_return(). send_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) -> @@ -155,9 +155,9 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({metric_data, DeviceUUID, RouteKey, Format, Metric}, State = #state{service_id = ServiceId}) -> +handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{service_id = ServiceId}) -> lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, route_key: ~p, metric data: ~p", [ServiceId, DeviceUUID, RouteKey, Metric]), - efka_remote_agent:metric_data(ServiceId, DeviceUUID, RouteKey, Format, Metric), + efka_remote_agent:metric_data(ServiceId, DeviceUUID, RouteKey, Metric), {noreply, State}; handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) -> diff --git a/apps/efka/src/proto/message_pb.erl b/apps/efka/src/proto/message_pb.erl index c6dc024..3e6714b 100644 --- a/apps/efka/src/proto/message_pb.erl +++ b/apps/efka/src/proto/message_pb.erl @@ -222,9 +222,9 @@ encode_msg_pub(#pub{topic = F1, content = F2}, Bin, TrUserData) -> true -> begin TrF2 = id(F2, TrUserData), - case is_empty_string(TrF2) of - true -> B1; - false -> e_type_string(TrF2, <>, TrUserData) + case iolist_size(TrF2) of + 0 -> B1; + _ -> e_type_bytes(TrF2, <>, TrUserData) end end end. @@ -329,9 +329,9 @@ encode_msg_invoke(#invoke{service_id = F1, payload = F2, timeout = F3}, Bin, TrU true -> begin TrF2 = id(F2, TrUserData), - case is_empty_string(TrF2) of - true -> B1; - false -> e_type_string(TrF2, <>, TrUserData) + case iolist_size(TrF2) of + 0 -> B1; + _ -> e_type_bytes(TrF2, <>, TrUserData) end end end, @@ -382,7 +382,7 @@ encode_msg_push_service_config(#push_service_config{service_id = F1, config_json encode_msg_data(Msg, TrUserData) -> encode_msg_data(Msg, <<>>, TrUserData). -encode_msg_data(#data{service_id = F1, device_uuid = F2, route_key = F3, format = F4, metric = F5}, Bin, TrUserData) -> +encode_msg_data(#data{service_id = F1, device_uuid = F2, route_key = F3, metric = F4}, Bin, TrUserData) -> B1 = if F1 == undefined -> Bin; true -> begin @@ -413,23 +413,13 @@ encode_msg_data(#data{service_id = F1, device_uuid = F2, route_key = F3, format end end end, - B4 = if F4 == undefined -> B3; - true -> - begin - TrF4 = id(F4, TrUserData), - case is_empty_string(TrF4) of - true -> B3; - false -> e_type_string(TrF4, <>, TrUserData) - end - end - end, - if F5 == undefined -> B4; + if F4 == undefined -> B3; true -> begin - TrF5 = id(F5, TrUserData), - case is_empty_string(TrF5) of - true -> B4; - false -> e_type_string(TrF5, <>, TrUserData) + TrF4 = id(F4, TrUserData), + case iolist_size(TrF4) of + 0 -> B3; + _ -> e_type_bytes(TrF4, <>, TrUserData) end end end. @@ -1289,77 +1279,70 @@ skip_32_push_service_config(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, skip_64_push_service_config(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_push_service_config(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). -decode_msg_data(Bin, TrUserData) -> dfp_read_field_def_data(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData). +decode_msg_data(Bin, TrUserData) -> dfp_read_field_def_data(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData). -dfp_read_field_def_data(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> d_field_data_service_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dfp_read_field_def_data(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> d_field_data_device_uuid(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dfp_read_field_def_data(<<26, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> d_field_data_route_key(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dfp_read_field_def_data(<<34, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> d_field_data_format(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dfp_read_field_def_data(<<42, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> d_field_data_metric(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dfp_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, F@_5, _) -> #data{service_id = F@_1, device_uuid = F@_2, route_key = F@_3, format = F@_4, metric = F@_5}; -dfp_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> dg_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). +dfp_read_field_def_data(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_service_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +dfp_read_field_def_data(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_device_uuid(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +dfp_read_field_def_data(<<26, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_route_key(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +dfp_read_field_def_data(<<34, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_metric(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +dfp_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, _) -> #data{service_id = F@_1, device_uuid = F@_2, route_key = F@_3, metric = F@_4}; +dfp_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> dg_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData). -dg_read_field_def_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 32 - 7 -> dg_read_field_def_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -dg_read_field_def_data(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> +dg_read_field_def_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 32 - 7 -> dg_read_field_def_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +dg_read_field_def_data(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, F@_4, TrUserData) -> Key = X bsl N + Acc, case Key of - 10 -> d_field_data_service_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 18 -> d_field_data_device_uuid(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 26 -> d_field_data_route_key(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 34 -> d_field_data_format(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 42 -> d_field_data_metric(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); + 10 -> d_field_data_service_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); + 18 -> d_field_data_device_uuid(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); + 26 -> d_field_data_route_key(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); + 34 -> d_field_data_metric(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); _ -> case Key band 7 of - 0 -> skip_varint_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 1 -> skip_64_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 2 -> skip_length_delimited_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 3 -> skip_group_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); - 5 -> skip_32_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) + 0 -> skip_varint_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData); + 1 -> skip_64_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData); + 2 -> skip_length_delimited_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData); + 3 -> skip_group_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData); + 5 -> skip_32_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData) end end; -dg_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, F@_5, _) -> #data{service_id = F@_1, device_uuid = F@_2, route_key = F@_3, format = F@_4, metric = F@_5}. +dg_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, _) -> #data{service_id = F@_1, device_uuid = F@_2, route_key = F@_3, metric = F@_4}. -d_field_data_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> d_field_data_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -d_field_data_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, F@_4, F@_5, TrUserData) -> +d_field_data_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_data_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +d_field_data_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, F@_4, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_data(RestF, 0, 0, F, NewFValue, F@_2, F@_3, F@_4, F@_5, TrUserData). + dfp_read_field_def_data(RestF, 0, 0, F, NewFValue, F@_2, F@_3, F@_4, TrUserData). -d_field_data_device_uuid(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> d_field_data_device_uuid(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -d_field_data_device_uuid(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, F@_3, F@_4, F@_5, TrUserData) -> +d_field_data_device_uuid(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_data_device_uuid(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +d_field_data_device_uuid(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, F@_3, F@_4, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_data(RestF, 0, 0, F, F@_1, NewFValue, F@_3, F@_4, F@_5, TrUserData). + dfp_read_field_def_data(RestF, 0, 0, F, F@_1, NewFValue, F@_3, F@_4, TrUserData). -d_field_data_route_key(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> d_field_data_route_key(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -d_field_data_route_key(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, _, F@_4, F@_5, TrUserData) -> +d_field_data_route_key(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_data_route_key(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +d_field_data_route_key(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, _, F@_4, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_data(RestF, 0, 0, F, F@_1, F@_2, NewFValue, F@_4, F@_5, TrUserData). + dfp_read_field_def_data(RestF, 0, 0, F, F@_1, F@_2, NewFValue, F@_4, TrUserData). -d_field_data_format(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> d_field_data_format(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -d_field_data_format(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, _, F@_5, TrUserData) -> +d_field_data_metric(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_data_metric(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +d_field_data_metric(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, _, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_data(RestF, 0, 0, F, F@_1, F@_2, F@_3, NewFValue, F@_5, TrUserData). + dfp_read_field_def_data(RestF, 0, 0, F, F@_1, F@_2, F@_3, NewFValue, TrUserData). -d_field_data_metric(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> d_field_data_metric(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -d_field_data_metric(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, _, TrUserData) -> - {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_data(RestF, 0, 0, F, F@_1, F@_2, F@_3, F@_4, NewFValue, TrUserData). +skip_varint_data(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> skip_varint_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +skip_varint_data(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData). -skip_varint_data(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> skip_varint_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -skip_varint_data(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). - -skip_length_delimited_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) when N < 57 -> skip_length_delimited_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData); -skip_length_delimited_data(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> +skip_length_delimited_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> skip_length_delimited_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +skip_length_delimited_data(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> Length = X bsl N + Acc, <<_:Length/binary, Rest2/binary>> = Rest, - dfp_read_field_def_data(Rest2, 0, 0, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). + dfp_read_field_def_data(Rest2, 0, 0, F, F@_1, F@_2, F@_3, F@_4, TrUserData). -skip_group_data(Bin, _, Z2, FNum, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> +skip_group_data(Bin, _, Z2, FNum, F@_1, F@_2, F@_3, F@_4, TrUserData) -> {_, Rest} = read_group(Bin, FNum), - dfp_read_field_def_data(Rest, 0, Z2, FNum, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). + dfp_read_field_def_data(Rest, 0, Z2, FNum, F@_1, F@_2, F@_3, F@_4, TrUserData). -skip_32_data(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). +skip_32_data(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData). -skip_64_data(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, F@_5, TrUserData). +skip_64_data(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData). decode_msg_ping(Bin, TrUserData) -> dfp_read_field_def_ping(Bin, @@ -1949,8 +1932,7 @@ merge_msg_push_service_config(#push_service_config{service_id = PFservice_id, co end}. -compile({nowarn_unused_function,merge_msg_data/3}). -merge_msg_data(#data{service_id = PFservice_id, device_uuid = PFdevice_uuid, route_key = PFroute_key, format = PFformat, metric = PFmetric}, - #data{service_id = NFservice_id, device_uuid = NFdevice_uuid, route_key = NFroute_key, format = NFformat, metric = NFmetric}, _) -> +merge_msg_data(#data{service_id = PFservice_id, device_uuid = PFdevice_uuid, route_key = PFroute_key, metric = PFmetric}, #data{service_id = NFservice_id, device_uuid = NFdevice_uuid, route_key = NFroute_key, metric = NFmetric}, _) -> #data{service_id = if NFservice_id =:= undefined -> PFservice_id; true -> NFservice_id @@ -1963,10 +1945,6 @@ merge_msg_data(#data{service_id = PFservice_id, device_uuid = PFdevice_uuid, rou if NFroute_key =:= undefined -> PFroute_key; true -> NFroute_key end, - format = - if NFformat =:= undefined -> PFformat; - true -> NFformat - end, metric = if NFmetric =:= undefined -> PFmetric; true -> NFmetric @@ -2147,7 +2125,7 @@ v_msg_pub(#pub{topic = F1, content = F2}, Path, TrUserData) -> true -> v_type_string(F1, [topic | Path], TrUserData) end, if F2 == undefined -> ok; - true -> v_type_string(F2, [content | Path], TrUserData) + true -> v_type_bytes(F2, [content | Path], TrUserData) end, ok; v_msg_pub(X, Path, _TrUserData) -> mk_type_error({expected_msg, pub}, X, Path). @@ -2198,7 +2176,7 @@ v_msg_invoke(#invoke{service_id = F1, payload = F2, timeout = F3}, Path, TrUserD true -> v_type_string(F1, [service_id | Path], TrUserData) end, if F2 == undefined -> ok; - true -> v_type_string(F2, [payload | Path], TrUserData) + true -> v_type_bytes(F2, [payload | Path], TrUserData) end, if F3 == undefined -> ok; true -> v_type_uint32(F3, [timeout | Path], TrUserData) @@ -2223,7 +2201,7 @@ v_msg_push_service_config(X, Path, _TrUserData) -> mk_type_error({expected_msg, -compile({nowarn_unused_function,v_msg_data/3}). -dialyzer({nowarn_function,v_msg_data/3}). -v_msg_data(#data{service_id = F1, device_uuid = F2, route_key = F3, format = F4, metric = F5}, Path, TrUserData) -> +v_msg_data(#data{service_id = F1, device_uuid = F2, route_key = F3, metric = F4}, Path, TrUserData) -> if F1 == undefined -> ok; true -> v_type_string(F1, [service_id | Path], TrUserData) end, @@ -2234,10 +2212,7 @@ v_msg_data(#data{service_id = F1, device_uuid = F2, route_key = F3, format = F4, true -> v_type_string(F3, [route_key | Path], TrUserData) end, if F4 == undefined -> ok; - true -> v_type_string(F4, [format | Path], TrUserData) - end, - if F5 == undefined -> ok; - true -> v_type_string(F5, [metric | Path], TrUserData) + true -> v_type_bytes(F4, [metric | Path], TrUserData) end, ok; v_msg_data(X, Path, _TrUserData) -> mk_type_error({expected_msg, data}, X, Path). @@ -2370,6 +2345,12 @@ v_type_string(S, Path, _TrUserData) when is_list(S); is_binary(S) -> end; v_type_string(X, Path, _TrUserData) -> mk_type_error(bad_unicode_string, X, Path). +-compile({nowarn_unused_function,v_type_bytes/3}). +-dialyzer({nowarn_function,v_type_bytes/3}). +v_type_bytes(B, _Path, _TrUserData) when is_binary(B) -> ok; +v_type_bytes(B, _Path, _TrUserData) when is_list(B) -> ok; +v_type_bytes(X, Path, _TrUserData) -> mk_type_error(bad_binary_value, X, Path). + -compile({nowarn_unused_function,mk_type_error/3}). -spec mk_type_error(_, _, list()) -> no_return(). mk_type_error(Error, ValueSeen, Path) -> @@ -2415,7 +2396,7 @@ get_msg_defs() -> #field{name = token, fnum = 5, rnum = 5, type = string, occurrence = optional, opts = []}, #field{name = timestamp, fnum = 6, rnum = 6, type = uint32, occurrence = optional, opts = []}]}, {{msg, auth_reply}, [#field{name = code, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = message, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]}, - {{msg, pub}, [#field{name = topic, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = content, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]}, + {{msg, pub}, [#field{name = topic, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = content, fnum = 2, rnum = 3, type = bytes, occurrence = optional, opts = []}]}, {{msg, async_call_reply}, [#field{name = code, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = result, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, @@ -2427,7 +2408,7 @@ get_msg_defs() -> {{msg, fetch_task_log}, [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}]}, {{msg, invoke}, [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, - #field{name = payload, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, + #field{name = payload, fnum = 2, rnum = 3, type = bytes, occurrence = optional, opts = []}, #field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]}, {{msg, push_service_config}, [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, @@ -2437,8 +2418,7 @@ get_msg_defs() -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = route_key, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}, - #field{name = format, fnum = 4, rnum = 5, type = string, occurrence = optional, opts = []}, - #field{name = metric, fnum = 5, rnum = 6, type = string, occurrence = optional, opts = []}]}, + #field{name = metric, fnum = 4, rnum = 5, type = bytes, occurrence = optional, opts = []}]}, {{msg, ping}, [#field{name = adcode, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = boot_time, fnum = 2, rnum = 3, type = uint32, occurrence = optional, opts = []}, @@ -2495,7 +2475,7 @@ find_msg_def(auth_request) -> #field{name = token, fnum = 5, rnum = 5, type = string, occurrence = optional, opts = []}, #field{name = timestamp, fnum = 6, rnum = 6, type = uint32, occurrence = optional, opts = []}]; find_msg_def(auth_reply) -> [#field{name = code, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = message, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]; -find_msg_def(pub) -> [#field{name = topic, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = content, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]; +find_msg_def(pub) -> [#field{name = topic, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = content, fnum = 2, rnum = 3, type = bytes, occurrence = optional, opts = []}]; find_msg_def(async_call_reply) -> [#field{name = code, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = result, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, @@ -2507,7 +2487,7 @@ find_msg_def(deploy) -> find_msg_def(fetch_task_log) -> [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}]; find_msg_def(invoke) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, - #field{name = payload, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, + #field{name = payload, fnum = 2, rnum = 3, type = bytes, occurrence = optional, opts = []}, #field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]; find_msg_def(push_service_config) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, @@ -2517,8 +2497,7 @@ find_msg_def(data) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = route_key, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}, - #field{name = format, fnum = 4, rnum = 5, type = string, occurrence = optional, opts = []}, - #field{name = metric, fnum = 5, rnum = 6, type = string, occurrence = optional, opts = []}]; + #field{name = metric, fnum = 4, rnum = 5, type = bytes, occurrence = optional, opts = []}]; find_msg_def(ping) -> [#field{name = adcode, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = boot_time, fnum = 2, rnum = 3, type = uint32, occurrence = optional, opts = []},