From 0b0043d98dc48622c7fd9f091829cbdec0c80697 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 16 Sep 2025 17:13:23 +0800 Subject: [PATCH] add docker events --- apps/efka/include/efka.hrl | 4 +- apps/efka/include/message_pb.hrl | 3 +- apps/efka/src/docker/efka_docker_events.erl | 150 ++++++++++++++++++++ apps/efka/src/efka_remote_agent.erl | 29 ++-- apps/efka/src/efka_sup.erl | 9 ++ apps/efka/src/proto/message_pb.erl | 118 ++++++--------- message_pb.proto | 4 +- 7 files changed, 225 insertions(+), 92 deletions(-) create mode 100644 apps/efka/src/docker/efka_docker_events.erl diff --git a/apps/efka/include/efka.hrl b/apps/efka/include/efka.hrl index fcd8c1f..f6046c4 100644 --- a/apps/efka/include/efka.hrl +++ b/apps/efka/include/efka.hrl @@ -46,8 +46,8 @@ %%%% 主动推送的消息类型子分类, 需要返回值 -define(PUSH_DEPLOY, 16#01). --define(PUSH_START_SERVICE, 16#02). --define(PUSH_STOP_SERVICE, 16#03). +-define(PUSH_START_CONTAINER, 16#02). +-define(PUSH_STOP_CONTAINER, 16#03). -define(PUSH_SERVICE_CONFIG, 16#04). -define(PUSH_INVOKE, 16#05). diff --git a/apps/efka/include/message_pb.hrl b/apps/efka/include/message_pb.hrl index 4accfaa..785b06f 100644 --- a/apps/efka/include/message_pb.hrl +++ b/apps/efka/include/message_pb.hrl @@ -48,8 +48,7 @@ -define('DEPLOY_PB_H', true). -record(deploy, {task_id = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits - service_id = <<>> :: unicode:chardata() | undefined, % = 2, optional - tar_url = <<>> :: unicode:chardata() | undefined % = 3, optional + config = <<>> :: unicode:chardata() | undefined % = 2, optional }). -endif. diff --git a/apps/efka/src/docker/efka_docker_events.erl b/apps/efka/src/docker/efka_docker_events.erl new file mode 100644 index 0000000..923f230 --- /dev/null +++ b/apps/efka/src/docker/efka_docker_events.erl @@ -0,0 +1,150 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 16. 9月 2025 16:48 +%%%------------------------------------------------------------------- +-module(efka_docker_events). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([monitor_container/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + port, + %% 观察者 + monitors = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +monitor_container(ReceiverPid, ContainerId) when is_pid(ReceiverPid), is_binary(ContainerId) -> + gen_server:cast(?SERVER, {monitor_container, ReceiverPid, ContainerId}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + process_flag(trap_exit, true), + try_attach_events(0), + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({monitor_container, ReceiverPid, ContainerId}, State = #state{monitors = Monitors}) -> + MRef = erlang:monitor(process, ReceiverPid), + {noreply, State#state{monitors = maps:put(ContainerId, {ReceiverPid, MRef}, Monitors)}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, attach_docker_events}, State = #state{port = undefined}) -> + ExecCmd = "docker events --format \"{{json .}}\"", + case catch erlang:open_port({spawn, ExecCmd}, [exit_status, {line, 10239}, use_stdio, stderr_to_stdout, binary]) of + Port when is_port(Port) -> + {noreply, State#state{port = Port}}; + _Error -> + try_attach_events(5000), + {noreply, State} + end; +handle_info({Port, {data, BinLine}}, State = #state{port = Port}) -> + Event = catch jiffy:decode(BinLine, [return_maps]), + lager:debug("event: ~p", [Event]), + handle_event(Event, State), + {noreply, State}; + +%% 进程退出的时候删除掉管理的Pid +handle_info({'DOWN', MRef, process, _Pid, _Reason}, State = #state{monitors = Monitors}) -> + NMonitors = maps:filter(fun(_Key, {_, Ref}) -> MRef =/= Ref end, Monitors), + {noreply, State#state{monitors = NMonitors}}; + +%% Port退出的时候,尝试重启 +handle_info({'EXIT', Port, Reason}, State = #state{port = Port}) -> + lager:warning("[efka_docker_events] exit with reason: ~p", [Reason]), + try_attach_events(5000), + {noreply, State#state{port = undefined}}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== +handle_event(#{type := <<"container">>, status := Status, id := Id}, #state{monitors = Monitors}) -> + case maps:find(Id, Monitors) of + error -> + ok; + {ok, {ReceiverPid, _}} -> + case Status of + <<"start">> -> + ReceiverPid ! {docker_events, start}; + <<"stop">> -> + ReceiverPid ! {docker_events, stop}; + _ -> + ok + end + end; +handle_event(_, _) -> + ok. + +try_attach_events(Timeout) -> + erlang:start_timer(Timeout, self(), attach_docker_events). diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index 2ea035b..c93023d 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -236,21 +236,26 @@ handle_event(info, flush_cache, _, State) -> %% 微服务部署 handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> - #deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl} = message_pb:decode_msg(DeployBin, deploy), - - %% 短暂的等待,efka_inetd收到消息后就立即返回了 - Reply = case efka_inetd:deploy(TaskId, ServiceId, TarUrl) of - ok -> - #async_call_reply{code = 1, result = <<"ok">>}; - {error, Reason} when is_binary(Reason) -> - #async_call_reply{code = 0, message = Reason} - end, - efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)), + #deploy{task_id = TaskId, config = Config0} = message_pb:decode_msg(DeployBin, deploy), + case catch jiffy:decode(Config0, [return_maps]) of + Config when is_map(Config) -> + %% 短暂的等待,efka_inetd收到消息后就立即返回了 + Reply = case efka_inetd:deploy(TaskId, Config) of + ok -> + #async_call_reply{code = 1, result = <<"ok">>}; + {error, Reason} when is_binary(Reason) -> + #async_call_reply{code = 0, message = Reason} + end, + efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)); + _Error -> + ErrorReply = #async_call_reply{code = 0, message = <<"invalid config json">>}, + efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(ErrorReply)) + end, {keep_state, State}; %% 启动微服务 -handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 Reply = case efka_inetd:start_container(ServiceId) of ok -> @@ -263,7 +268,7 @@ handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {server_async_call, PacketId, <>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> %% 短暂的等待,efka_inetd收到消息后就立即返回了 Reply = case efka_inetd:stop_container(ServiceId) of ok -> diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index 01b2a65..6413735 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -28,6 +28,15 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, ChildSpecs = [ + #{ + id => 'efka_docker_events', + start => {'efka_docker_events', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['efka_docker_events'] + }, + #{ id => 'efka_model_sup', start => {'efka_model_sup', start_link, []}, diff --git a/apps/efka/src/proto/message_pb.erl b/apps/efka/src/proto/message_pb.erl index ec94374..11453fb 100644 --- a/apps/efka/src/proto/message_pb.erl +++ b/apps/efka/src/proto/message_pb.erl @@ -266,7 +266,7 @@ encode_msg_async_call_reply(#async_call_reply{code = F1, result = F2, message = encode_msg_deploy(Msg, TrUserData) -> encode_msg_deploy(Msg, <<>>, TrUserData). -encode_msg_deploy(#deploy{task_id = F1, service_id = F2, tar_url = F3}, Bin, TrUserData) -> +encode_msg_deploy(#deploy{task_id = F1, config = F2}, Bin, TrUserData) -> B1 = if F1 == undefined -> Bin; true -> begin @@ -276,23 +276,13 @@ encode_msg_deploy(#deploy{task_id = F1, service_id = F2, tar_url = F3}, Bin, TrU end end end, - B2 = if F2 == undefined -> B1; - true -> - begin - TrF2 = id(F2, TrUserData), - case is_empty_string(TrF2) of - true -> B1; - false -> e_type_string(TrF2, <>, TrUserData) - end - end - end, - if F3 == undefined -> B2; + if F2 == undefined -> B1; true -> begin - TrF3 = id(F3, TrUserData), - case is_empty_string(TrF3) of - true -> B2; - false -> e_type_string(TrF3, <>, TrUserData) + TrF2 = id(F2, TrUserData), + case is_empty_string(TrF2) of + true -> B1; + false -> e_type_string(TrF2, <>, TrUserData) end end end. @@ -1061,63 +1051,56 @@ skip_32_async_call_reply(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrU skip_64_async_call_reply(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_async_call_reply(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). -decode_msg_deploy(Bin, TrUserData) -> dfp_read_field_def_deploy(Bin, 0, 0, 0, id(0, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData). +decode_msg_deploy(Bin, TrUserData) -> dfp_read_field_def_deploy(Bin, 0, 0, 0, id(0, TrUserData), id(<<>>, TrUserData), TrUserData). -dfp_read_field_def_deploy(<<8, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_deploy_task_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); -dfp_read_field_def_deploy(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_deploy_service_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); -dfp_read_field_def_deploy(<<26, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_deploy_tar_url(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); -dfp_read_field_def_deploy(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #deploy{task_id = F@_1, service_id = F@_2, tar_url = F@_3}; -dfp_read_field_def_deploy(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dg_read_field_def_deploy(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). +dfp_read_field_def_deploy(<<8, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_deploy_task_id(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); +dfp_read_field_def_deploy(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_deploy_config(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); +dfp_read_field_def_deploy(<<>>, 0, 0, _, F@_1, F@_2, _) -> #deploy{task_id = F@_1, config = F@_2}; +dfp_read_field_def_deploy(Other, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dg_read_field_def_deploy(Other, Z1, Z2, F, F@_1, F@_2, TrUserData). -dg_read_field_def_deploy(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 32 - 7 -> dg_read_field_def_deploy(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); -dg_read_field_def_deploy(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, TrUserData) -> +dg_read_field_def_deploy(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 32 - 7 -> dg_read_field_def_deploy(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +dg_read_field_def_deploy(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, TrUserData) -> Key = X bsl N + Acc, case Key of - 8 -> d_field_deploy_task_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); - 18 -> d_field_deploy_service_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); - 26 -> d_field_deploy_tar_url(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); + 8 -> d_field_deploy_task_id(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); + 18 -> d_field_deploy_config(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); _ -> case Key band 7 of - 0 -> skip_varint_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); - 1 -> skip_64_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); - 2 -> skip_length_delimited_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); - 3 -> skip_group_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); - 5 -> skip_32_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData) + 0 -> skip_varint_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 1 -> skip_64_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 2 -> skip_length_delimited_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 3 -> skip_group_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); + 5 -> skip_32_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData) end end; -dg_read_field_def_deploy(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #deploy{task_id = F@_1, service_id = F@_2, tar_url = F@_3}. +dg_read_field_def_deploy(<<>>, 0, 0, _, F@_1, F@_2, _) -> #deploy{task_id = F@_1, config = F@_2}. -d_field_deploy_task_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_deploy_task_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); -d_field_deploy_task_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, TrUserData) -> +d_field_deploy_task_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_deploy_task_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +d_field_deploy_task_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, TrUserData) -> {NewFValue, RestF} = {id((X bsl N + Acc) band 4294967295, TrUserData), Rest}, - dfp_read_field_def_deploy(RestF, 0, 0, F, NewFValue, F@_2, F@_3, TrUserData). + dfp_read_field_def_deploy(RestF, 0, 0, F, NewFValue, F@_2, TrUserData). -d_field_deploy_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_deploy_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); -d_field_deploy_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, F@_3, TrUserData) -> +d_field_deploy_config(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_deploy_config(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +d_field_deploy_config(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_deploy(RestF, 0, 0, F, F@_1, NewFValue, F@_3, TrUserData). + dfp_read_field_def_deploy(RestF, 0, 0, F, F@_1, NewFValue, TrUserData). -d_field_deploy_tar_url(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_deploy_tar_url(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); -d_field_deploy_tar_url(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, _, TrUserData) -> - {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_deploy(RestF, 0, 0, F, F@_1, F@_2, NewFValue, TrUserData). +skip_varint_deploy(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> skip_varint_deploy(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); +skip_varint_deploy(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). -skip_varint_deploy(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> skip_varint_deploy(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); -skip_varint_deploy(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). - -skip_length_delimited_deploy(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> skip_length_delimited_deploy(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); -skip_length_delimited_deploy(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) -> +skip_length_delimited_deploy(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> skip_length_delimited_deploy(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); +skip_length_delimited_deploy(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) -> Length = X bsl N + Acc, <<_:Length/binary, Rest2/binary>> = Rest, - dfp_read_field_def_deploy(Rest2, 0, 0, F, F@_1, F@_2, F@_3, TrUserData). + dfp_read_field_def_deploy(Rest2, 0, 0, F, F@_1, F@_2, TrUserData). -skip_group_deploy(Bin, _, Z2, FNum, F@_1, F@_2, F@_3, TrUserData) -> +skip_group_deploy(Bin, _, Z2, FNum, F@_1, F@_2, TrUserData) -> {_, Rest} = read_group(Bin, FNum), - dfp_read_field_def_deploy(Rest, 0, Z2, FNum, F@_1, F@_2, F@_3, TrUserData). + dfp_read_field_def_deploy(Rest, 0, Z2, FNum, F@_1, F@_2, TrUserData). -skip_32_deploy(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). +skip_32_deploy(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). -skip_64_deploy(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). +skip_64_deploy(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). decode_msg_fetch_task_log(Bin, TrUserData) -> dfp_read_field_def_fetch_task_log(Bin, 0, 0, 0, id(0, TrUserData), TrUserData). @@ -1880,18 +1863,14 @@ merge_msg_async_call_reply(#async_call_reply{code = PFcode, result = PFresult, m end}. -compile({nowarn_unused_function,merge_msg_deploy/3}). -merge_msg_deploy(#deploy{task_id = PFtask_id, service_id = PFservice_id, tar_url = PFtar_url}, #deploy{task_id = NFtask_id, service_id = NFservice_id, tar_url = NFtar_url}, _) -> +merge_msg_deploy(#deploy{task_id = PFtask_id, config = PFconfig}, #deploy{task_id = NFtask_id, config = NFconfig}, _) -> #deploy{task_id = if NFtask_id =:= undefined -> PFtask_id; true -> NFtask_id end, - service_id = - if NFservice_id =:= undefined -> PFservice_id; - true -> NFservice_id - end, - tar_url = - if NFtar_url =:= undefined -> PFtar_url; - true -> NFtar_url + config = + if NFconfig =:= undefined -> PFconfig; + true -> NFconfig end}. -compile({nowarn_unused_function,merge_msg_fetch_task_log/3}). @@ -2147,15 +2126,12 @@ v_msg_async_call_reply(X, Path, _TrUserData) -> mk_type_error({expected_msg, asy -compile({nowarn_unused_function,v_msg_deploy/3}). -dialyzer({nowarn_function,v_msg_deploy/3}). -v_msg_deploy(#deploy{task_id = F1, service_id = F2, tar_url = F3}, Path, TrUserData) -> +v_msg_deploy(#deploy{task_id = F1, config = F2}, Path, TrUserData) -> if F1 == undefined -> ok; true -> v_type_uint32(F1, [task_id | Path], TrUserData) end, if F2 == undefined -> ok; - true -> v_type_string(F2, [service_id | Path], TrUserData) - end, - if F3 == undefined -> ok; - true -> v_type_string(F3, [tar_url | Path], TrUserData) + true -> v_type_string(F2, [config | Path], TrUserData) end, ok; v_msg_deploy(X, Path, _TrUserData) -> mk_type_error({expected_msg, deploy}, X, Path). @@ -2401,10 +2377,7 @@ get_msg_defs() -> [#field{name = code, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = result, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = message, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]}, - {{msg, deploy}, - [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, - #field{name = service_id, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, - #field{name = tar_url, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]}, + {{msg, deploy}, [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = config, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]}, {{msg, fetch_task_log}, [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}]}, {{msg, invoke}, [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, @@ -2480,10 +2453,7 @@ find_msg_def(async_call_reply) -> [#field{name = code, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = result, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = message, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]; -find_msg_def(deploy) -> - [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, - #field{name = service_id, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, - #field{name = tar_url, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]; +find_msg_def(deploy) -> [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = config, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]; find_msg_def(fetch_task_log) -> [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}]; find_msg_def(invoke) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, diff --git a/message_pb.proto b/message_pb.proto index 836cd58..dee8679 100644 --- a/message_pb.proto +++ b/message_pb.proto @@ -34,8 +34,8 @@ message AsyncCallReply { // 部署逻辑 message Deploy { uint32 task_id = 1; - string service_id = 2; - string tar_url = 3; + // json + string config = 2; } // 获取task的logs