From 6b16612214a2be22547ef63d76d3513cfbd95601 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 16 Sep 2025 16:40:11 +0800 Subject: [PATCH] fix data --- apps/efka/include/message_pb.hrl | 2 +- apps/efka/src/docker/efka_container.erl | 91 +++++++------------- apps/efka/src/docker/efka_docker_command.erl | 20 ++++- apps/efka/src/efka_remote_agent.erl | 20 ++--- apps/efka/src/proto/message_pb.erl | 30 +++---- message_pb.proto | 2 +- 6 files changed, 76 insertions(+), 89 deletions(-) diff --git a/apps/efka/include/message_pb.hrl b/apps/efka/include/message_pb.hrl index ffb4ec8..4accfaa 100644 --- a/apps/efka/include/message_pb.hrl +++ b/apps/efka/include/message_pb.hrl @@ -81,7 +81,7 @@ -ifndef('DATA_PB_H'). -define('DATA_PB_H', true). -record(data, - {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional + {meta_tag = <<>> :: unicode:chardata() | undefined, % = 1, optional device_uuid = <<>> :: unicode:chardata() | undefined, % = 2, optional route_key = <<>> :: unicode:chardata() | undefined, % = 3, optional metric = <<>> :: iodata() | undefined % = 4, optional diff --git a/apps/efka/src/docker/efka_container.erl b/apps/efka/src/docker/efka_container.erl index 78b3e16..7851dc4 100644 --- a/apps/efka/src/docker/efka_container.erl +++ b/apps/efka/src/docker/efka_container.erl @@ -14,8 +14,11 @@ -behaviour(gen_server). +-define(STATUS_RUNNING, running). +-define(STATUS_STOPPED, stopped). + %% API --export([start_link/3]). +-export([start_link/2]). -export([get_name/1, get_pid/1, attach_channel/3]). -export([push_envs/3, invoke/3]). -export([metric_data/4, send_event/3]). @@ -24,7 +27,6 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { - %service_id :: binary(), container_id :: binary(), %% 通道id信息 channel_pid :: pid() | undefined, @@ -32,11 +34,10 @@ meta :: binary(), %% 当前进程的port信息, OSPid = erlang:port_info(Port, os_pid) port :: undefined | port(), - %% 系统对应的pid - os_pid :: undefined | integer(), - %% 配置信息 - manifest :: undefined | efka_manifest:manifest(), + inflight = #{}, + %% 容器的运行状态 + status = ?STATUS_STOPPED, %% 映射关系: #{Ref => Fun} callbacks = #{} @@ -75,10 +76,10 @@ attach_channel(Pid, ChannelPid, Meta) when is_pid(Pid), is_pid(ChannelPid), is_b gen_server:call(Pid, {attach_channel, ChannelPid, Meta}). %% @doc Spawns the server and registers the local name (unique) --spec(start_link(Name :: atom(), ContainerId :: binary(), Args :: [binary()]) -> +-spec(start_link(Name :: atom(), ContainerId :: binary()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Name, ContainerId, Args) when is_atom(Name), is_binary(ContainerId), is_list(Args) -> - gen_server:start_link({local, Name}, ?MODULE, [ContainerId, Args], []). +start_link(Name, ContainerId) when is_atom(Name), is_binary(ContainerId) -> + gen_server:start_link({local, Name}, ?MODULE, [ContainerId], []). %%%=================================================================== %%% gen_server callbacks @@ -89,18 +90,16 @@ start_link(Name, ContainerId, Args) when is_atom(Name), is_binary(ContainerId), -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([ContainerId, Args]) -> +init([ContainerId]) -> %% supervisor进程通过exit(ChildPid, shutdown)调用的时候,确保terminate函数被调用 erlang:process_flag(trap_exit, true), - - case startup(<<>>, Args) of - {ok, Port} -> - {os_pid, OSPid} = erlang:port_info(Port, os_pid), - lager:debug("[efka_service] service: ~p, port: ~p, boot_service success os_pid: ~p", [ContainerId, Port, OSPid]), - {ok, #state{container_id = ContainerId, port = Port, os_pid = OSPid}}; - {error, Reason} -> - lager:debug("[efka_service] service: ~p", [ContainerId]), - {stop, Reason} + case efka_docker_command:is_container_running(ContainerId) of + true -> + {ok, Port} = wait_container(ContainerId), + {ok, #state{container_id = ContainerId, status = ?STATUS_RUNNING, port = Port}}; + false -> + lager:debug("[efka_service] container_id: ~p", [ContainerId]), + {ok, #state{container_id = ContainerId, status = ?STATUS_STOPPED}} end. %% @private @@ -136,12 +135,12 @@ handle_call(_Request, _From, State = #state{}) -> handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{container_id = ContainerId, meta = Meta}) -> lager:debug("[efka_service] container_id: ~p, meta: ~p, device_uuid: ~p, route_key: ~p, metric data: ~p", [ContainerId, Meta, DeviceUUID, RouteKey, Metric]), - %% 这里的数据需要转换成和meta相关的数据, todo container_id的数据是否需要加上 - efka_remote_agent:metric_data(ContainerId, Meta, DeviceUUID, RouteKey, Metric), + %% 这里的数据需要转换成和meta相关的数据 + efka_remote_agent:metric_data(Meta, DeviceUUID, RouteKey, Metric), {noreply, State}; handle_cast({send_event, EventType, Params}, State = #state{container_id = ContainerId, meta = Meta}) -> - efka_remote_agent:event(ContainerId, Meta, EventType, Params), + efka_remote_agent:event(Meta, EventType, Params), lager:debug("[efka_service] send_event, container_id: ~p, meta: ~p, event_type: ~p, params: ~p", [ContainerId, Meta, EventType, Params]), {noreply, State}; @@ -178,19 +177,6 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -%% 重启服务 -handle_info({timeout, _, reboot_service}, State = #state{container_id = ContainerId, manifest = Manifest}) -> - case startup(Manifest, []) of - {ok, Port} -> - {os_pid, OSPid} = erlang:port_info(Port, os_pid), - lager:debug("[efka_service] service_id: ~p, reboot success, port: ~p, os_pid: ~p", [ContainerId, Port, OSPid]), - {noreply, State#state{port = Port, os_pid = OSPid}}; - {error, Reason} -> - lager:debug("[efka_service] service_id: ~p, boot_service get error: ~p", [ContainerId, Reason]), - try_reboot(), - {noreply, State} - end; - %% 处理channel的回复 handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, callbacks = Callbacks}) -> case maps:take(Ref, Inflight) of @@ -209,13 +195,12 @@ handle_info({Port, {data, Data}}, State = #state{container_id = ContainerId}) wh %% 处理port的消息, Port的被动关闭会触发;因此这个时候的Port和State.port的值是相等的 handle_info({Port, {exit_status, Code}}, State = #state{container_id = ContainerId}) when is_port(Port) -> lager:debug("[efka_service] service_id: ~p, port: ~p, exit with code: ~p", [ContainerId, Port, Code]), - {noreply, State#state{port = undefined, os_pid = undefined}}; + {noreply, State#state{port = undefined}}; %% 处理port的退出消息 handle_info({'EXIT', Port, Reason}, State = #state{container_id = ContainerId}) when is_port(Port) -> lager:debug("[efka_service] service_id: ~p, port: ~p, exit with reason: ~p", [ContainerId, Port, Reason]), - try_reboot(), - {noreply, State#state{port = undefined, os_pid = undefined}}; + {noreply, State#state{port = undefined}}; %% 处理channel进程的退出 handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, container_id = ContainerId}) -> @@ -229,9 +214,8 @@ handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_ %% with Reason. The return value is ignored. -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). -terminate(Reason, _State = #state{container_id = ContainerId, port = Port, os_pid = OSPid}) -> +terminate(Reason, _State = #state{container_id = ContainerId, port = Port}) -> erlang:is_port(Port) andalso erlang:port_close(Port), - catch kill_os_pid(OSPid), lager:debug("[efka_service] service_id: ~p, terminate with reason: ~p", [ContainerId, Reason]), ok. @@ -247,32 +231,17 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== -startup(ExecCmd0, Args0) -> - PortSettings = [ - {args, [binary_to_list(A) || A <- Args0]}, - exit_status - ], - ExecCmd = binary_to_list(ExecCmd0), - case catch erlang:open_port({spawn_executable, ExecCmd}, PortSettings) of +-spec wait_container(ContainerId :: binary()) -> {ok, Port :: erlang:port()} | error. +wait_container(ContainerId) when is_binary(ContainerId) -> + PortSettings = [stream, exit_status, use_stdio, binary], + ExecCmd = "docker wait " ++ binary_to_list(ContainerId), + case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of Port when is_port(Port) -> {ok, Port}; _Other -> - {error, <<"exec command startup failed">>} + error end. -%% 关闭系统进程 --spec kill_os_pid(port() | undefined) -> no_return(). -kill_os_pid(undefined) -> - ok; -kill_os_pid(OSPid) when is_integer(OSPid) -> - Cmd = lists:flatten(io_lib:format("kill -9 ~p", [OSPid])), - lager:debug("kill cmd is: ~p", [Cmd]), - os:cmd(Cmd). - --spec try_reboot() -> no_return(). -try_reboot() -> - erlang:start_timer(5000, self(), reboot_service). - -spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map(). trigger_callback(Ref, Callbacks) -> case maps:take(Ref, Callbacks) of diff --git a/apps/efka/src/docker/efka_docker_command.erl b/apps/efka/src/docker/efka_docker_command.erl index 3eb3101..d9c5bc3 100644 --- a/apps/efka/src/docker/efka_docker_command.erl +++ b/apps/efka/src/docker/efka_docker_command.erl @@ -11,7 +11,7 @@ %% API -export([pull_image/1, check_image_exist/1]). --export([create_container/3, check_container_exist/1]). +-export([create_container/3, check_container_exist/1, is_container_running/1]). -spec pull_image(Image :: binary()) -> ok | {error, Reason :: any()}. pull_image(Image) when is_binary(Image) -> @@ -65,6 +65,24 @@ create_container(ContainerName, ContainerDir, Config) when is_binary(ContainerNa {error, <<"exec command startup failed">>} end. +-spec is_container_running(ContainerId :: binary()) -> boolean(). +is_container_running(ContainerId) when is_binary(ContainerId) -> + %% todo 重定向错误流 {stderr_to_stdout, true} + PortSettings = [stream, exit_status, use_stdio, binary], + ExecCmd = "docker inspect -f '{{.State.Running}}' " ++ binary_to_list(ContainerId), + case catch erlang:open_port({spawn, ExecCmd}, PortSettings) of + Port when is_port(Port) -> + case gather_output(Port) of + {0, Val0} -> + Val = string:trim(Val0), + Val =:= <<"true">>; + _ -> + false + end; + _Error -> + false + end. + -spec check_image_exist(Image :: binary()) -> boolean(). check_image_exist(Image) when is_binary(Image) -> PortSettings = [stream, exit_status, use_stdio, binary], diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index 8638e0c..2ea035b 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -46,13 +46,13 @@ %%%=================================================================== %% 发送数据 --spec metric_data(ServiceId :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). -metric_data(ServiceId, DeviceUUID, RouteKey, Metric) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> - gen_statem:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}). +-spec metric_data(MetaTag :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). +metric_data(MetaTag, DeviceUUID, RouteKey, Metric) when is_binary(MetaTag), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> + gen_statem:cast(?SERVER, {metric_data, MetaTag, DeviceUUID, RouteKey, Metric}). --spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> no_return(). -event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) -> - gen_statem:cast(?SERVER, {event, ServiceId, EventType, Params}). +-spec event(MetaTag :: binary(), EventType :: integer(), Params :: binary()) -> no_return(). +event(MetaTag, EventType, Params) when is_binary(MetaTag), is_integer(EventType), is_binary(Params) -> + gen_statem:cast(?SERVER, {event, MetaTag, EventType, Params}). ping(AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces) -> gen_statem:cast(?SERVER, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelArch, Ips, CpuCore, CpuLoad, CpuTemperature, Disk, Memory, Interfaces}). @@ -108,9 +108,9 @@ handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _ {keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]}; %% 异步发送数据, 连接存在时候直接发送;否则缓存到mnesia -handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(cast, {metric_data, MetaTag, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> Packet = message_pb:encode_msg(#data{ - service_id = ServiceId, + meta_tag = MetaTag, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric @@ -118,9 +118,9 @@ handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STAT efka_transport:send(TransportPid, ?METHOD_DATA, Packet), {keep_state, State}; -handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, State) -> +handle_event(cast, {metric_data, MetaTag, DeviceUUID, LineProtocolData}, _, State) -> Packet = message_pb:encode_msg(#data{ - service_id = ServiceId, + meta_tag = MetaTag, device_uuid = DeviceUUID, metric = LineProtocolData }), diff --git a/apps/efka/src/proto/message_pb.erl b/apps/efka/src/proto/message_pb.erl index 3e6714b..ec94374 100644 --- a/apps/efka/src/proto/message_pb.erl +++ b/apps/efka/src/proto/message_pb.erl @@ -382,7 +382,7 @@ encode_msg_push_service_config(#push_service_config{service_id = F1, config_json encode_msg_data(Msg, TrUserData) -> encode_msg_data(Msg, <<>>, TrUserData). -encode_msg_data(#data{service_id = F1, device_uuid = F2, route_key = F3, metric = F4}, Bin, TrUserData) -> +encode_msg_data(#data{meta_tag = F1, device_uuid = F2, route_key = F3, metric = F4}, Bin, TrUserData) -> B1 = if F1 == undefined -> Bin; true -> begin @@ -1281,18 +1281,18 @@ skip_64_push_service_config(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, decode_msg_data(Bin, TrUserData) -> dfp_read_field_def_data(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData). -dfp_read_field_def_data(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_service_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +dfp_read_field_def_data(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_meta_tag(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); dfp_read_field_def_data(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_device_uuid(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); dfp_read_field_def_data(<<26, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_route_key(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); dfp_read_field_def_data(<<34, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> d_field_data_metric(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData); -dfp_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, _) -> #data{service_id = F@_1, device_uuid = F@_2, route_key = F@_3, metric = F@_4}; +dfp_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, _) -> #data{meta_tag = F@_1, device_uuid = F@_2, route_key = F@_3, metric = F@_4}; dfp_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData) -> dg_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, F@_4, TrUserData). dg_read_field_def_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 32 - 7 -> dg_read_field_def_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); dg_read_field_def_data(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, F@_4, TrUserData) -> Key = X bsl N + Acc, case Key of - 10 -> d_field_data_service_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); + 10 -> d_field_data_meta_tag(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); 18 -> d_field_data_device_uuid(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); 26 -> d_field_data_route_key(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); 34 -> d_field_data_metric(Rest, 0, 0, 0, F@_1, F@_2, F@_3, F@_4, TrUserData); @@ -1305,10 +1305,10 @@ dg_read_field_def_data(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, F 5 -> skip_32_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, F@_4, TrUserData) end end; -dg_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, _) -> #data{service_id = F@_1, device_uuid = F@_2, route_key = F@_3, metric = F@_4}. +dg_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, F@_4, _) -> #data{meta_tag = F@_1, device_uuid = F@_2, route_key = F@_3, metric = F@_4}. -d_field_data_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_data_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); -d_field_data_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, F@_4, TrUserData) -> +d_field_data_meta_tag(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData) when N < 57 -> d_field_data_meta_tag(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, F@_4, TrUserData); +d_field_data_meta_tag(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, F@_4, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, dfp_read_field_def_data(RestF, 0, 0, F, NewFValue, F@_2, F@_3, F@_4, TrUserData). @@ -1932,10 +1932,10 @@ merge_msg_push_service_config(#push_service_config{service_id = PFservice_id, co end}. -compile({nowarn_unused_function,merge_msg_data/3}). -merge_msg_data(#data{service_id = PFservice_id, device_uuid = PFdevice_uuid, route_key = PFroute_key, metric = PFmetric}, #data{service_id = NFservice_id, device_uuid = NFdevice_uuid, route_key = NFroute_key, metric = NFmetric}, _) -> - #data{service_id = - if NFservice_id =:= undefined -> PFservice_id; - true -> NFservice_id +merge_msg_data(#data{meta_tag = PFmeta_tag, device_uuid = PFdevice_uuid, route_key = PFroute_key, metric = PFmetric}, #data{meta_tag = NFmeta_tag, device_uuid = NFdevice_uuid, route_key = NFroute_key, metric = NFmetric}, _) -> + #data{meta_tag = + if NFmeta_tag =:= undefined -> PFmeta_tag; + true -> NFmeta_tag end, device_uuid = if NFdevice_uuid =:= undefined -> PFdevice_uuid; @@ -2201,9 +2201,9 @@ v_msg_push_service_config(X, Path, _TrUserData) -> mk_type_error({expected_msg, -compile({nowarn_unused_function,v_msg_data/3}). -dialyzer({nowarn_function,v_msg_data/3}). -v_msg_data(#data{service_id = F1, device_uuid = F2, route_key = F3, metric = F4}, Path, TrUserData) -> +v_msg_data(#data{meta_tag = F1, device_uuid = F2, route_key = F3, metric = F4}, Path, TrUserData) -> if F1 == undefined -> ok; - true -> v_type_string(F1, [service_id | Path], TrUserData) + true -> v_type_string(F1, [meta_tag | Path], TrUserData) end, if F2 == undefined -> ok; true -> v_type_string(F2, [device_uuid | Path], TrUserData) @@ -2415,7 +2415,7 @@ get_msg_defs() -> #field{name = config_json, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]}, {{msg, data}, - [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, + [#field{name = meta_tag, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = route_key, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}, #field{name = metric, fnum = 4, rnum = 5, type = bytes, occurrence = optional, opts = []}]}, @@ -2494,7 +2494,7 @@ find_msg_def(push_service_config) -> #field{name = config_json, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]; find_msg_def(data) -> - [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, + [#field{name = meta_tag, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = route_key, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}, #field{name = metric, fnum = 4, rnum = 5, type = bytes, occurrence = optional, opts = []}]; diff --git a/message_pb.proto b/message_pb.proto index 9c27553..836cd58 100644 --- a/message_pb.proto +++ b/message_pb.proto @@ -61,7 +61,7 @@ message PushServiceConfig { // 数据传输 message Data { - string service_id = 1; + string meta_tag = 1; string device_uuid = 2; string route_key = 3; // measurement[,tag_key=tag_value...] field_key=field_value[,field_key2=field_value2...] [timestamp]