diff --git a/apps/efka/include/efka.hrl b/apps/efka/include/efka.hrl index f6046c4..3585940 100644 --- a/apps/efka/include/efka.hrl +++ b/apps/efka/include/efka.hrl @@ -19,8 +19,8 @@ -define(PACKET_COMMAND, 16#04). %% 服务器端推送消息 --define(PACKET_ASYNC_CALL, 16#05). --define(PACKET_ASYNC_CALL_REPLY, 16#06). +-define(PACKET_RPC, 16#05). +-define(PACKET_RPC_REPLY, 16#06). %% ping包,客户端主动发起 -define(PACKET_PING, 16#FF). @@ -32,23 +32,15 @@ %% 主机端上报数据类型标识 -define(METHOD_AUTH, 16#01). -define(METHOD_DATA, 16#02). --define(METHOD_PING, 16#03). --define(METHOD_INFORM, 16#04). --define(METHOD_EVENT, 16#05). --define(METHOD_PHASE, 16#06). --define(METHOD_REQUEST_SERVICE_CONFIG, 16#07). +-define(METHOD_EVENT, 16#03). +-define(METHOD_PING, 16#04). %%%% 命令类型子分类, 不需要返回值 - %% 授权 -define(COMMAND_AUTH, 16#08). %%%% 主动推送的消息类型子分类, 需要返回值 - --define(PUSH_DEPLOY, 16#01). --define(PUSH_START_CONTAINER, 16#02). --define(PUSH_STOP_CONTAINER, 16#03). - --define(PUSH_SERVICE_CONFIG, 16#04). --define(PUSH_INVOKE, 16#05). --define(PUSH_TASK_LOG, 16#06). +-define(RPC_DEPLOY, 16#01). +-define(RPC_START_CONTAINER, 16#02). +-define(RPC_STOP_CONTAINER, 16#03). +-define(RPC_CONFIG_CONTAINER, 16#04). \ No newline at end of file diff --git a/apps/efka/include/message_pb.hrl b/apps/efka/include/message_pb.hrl index bb8d00e..a9cf70c 100644 --- a/apps/efka/include/message_pb.hrl +++ b/apps/efka/include/message_pb.hrl @@ -59,12 +59,11 @@ }). -endif. --ifndef('SERVICE_CONFIG_PB_H'). --define('SERVICE_CONFIG_PB_H', true). --record(service_config, - {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional - config_json = <<>> :: unicode:chardata() | undefined, % = 2, optional - timeout = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits +-ifndef('CONTAINER_CONFIG_PB_H'). +-define('CONTAINER_CONFIG_PB_H', true). +-record(container_config, + {container_name = <<>> :: unicode:chardata() | undefined, % = 1, optional + config = <<>> :: iodata() | undefined % = 2, optional }). -endif. diff --git a/apps/efka/src/docker/docker_container_helper.erl b/apps/efka/src/docker/docker_container_helper.erl new file mode 100644 index 0000000..19dbf4d --- /dev/null +++ b/apps/efka/src/docker/docker_container_helper.erl @@ -0,0 +1,31 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 17. 9月 2025 14:50 +%%%------------------------------------------------------------------- +-module(docker_container_helper). +-author("anlicheng"). + +%% API +-export([ensure_dir/2, get_dir/2]). + +-spec ensure_dir(RootDir :: string(), ContainerName :: binary()) -> {ok, ServerRootDir :: string()}. +ensure_dir(RootDir, ContainerName) when is_list(RootDir), is_binary(ContainerName) -> + %% 根目录 + ContainerRootDir = RootDir ++ "/" ++ binary_to_list(ContainerName) ++ "/", + ok = filelib:ensure_dir(ContainerRootDir), + {ok, ContainerRootDir}. + +-spec get_dir(RootDir :: string(), ContainerName :: binary()) -> {ok, ServerRootDir :: string()} | error. +get_dir(RootDir, ContainerName) when is_list(RootDir), is_binary(ContainerName) -> + %% 根目录 + ContainerRootDir = RootDir ++ "/" ++ binary_to_list(ContainerName) ++ "/", + case filelib:is_dir(ContainerRootDir) of + true -> + {ok, ContainerRootDir}; + false -> + error + end. diff --git a/apps/efka/src/docker/docker_manager.erl b/apps/efka/src/docker/docker_manager.erl index a8d2251..8ed9432 100644 --- a/apps/efka/src/docker/docker_manager.erl +++ b/apps/efka/src/docker/docker_manager.erl @@ -16,7 +16,7 @@ %% API -export([start_link/0]). --export([deploy/2, start_container/1, stop_container/1]). +-export([deploy/2, start_container/1, stop_container/1, config_container/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -37,6 +37,10 @@ deploy(TaskId, Config) when is_integer(TaskId), is_map(Config) -> gen_server:call(?SERVER, {deploy, TaskId, Config}). +-spec config_container(ContainerName :: binary(), Config :: binary()) -> ok | {error, Reason :: binary()}. +config_container(ContainerName, Config) when is_binary(ContainerName), is_binary(Config) -> + gen_server:call(?SERVER, {config_container, ContainerName, Config}). + -spec start_container(ServiceId :: binary()) -> ok | {error, Reason :: term()}. start_container(ContainerId) when is_binary(ContainerId) -> gen_server:call(?SERVER, {start_container, ContainerId}). @@ -75,13 +79,31 @@ init([]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call({deploy, TaskId, Config}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) -> +handle_call({deploy, TaskId, Config = #{<<"container_name">> := ContainerName}}, _From, State = #state{root_dir = RootDir, task_map = TaskMap}) -> %% 创建目录 + {ok, ContainerDir} = docker_container_helper:ensure_dir(ContainerDir, ContainerName), {ok, TaskPid} = docker_deployer:start_link(TaskId, RootDir, Config), docker_deployer:deploy(TaskPid), lager:debug("[efka_inetd] start deploy task_id: ~p, config: ~p", [TaskId, Config]), {reply, ok, State#state{task_map = maps:put(TaskPid, TaskId, TaskMap)}}; +%% 处理容器关联的配置文件 +handle_call({config_container, ContainerName, Config}, _From, State = #state{root_dir = RootDir}) -> + case docker_container_helper:get_dir(RootDir, ContainerName) of + {ok, ContainerDir} -> + %% 覆盖容器的配置文件 + ConfigFile = ContainerDir ++ "/etc/config", + case file:write_file(ConfigFile, Config, [write, binary]) of + ok -> + {reply, ok, State}; + {error, Reason} -> + lager:warning("[docker_manager] write config file get error: ~p", [Reason]), + {reply, {error, <<"write config failed">>}, State} + end; + error -> + {reply, {error, <<"error">>}, State} + end; + %% 启动服务: 当前服务如果正常运行,则不允许重启 handle_call({start_container, ContainerId}, _From, State) -> case docker_commands:start_container(ContainerId) of diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index bae6601..03b60d6 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -224,85 +224,72 @@ handle_event(info, flush_cache, _, State) -> %% 激活消息 %% 微服务部署 -handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> #deploy{task_id = TaskId, config = Config0} = message_pb:decode_msg(DeployBin, deploy), case catch jiffy:decode(Config0, [return_maps]) of Config when is_map(Config) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 - Reply = case docker_manager:deploy(TaskId, Config) of - ok -> - #async_call_reply{code = 1, result = <<"ok">>}; - {error, Reason} when is_binary(Reason) -> - #async_call_reply{code = 0, message = Reason} - end, - efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)); + case docker_manager:deploy(TaskId, Config) of + ok -> + efka_transport:async_call_reply(TransportPid, PacketId, reply_success(<<"ok">>)); + {error, Reason} when is_binary(Reason) -> + efka_transport:async_call_reply(TransportPid, PacketId, reply_error(Reason)) + end; _Error -> - ErrorReply = #async_call_reply{code = 0, message = <<"invalid config json">>}, - efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(ErrorReply)) + efka_transport:async_call_reply(TransportPid, PacketId, reply_error(<<"invalid config json">>)) end, {keep_state, State}; %% 启动微服务 -handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 - Reply = case docker_manager:start_container(ServiceId) of - ok -> - #async_call_reply{code = 1, result = <<"ok">>}; - {error, Reason} when is_binary(Reason) -> - #async_call_reply{code = 0, message = Reason} - end, - efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), - + case docker_manager:start_container(ServiceId) of + ok -> + efka_transport:async_call_reply(TransportPid, PacketId, reply_success(<<"ok">>)); + {error, Reason} when is_binary(Reason) -> + efka_transport:async_call_reply(TransportPid, PacketId, reply_error(Reason)) + end, {keep_state, State}; %% 停止微服务 -handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_rpc, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 - Reply = case docker_manager:stop_container(ServiceId) of - ok -> - #async_call_reply{code = 1, result = <<"ok">>}; - {error, Reason} when is_binary(Reason) -> - #async_call_reply{code = 0, message = Reason} - end, - efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), - + case docker_manager:stop_container(ServiceId) of + ok -> + efka_transport:async_call_reply(TransportPid, PacketId, reply_success(<<"ok">>)); + {error, Reason} when is_binary(Reason) -> + efka_transport:async_call_reply(TransportPid, PacketId, reply_error(Reason)) + end, {keep_state, State}; -%% config.json配置信息, TODO 推送配置不是推送到服务,而是修改容器的配置文件 -handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) -> - #service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout} = message_pb:decode_msg(ConfigBin, push_service_config), +%% config.json配置信息 +handle_event(info, {server_rpc, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid, push_inflight = PushInflight}) -> + #container_config{container_name = ContainerName, config = Config} = message_pb:decode_msg(ConfigBin, push_service_config), - case efka_service:get_pid(ServiceId) of - undefined -> - Reply = #async_call_reply{code = 0, message = <<"service not run">>}, - efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), - {keep_state, State}; - ServicePid when is_pid(ServicePid) -> - Ref = make_ref(), - %% 将配置文件推送到对应的微服务 - efka_service:push_config(ServicePid, Ref, ConfigJson), - %% 处理超时逻辑 - erlang:start_timer(Timeout, self(), {request_timeout, Ref}), - - {keep_state, State#state{push_inflight = maps:put(Ref, PacketId, PushInflight)}} - end; + case docker_manager:config_container(ContainerName, Config) of + ok -> + efka_transport:async_call_reply(TransportPid, PacketId, reply_success(<<"ok">>)); + {error, Reason} -> + efka_transport:async_call_reply(TransportPid, PacketId, reply_error(Reason)) + end, + {keep_state, State}; %% 处理task_log -handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> - #fetch_task_log{task_id = TaskId} = message_pb:decode_msg(TaskLogBin, fetch_task_log), - lager:debug("[efka_remote_agent] get task_log request: ~p", [TaskId]), - {ok, Logs} = efka_inetd_task_log:get_logs(TaskId), - Reply = case length(Logs) > 0 of - true -> - Result = iolist_to_binary(jiffy:encode(Logs, [force_utf8])), - #async_call_reply{code = 1, result = Result}; - false -> - #async_call_reply{code = 1, result = <<"[]">>} - end, - efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), - - {keep_state, State}; +%handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +% #fetch_task_log{task_id = TaskId} = message_pb:decode_msg(TaskLogBin, fetch_task_log), +% lager:debug("[efka_remote_agent] get task_log request: ~p", [TaskId]), +% {ok, Logs} = efka_inetd_task_log:get_logs(TaskId), +% Reply = case length(Logs) > 0 of +% true -> +% Result = iolist_to_binary(jiffy:encode(Logs, [force_utf8])), +% #async_call_reply{code = 1, result = Result}; +% false -> +% #async_call_reply{code = 1, result = <<"[]">>} +% end, +% efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), +% +% {keep_state, State}; %% 处理命令 handle_event(info, {server_command, ?COMMAND_AUTH, <>}, StateName, State = #state{transport_pid = TransportPid}) -> @@ -408,4 +395,14 @@ auth_request() -> salt = unicode:characters_to_binary(Salt), token = unicode:characters_to_binary(Token), timestamp = efka_util:timestamp() - }). \ No newline at end of file + }). + +-spec reply_success(Result :: binary()) -> binary(). +reply_success(Result) when is_binary(Result) -> + Reply = #async_call_reply{code = 1, result = Result}, + message_pb:encode_msg(Reply). + +-spec reply_error(Message :: binary()) -> binary(). +reply_error(Message) when is_binary(Message) -> + Reply = #async_call_reply{code = 1, message = Message}, + message_pb:encode_msg(Reply). diff --git a/apps/efka/src/efka_transport.erl b/apps/efka/src/efka_transport.erl index c0641fc..c09668f 100644 --- a/apps/efka/src/efka_transport.erl +++ b/apps/efka/src/efka_transport.erl @@ -27,10 +27,7 @@ parent_pid :: pid(), host :: string(), port :: integer(), - socket :: undefined | ssl:sslsocket(), - packet_id = 1, - %% 通过packet_id建立请求和响应的关系 - inflight = #{} + socket :: undefined | ssl:sslsocket() }). %%%=================================================================== @@ -123,34 +120,29 @@ handle_cast(connect, State = #state{host = Host, port = Port, parent_pid = Paren end; %% auth校验 -handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPid, socket = Socket, packet_id = PacketId}) -> - ok = ssl:send(Socket, <>), +handle_cast({auth_request, AuthRequestBin}, State = #state{parent_pid = ParentPid, socket = Socket}) -> + ok = ssl:send(Socket, <>), %% 需要等待auth返回的结果 receive {ssl, Socket, <>} -> ParentPid ! {auth_reply, {ok, ReplyBin}}, - {noreply, State#state{packet_id = PacketId + 1}}; + {noreply, State}; {ssl, Socket, Info} -> lager:warning("[efka_transport] get invalid auth_reply: ~p", [Info]), ParentPid ! {auth_reply, {error, invalid_auth_reply}}, - {noreply, State#state{packet_id = PacketId + 1}} + {noreply, State} after 5000 -> ParentPid ! {auth_reply, {error, timeout}}, - {noreply, State#state{packet_id = PacketId + 1}} + {noreply, State} end; -%% 提交请求 -handle_cast({request, Ref, Method, ReqBin}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - ok = ssl:send(Socket, <>), - {noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, Ref, Inflight)}}; - handle_cast({send, Method, Packet}, State = #state{socket = Socket}) -> ok = ssl:send(Socket, <>), {noreply, State}; %% 服务push的消息的回复 handle_cast({async_call_reply, PacketId, Response}, State = #state{socket = Socket}) -> - ok = ssl:send(Socket, <>), + ok = ssl:send(Socket, <>), {noreply, State}. %% @private @@ -169,20 +161,10 @@ handle_info({ssl, Socket, <>}, State = #state{socket ParentPid ! {server_pub, Topic, Content}, {noreply, State}; -handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> +handle_info({ssl, Socket, <>}, State = #state{socket = Socket, parent_pid = ParentPid}) -> ParentPid ! {server_async_call, PacketId, AsyncCallBin}, {noreply, State}; -%% efka:request <-> iot:response -handle_info({ssl, Socket, <>}, State = #state{socket = Socket, inflight = Inflight, parent_pid = ParentPid}) -> - case maps:take(PacketId, Inflight) of - error -> - {noreply, State}; - {Ref, NInflight} -> - ParentPid ! {server_reply, Ref, ReplyBin}, - {noreply, State#state{inflight = NInflight}} - end; - handle_info({ssl_error, Socket, Reason}, State = #state{socket = Socket}) -> lager:debug("[efka_transport] ssl error: ~p", [Reason]), {stop, normal, State}; diff --git a/apps/efka/src/proto/message_pb.erl b/apps/efka/src/proto/message_pb.erl index 883a085..7fb539e 100644 --- a/apps/efka/src/proto/message_pb.erl +++ b/apps/efka/src/proto/message_pb.erl @@ -67,7 +67,7 @@ -type fetch_task_log() :: #fetch_task_log{}. --type service_config() :: #service_config{}. +-type container_config() :: #container_config{}. -type data() :: #data{}. @@ -75,9 +75,9 @@ -type ping() :: #ping{}. --export_type(['auth_request'/0, 'auth_reply'/0, 'pub'/0, 'async_call_reply'/0, 'deploy'/0, 'fetch_task_log'/0, 'service_config'/0, 'data'/0, 'event'/0, 'ping'/0]). --type '$msg_name'() :: auth_request | auth_reply | pub | async_call_reply | deploy | fetch_task_log | service_config | data | event | ping. --type '$msg'() :: auth_request() | auth_reply() | pub() | async_call_reply() | deploy() | fetch_task_log() | service_config() | data() | event() | ping(). +-export_type(['auth_request'/0, 'auth_reply'/0, 'pub'/0, 'async_call_reply'/0, 'deploy'/0, 'fetch_task_log'/0, 'container_config'/0, 'data'/0, 'event'/0, 'ping'/0]). +-type '$msg_name'() :: auth_request | auth_reply | pub | async_call_reply | deploy | fetch_task_log | container_config | data | event | ping. +-type '$msg'() :: auth_request() | auth_reply() | pub() | async_call_reply() | deploy() | fetch_task_log() | container_config() | data() | event() | ping(). -export_type(['$msg_name'/0, '$msg'/0]). -if(?OTP_RELEASE >= 24). @@ -110,7 +110,7 @@ encode_msg(Msg, MsgName, Opts) -> async_call_reply -> encode_msg_async_call_reply(id(Msg, TrUserData), TrUserData); deploy -> encode_msg_deploy(id(Msg, TrUserData), TrUserData); fetch_task_log -> encode_msg_fetch_task_log(id(Msg, TrUserData), TrUserData); - service_config -> encode_msg_service_config(id(Msg, TrUserData), TrUserData); + container_config -> encode_msg_container_config(id(Msg, TrUserData), TrUserData); data -> encode_msg_data(id(Msg, TrUserData), TrUserData); event -> encode_msg_event(id(Msg, TrUserData), TrUserData); ping -> encode_msg_ping(id(Msg, TrUserData), TrUserData) @@ -292,10 +292,10 @@ encode_msg_fetch_task_log(#fetch_task_log{task_id = F1}, Bin, TrUserData) -> end end. -encode_msg_service_config(Msg, TrUserData) -> encode_msg_service_config(Msg, <<>>, TrUserData). +encode_msg_container_config(Msg, TrUserData) -> encode_msg_container_config(Msg, <<>>, TrUserData). -encode_msg_service_config(#service_config{service_id = F1, config_json = F2, timeout = F3}, Bin, TrUserData) -> +encode_msg_container_config(#container_config{container_name = F1, config = F2}, Bin, TrUserData) -> B1 = if F1 == undefined -> Bin; true -> begin @@ -306,22 +306,13 @@ encode_msg_service_config(#service_config{service_id = F1, config_json = F2, tim end end end, - B2 = if F2 == undefined -> B1; - true -> - begin - TrF2 = id(F2, TrUserData), - case is_empty_string(TrF2) of - true -> B1; - false -> e_type_string(TrF2, <>, TrUserData) - end - end - end, - if F3 == undefined -> B2; + if F2 == undefined -> B1; true -> begin - TrF3 = id(F3, TrUserData), - if TrF3 =:= 0 -> B2; - true -> e_varint(TrF3, <>, TrUserData) + TrF2 = id(F2, TrUserData), + case iolist_size(TrF2) of + 0 -> B1; + _ -> e_type_bytes(TrF2, <>, TrUserData) end end end. @@ -698,7 +689,7 @@ decode_msg_2_doit(pub, Bin, TrUserData) -> id(decode_msg_pub(Bin, TrUserData), T decode_msg_2_doit(async_call_reply, Bin, TrUserData) -> id(decode_msg_async_call_reply(Bin, TrUserData), TrUserData); decode_msg_2_doit(deploy, Bin, TrUserData) -> id(decode_msg_deploy(Bin, TrUserData), TrUserData); decode_msg_2_doit(fetch_task_log, Bin, TrUserData) -> id(decode_msg_fetch_task_log(Bin, TrUserData), TrUserData); -decode_msg_2_doit(service_config, Bin, TrUserData) -> id(decode_msg_service_config(Bin, TrUserData), TrUserData); +decode_msg_2_doit(container_config, Bin, TrUserData) -> id(decode_msg_container_config(Bin, TrUserData), TrUserData); decode_msg_2_doit(data, Bin, TrUserData) -> id(decode_msg_data(Bin, TrUserData), TrUserData); decode_msg_2_doit(event, Bin, TrUserData) -> id(decode_msg_event(Bin, TrUserData), TrUserData); decode_msg_2_doit(ping, Bin, TrUserData) -> id(decode_msg_ping(Bin, TrUserData), TrUserData). @@ -1032,63 +1023,56 @@ skip_32_fetch_task_log(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, TrUserData) -> df skip_64_fetch_task_log(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, TrUserData) -> dfp_read_field_def_fetch_task_log(Rest, Z1, Z2, F, F@_1, TrUserData). -decode_msg_service_config(Bin, TrUserData) -> dfp_read_field_def_service_config(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(0, TrUserData), TrUserData). +decode_msg_container_config(Bin, TrUserData) -> dfp_read_field_def_container_config(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData). -dfp_read_field_def_service_config(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_service_config_service_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); -dfp_read_field_def_service_config(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_service_config_config_json(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); -dfp_read_field_def_service_config(<<24, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_service_config_timeout(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); -dfp_read_field_def_service_config(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #service_config{service_id = F@_1, config_json = F@_2, timeout = F@_3}; -dfp_read_field_def_service_config(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dg_read_field_def_service_config(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). +dfp_read_field_def_container_config(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_container_config_container_name(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); +dfp_read_field_def_container_config(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_container_config_config(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); +dfp_read_field_def_container_config(<<>>, 0, 0, _, F@_1, F@_2, _) -> #container_config{container_name = F@_1, config = F@_2}; +dfp_read_field_def_container_config(Other, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dg_read_field_def_container_config(Other, Z1, Z2, F, F@_1, F@_2, TrUserData). -dg_read_field_def_service_config(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 32 - 7 -> dg_read_field_def_service_config(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); -dg_read_field_def_service_config(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, TrUserData) -> +dg_read_field_def_container_config(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 32 - 7 -> dg_read_field_def_container_config(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +dg_read_field_def_container_config(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, TrUserData) -> Key = X bsl N + Acc, case Key of - 10 -> d_field_service_config_service_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); - 18 -> d_field_service_config_config_json(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); - 24 -> d_field_service_config_timeout(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); + 10 -> d_field_container_config_container_name(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); + 18 -> d_field_container_config_config(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); _ -> case Key band 7 of - 0 -> skip_varint_service_config(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); - 1 -> skip_64_service_config(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); - 2 -> skip_length_delimited_service_config(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); - 3 -> skip_group_service_config(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); - 5 -> skip_32_service_config(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData) + 0 -> skip_varint_container_config(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 1 -> skip_64_container_config(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 2 -> skip_length_delimited_container_config(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 3 -> skip_group_container_config(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 5 -> skip_32_container_config(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData) end end; -dg_read_field_def_service_config(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #service_config{service_id = F@_1, config_json = F@_2, timeout = F@_3}. +dg_read_field_def_container_config(<<>>, 0, 0, _, F@_1, F@_2, _) -> #container_config{container_name = F@_1, config = F@_2}. -d_field_service_config_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_service_config_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); -d_field_service_config_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, TrUserData) -> +d_field_container_config_container_name(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_container_config_container_name(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +d_field_container_config_container_name(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_service_config(RestF, 0, 0, F, NewFValue, F@_2, F@_3, TrUserData). + dfp_read_field_def_container_config(RestF, 0, 0, F, NewFValue, F@_2, TrUserData). -d_field_service_config_config_json(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_service_config_config_json(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); -d_field_service_config_config_json(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, F@_3, TrUserData) -> +d_field_container_config_config(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_container_config_config(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +d_field_container_config_config(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_service_config(RestF, 0, 0, F, F@_1, NewFValue, F@_3, TrUserData). + dfp_read_field_def_container_config(RestF, 0, 0, F, F@_1, NewFValue, TrUserData). -d_field_service_config_timeout(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_service_config_timeout(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); -d_field_service_config_timeout(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, _, TrUserData) -> - {NewFValue, RestF} = {id((X bsl N + Acc) band 4294967295, TrUserData), Rest}, - dfp_read_field_def_service_config(RestF, 0, 0, F, F@_1, F@_2, NewFValue, TrUserData). +skip_varint_container_config(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> skip_varint_container_config(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); +skip_varint_container_config(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_container_config(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). -skip_varint_service_config(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> skip_varint_service_config(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); -skip_varint_service_config(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_service_config(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). - -skip_length_delimited_service_config(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> skip_length_delimited_service_config(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); -skip_length_delimited_service_config(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) -> +skip_length_delimited_container_config(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> skip_length_delimited_container_config(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +skip_length_delimited_container_config(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) -> Length = X bsl N + Acc, <<_:Length/binary, Rest2/binary>> = Rest, - dfp_read_field_def_service_config(Rest2, 0, 0, F, F@_1, F@_2, F@_3, TrUserData). + dfp_read_field_def_container_config(Rest2, 0, 0, F, F@_1, F@_2, TrUserData). -skip_group_service_config(Bin, _, Z2, FNum, F@_1, F@_2, F@_3, TrUserData) -> +skip_group_container_config(Bin, _, Z2, FNum, F@_1, F@_2, TrUserData) -> {_, Rest} = read_group(Bin, FNum), - dfp_read_field_def_service_config(Rest, 0, Z2, FNum, F@_1, F@_2, F@_3, TrUserData). + dfp_read_field_def_container_config(Rest, 0, Z2, FNum, F@_1, F@_2, TrUserData). -skip_32_service_config(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_service_config(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). +skip_32_container_config(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_container_config(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). -skip_64_service_config(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_service_config(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). +skip_64_container_config(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_container_config(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). decode_msg_data(Bin, TrUserData) -> dfp_read_field_def_data(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData). @@ -1505,7 +1489,7 @@ merge_msgs(Prev, New, MsgName, Opts) -> async_call_reply -> merge_msg_async_call_reply(Prev, New, TrUserData); deploy -> merge_msg_deploy(Prev, New, TrUserData); fetch_task_log -> merge_msg_fetch_task_log(Prev, New, TrUserData); - service_config -> merge_msg_service_config(Prev, New, TrUserData); + container_config -> merge_msg_container_config(Prev, New, TrUserData); data -> merge_msg_data(Prev, New, TrUserData); event -> merge_msg_event(Prev, New, TrUserData); ping -> merge_msg_ping(Prev, New, TrUserData) @@ -1589,20 +1573,16 @@ merge_msg_fetch_task_log(#fetch_task_log{task_id = PFtask_id}, #fetch_task_log{t true -> NFtask_id end}. --compile({nowarn_unused_function,merge_msg_service_config/3}). -merge_msg_service_config(#service_config{service_id = PFservice_id, config_json = PFconfig_json, timeout = PFtimeout}, #service_config{service_id = NFservice_id, config_json = NFconfig_json, timeout = NFtimeout}, _) -> - #service_config{service_id = - if NFservice_id =:= undefined -> PFservice_id; - true -> NFservice_id - end, - config_json = - if NFconfig_json =:= undefined -> PFconfig_json; - true -> NFconfig_json - end, - timeout = - if NFtimeout =:= undefined -> PFtimeout; - true -> NFtimeout - end}. +-compile({nowarn_unused_function,merge_msg_container_config/3}). +merge_msg_container_config(#container_config{container_name = PFcontainer_name, config = PFconfig}, #container_config{container_name = NFcontainer_name, config = NFconfig}, _) -> + #container_config{container_name = + if NFcontainer_name =:= undefined -> PFcontainer_name; + true -> NFcontainer_name + end, + config = + if NFconfig =:= undefined -> PFconfig; + true -> NFconfig + end}. -compile({nowarn_unused_function,merge_msg_data/3}). merge_msg_data(#data{service_id = PFservice_id, device_uuid = PFdevice_uuid, route_key = PFroute_key, metric = PFmetric}, #data{service_id = NFservice_id, device_uuid = NFdevice_uuid, route_key = NFroute_key, metric = NFmetric}, _) -> @@ -1717,7 +1697,7 @@ verify_msg(Msg, MsgName, Opts) -> async_call_reply -> v_msg_async_call_reply(Msg, [MsgName], TrUserData); deploy -> v_msg_deploy(Msg, [MsgName], TrUserData); fetch_task_log -> v_msg_fetch_task_log(Msg, [MsgName], TrUserData); - service_config -> v_msg_service_config(Msg, [MsgName], TrUserData); + container_config -> v_msg_container_config(Msg, [MsgName], TrUserData); data -> v_msg_data(Msg, [MsgName], TrUserData); event -> v_msg_event(Msg, [MsgName], TrUserData); ping -> v_msg_ping(Msg, [MsgName], TrUserData); @@ -1806,20 +1786,17 @@ v_msg_fetch_task_log(#fetch_task_log{task_id = F1}, Path, TrUserData) -> ok; v_msg_fetch_task_log(X, Path, _TrUserData) -> mk_type_error({expected_msg, fetch_task_log}, X, Path). --compile({nowarn_unused_function,v_msg_service_config/3}). --dialyzer({nowarn_function,v_msg_service_config/3}). -v_msg_service_config(#service_config{service_id = F1, config_json = F2, timeout = F3}, Path, TrUserData) -> +-compile({nowarn_unused_function,v_msg_container_config/3}). +-dialyzer({nowarn_function,v_msg_container_config/3}). +v_msg_container_config(#container_config{container_name = F1, config = F2}, Path, TrUserData) -> if F1 == undefined -> ok; - true -> v_type_string(F1, [service_id | Path], TrUserData) + true -> v_type_string(F1, [container_name | Path], TrUserData) end, if F2 == undefined -> ok; - true -> v_type_string(F2, [config_json | Path], TrUserData) - end, - if F3 == undefined -> ok; - true -> v_type_uint32(F3, [timeout | Path], TrUserData) + true -> v_type_bytes(F2, [config | Path], TrUserData) end, ok; -v_msg_service_config(X, Path, _TrUserData) -> mk_type_error({expected_msg, service_config}, X, Path). +v_msg_container_config(X, Path, _TrUserData) -> mk_type_error({expected_msg, container_config}, X, Path). -compile({nowarn_unused_function,v_msg_data/3}). -dialyzer({nowarn_function,v_msg_data/3}). @@ -1995,10 +1972,7 @@ get_msg_defs() -> #field{name = message, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]}, {{msg, deploy}, [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = config, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]}, {{msg, fetch_task_log}, [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}]}, - {{msg, service_config}, - [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, - #field{name = config_json, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, - #field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]}, + {{msg, container_config}, [#field{name = container_name, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = config, fnum = 2, rnum = 3, type = bytes, occurrence = optional, opts = []}]}, {{msg, data}, [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, @@ -2024,13 +1998,13 @@ get_msg_defs() -> #field{name = interfaces, fnum = 13, rnum = 14, type = string, occurrence = optional, opts = []}]}]. -get_msg_names() -> [auth_request, auth_reply, pub, async_call_reply, deploy, fetch_task_log, service_config, data, event, ping]. +get_msg_names() -> [auth_request, auth_reply, pub, async_call_reply, deploy, fetch_task_log, container_config, data, event, ping]. get_group_names() -> []. -get_msg_or_group_names() -> [auth_request, auth_reply, pub, async_call_reply, deploy, fetch_task_log, service_config, data, event, ping]. +get_msg_or_group_names() -> [auth_request, auth_reply, pub, async_call_reply, deploy, fetch_task_log, container_config, data, event, ping]. get_enum_names() -> []. @@ -2061,10 +2035,7 @@ find_msg_def(async_call_reply) -> #field{name = message, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]; find_msg_def(deploy) -> [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = config, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]; find_msg_def(fetch_task_log) -> [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}]; -find_msg_def(service_config) -> - [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, - #field{name = config_json, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, - #field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]; +find_msg_def(container_config) -> [#field{name = container_name, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = config, fnum = 2, rnum = 3, type = bytes, occurrence = optional, opts = []}]; find_msg_def(data) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, @@ -2152,7 +2123,7 @@ fqbin_to_msg_name(<<"Pub">>) -> pub; fqbin_to_msg_name(<<"AsyncCallReply">>) -> async_call_reply; fqbin_to_msg_name(<<"Deploy">>) -> deploy; fqbin_to_msg_name(<<"FetchTaskLog">>) -> fetch_task_log; -fqbin_to_msg_name(<<"ServiceConfig">>) -> service_config; +fqbin_to_msg_name(<<"ContainerConfig">>) -> container_config; fqbin_to_msg_name(<<"Data">>) -> data; fqbin_to_msg_name(<<"Event">>) -> event; fqbin_to_msg_name(<<"Ping">>) -> ping; @@ -2165,7 +2136,7 @@ msg_name_to_fqbin(pub) -> <<"Pub">>; msg_name_to_fqbin(async_call_reply) -> <<"AsyncCallReply">>; msg_name_to_fqbin(deploy) -> <<"Deploy">>; msg_name_to_fqbin(fetch_task_log) -> <<"FetchTaskLog">>; -msg_name_to_fqbin(service_config) -> <<"ServiceConfig">>; +msg_name_to_fqbin(container_config) -> <<"ContainerConfig">>; msg_name_to_fqbin(data) -> <<"Data">>; msg_name_to_fqbin(event) -> <<"Event">>; msg_name_to_fqbin(ping) -> <<"Ping">>; @@ -2207,7 +2178,7 @@ get_all_source_basenames() -> ["message_pb.proto"]. get_all_proto_names() -> ["message_pb"]. -get_msg_containment("message_pb") -> [async_call_reply, auth_reply, auth_request, data, deploy, event, fetch_task_log, ping, pub, service_config]; +get_msg_containment("message_pb") -> [async_call_reply, auth_reply, auth_request, container_config, data, deploy, event, fetch_task_log, ping, pub]; get_msg_containment(P) -> error({gpb_error, {badproto, P}}). @@ -2231,9 +2202,9 @@ get_proto_by_msg_name_as_fqbin(<<"Data">>) -> "message_pb"; get_proto_by_msg_name_as_fqbin(<<"Pub">>) -> "message_pb"; get_proto_by_msg_name_as_fqbin(<<"Event">>) -> "message_pb"; get_proto_by_msg_name_as_fqbin(<<"AuthRequest">>) -> "message_pb"; -get_proto_by_msg_name_as_fqbin(<<"ServiceConfig">>) -> "message_pb"; get_proto_by_msg_name_as_fqbin(<<"Ping">>) -> "message_pb"; get_proto_by_msg_name_as_fqbin(<<"FetchTaskLog">>) -> "message_pb"; +get_proto_by_msg_name_as_fqbin(<<"ContainerConfig">>) -> "message_pb"; get_proto_by_msg_name_as_fqbin(<<"Deploy">>) -> "message_pb"; get_proto_by_msg_name_as_fqbin(<<"AuthReply">>) -> "message_pb"; get_proto_by_msg_name_as_fqbin(<<"AsyncCallReply">>) -> "message_pb"; diff --git a/message_pb.proto b/message_pb.proto index 8a317d6..4141775 100644 --- a/message_pb.proto +++ b/message_pb.proto @@ -44,10 +44,10 @@ message FetchTaskLog { } // 参数配置 -message ServiceConfig { - string service_id = 1; - string config_json = 2; - uint32 timeout = 3; +message ContainerConfig { + string container_name = 1; + // 任意的数据格式 + bytes config = 2; } /////// EFKA主动上报的消息类型