add docker events
This commit is contained in:
parent
6b16612214
commit
0b0043d98d
@ -46,8 +46,8 @@
|
||||
%%%% 主动推送的消息类型子分类, 需要返回值
|
||||
|
||||
-define(PUSH_DEPLOY, 16#01).
|
||||
-define(PUSH_START_SERVICE, 16#02).
|
||||
-define(PUSH_STOP_SERVICE, 16#03).
|
||||
-define(PUSH_START_CONTAINER, 16#02).
|
||||
-define(PUSH_STOP_CONTAINER, 16#03).
|
||||
|
||||
-define(PUSH_SERVICE_CONFIG, 16#04).
|
||||
-define(PUSH_INVOKE, 16#05).
|
||||
|
||||
@ -48,8 +48,7 @@
|
||||
-define('DEPLOY_PB_H', true).
|
||||
-record(deploy,
|
||||
{task_id = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
|
||||
service_id = <<>> :: unicode:chardata() | undefined, % = 2, optional
|
||||
tar_url = <<>> :: unicode:chardata() | undefined % = 3, optional
|
||||
config = <<>> :: unicode:chardata() | undefined % = 2, optional
|
||||
}).
|
||||
-endif.
|
||||
|
||||
|
||||
150
apps/efka/src/docker/efka_docker_events.erl
Normal file
150
apps/efka/src/docker/efka_docker_events.erl
Normal file
@ -0,0 +1,150 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2025, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 16. 9月 2025 16:48
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(efka_docker_events).
|
||||
-author("anlicheng").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([monitor_container/2]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-record(state, {
|
||||
port,
|
||||
%% 观察者
|
||||
monitors = #{}
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
monitor_container(ReceiverPid, ContainerId) when is_pid(ReceiverPid), is_binary(ContainerId) ->
|
||||
gen_server:cast(?SERVER, {monitor_container, ReceiverPid, ContainerId}).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link() ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([]) ->
|
||||
process_flag(trap_exit, true),
|
||||
try_attach_events(0),
|
||||
{ok, #state{}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({monitor_container, ReceiverPid, ContainerId}, State = #state{monitors = Monitors}) ->
|
||||
MRef = erlang:monitor(process, ReceiverPid),
|
||||
{noreply, State#state{monitors = maps:put(ContainerId, {ReceiverPid, MRef}, Monitors)}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({timeout, _, attach_docker_events}, State = #state{port = undefined}) ->
|
||||
ExecCmd = "docker events --format \"{{json .}}\"",
|
||||
case catch erlang:open_port({spawn, ExecCmd}, [exit_status, {line, 10239}, use_stdio, stderr_to_stdout, binary]) of
|
||||
Port when is_port(Port) ->
|
||||
{noreply, State#state{port = Port}};
|
||||
_Error ->
|
||||
try_attach_events(5000),
|
||||
{noreply, State}
|
||||
end;
|
||||
handle_info({Port, {data, BinLine}}, State = #state{port = Port}) ->
|
||||
Event = catch jiffy:decode(BinLine, [return_maps]),
|
||||
lager:debug("event: ~p", [Event]),
|
||||
handle_event(Event, State),
|
||||
{noreply, State};
|
||||
|
||||
%% 进程退出的时候删除掉管理的Pid
|
||||
handle_info({'DOWN', MRef, process, _Pid, _Reason}, State = #state{monitors = Monitors}) ->
|
||||
NMonitors = maps:filter(fun(_Key, {_, Ref}) -> MRef =/= Ref end, Monitors),
|
||||
{noreply, State#state{monitors = NMonitors}};
|
||||
|
||||
%% Port退出的时候,尝试重启
|
||||
handle_info({'EXIT', Port, Reason}, State = #state{port = Port}) ->
|
||||
lager:warning("[efka_docker_events] exit with reason: ~p", [Reason]),
|
||||
try_attach_events(5000),
|
||||
{noreply, State#state{port = undefined}}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
handle_event(#{type := <<"container">>, status := Status, id := Id}, #state{monitors = Monitors}) ->
|
||||
case maps:find(Id, Monitors) of
|
||||
error ->
|
||||
ok;
|
||||
{ok, {ReceiverPid, _}} ->
|
||||
case Status of
|
||||
<<"start">> ->
|
||||
ReceiverPid ! {docker_events, start};
|
||||
<<"stop">> ->
|
||||
ReceiverPid ! {docker_events, stop};
|
||||
_ ->
|
||||
ok
|
||||
end
|
||||
end;
|
||||
handle_event(_, _) ->
|
||||
ok.
|
||||
|
||||
try_attach_events(Timeout) ->
|
||||
erlang:start_timer(Timeout, self(), attach_docker_events).
|
||||
@ -236,21 +236,26 @@ handle_event(info, flush_cache, _, State) ->
|
||||
|
||||
%% 微服务部署
|
||||
handle_event(info, {server_async_call, PacketId, <<?PUSH_DEPLOY:8, DeployBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||
#deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl} = message_pb:decode_msg(DeployBin, deploy),
|
||||
#deploy{task_id = TaskId, config = Config0} = message_pb:decode_msg(DeployBin, deploy),
|
||||
|
||||
case catch jiffy:decode(Config0, [return_maps]) of
|
||||
Config when is_map(Config) ->
|
||||
%% 短暂的等待,efka_inetd收到消息后就立即返回了
|
||||
Reply = case efka_inetd:deploy(TaskId, ServiceId, TarUrl) of
|
||||
Reply = case efka_inetd:deploy(TaskId, Config) of
|
||||
ok ->
|
||||
#async_call_reply{code = 1, result = <<"ok">>};
|
||||
{error, Reason} when is_binary(Reason) ->
|
||||
#async_call_reply{code = 0, message = Reason}
|
||||
end,
|
||||
efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply)),
|
||||
|
||||
efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(Reply));
|
||||
_Error ->
|
||||
ErrorReply = #async_call_reply{code = 0, message = <<"invalid config json">>},
|
||||
efka_transport:async_call_reply(TransportPid, PacketId, message_pb:encode_msg(ErrorReply))
|
||||
end,
|
||||
{keep_state, State};
|
||||
|
||||
%% 启动微服务
|
||||
handle_event(info, {server_async_call, PacketId, <<?PUSH_START_SERVICE:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||
handle_event(info, {server_async_call, PacketId, <<?PUSH_START_CONTAINER:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||
%% 短暂的等待,efka_inetd收到消息后就立即返回了
|
||||
Reply = case efka_inetd:start_container(ServiceId) of
|
||||
ok ->
|
||||
@ -263,7 +268,7 @@ handle_event(info, {server_async_call, PacketId, <<?PUSH_START_SERVICE:8, Servic
|
||||
{keep_state, State};
|
||||
|
||||
%% 停止微服务
|
||||
handle_event(info, {server_async_call, PacketId, <<?PUSH_STOP_SERVICE:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||
handle_event(info, {server_async_call, PacketId, <<?PUSH_STOP_CONTAINER:8, ServiceId/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||
%% 短暂的等待,efka_inetd收到消息后就立即返回了
|
||||
Reply = case efka_inetd:stop_container(ServiceId) of
|
||||
ok ->
|
||||
|
||||
@ -28,6 +28,15 @@ start_link() ->
|
||||
init([]) ->
|
||||
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
|
||||
ChildSpecs = [
|
||||
#{
|
||||
id => 'efka_docker_events',
|
||||
start => {'efka_docker_events', start_link, []},
|
||||
restart => permanent,
|
||||
shutdown => 2000,
|
||||
type => worker,
|
||||
modules => ['efka_docker_events']
|
||||
},
|
||||
|
||||
#{
|
||||
id => 'efka_model_sup',
|
||||
start => {'efka_model_sup', start_link, []},
|
||||
|
||||
@ -266,7 +266,7 @@ encode_msg_async_call_reply(#async_call_reply{code = F1, result = F2, message =
|
||||
encode_msg_deploy(Msg, TrUserData) -> encode_msg_deploy(Msg, <<>>, TrUserData).
|
||||
|
||||
|
||||
encode_msg_deploy(#deploy{task_id = F1, service_id = F2, tar_url = F3}, Bin, TrUserData) ->
|
||||
encode_msg_deploy(#deploy{task_id = F1, config = F2}, Bin, TrUserData) ->
|
||||
B1 = if F1 == undefined -> Bin;
|
||||
true ->
|
||||
begin
|
||||
@ -276,7 +276,7 @@ encode_msg_deploy(#deploy{task_id = F1, service_id = F2, tar_url = F3}, Bin, TrU
|
||||
end
|
||||
end
|
||||
end,
|
||||
B2 = if F2 == undefined -> B1;
|
||||
if F2 == undefined -> B1;
|
||||
true ->
|
||||
begin
|
||||
TrF2 = id(F2, TrUserData),
|
||||
@ -285,16 +285,6 @@ encode_msg_deploy(#deploy{task_id = F1, service_id = F2, tar_url = F3}, Bin, TrU
|
||||
false -> e_type_string(TrF2, <<B1/binary, 18>>, TrUserData)
|
||||
end
|
||||
end
|
||||
end,
|
||||
if F3 == undefined -> B2;
|
||||
true ->
|
||||
begin
|
||||
TrF3 = id(F3, TrUserData),
|
||||
case is_empty_string(TrF3) of
|
||||
true -> B2;
|
||||
false -> e_type_string(TrF3, <<B2/binary, 26>>, TrUserData)
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
encode_msg_fetch_task_log(Msg, TrUserData) -> encode_msg_fetch_task_log(Msg, <<>>, TrUserData).
|
||||
@ -1061,63 +1051,56 @@ skip_32_async_call_reply(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrU
|
||||
|
||||
skip_64_async_call_reply(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_async_call_reply(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData).
|
||||
|
||||
decode_msg_deploy(Bin, TrUserData) -> dfp_read_field_def_deploy(Bin, 0, 0, 0, id(0, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData).
|
||||
decode_msg_deploy(Bin, TrUserData) -> dfp_read_field_def_deploy(Bin, 0, 0, 0, id(0, TrUserData), id(<<>>, TrUserData), TrUserData).
|
||||
|
||||
dfp_read_field_def_deploy(<<8, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_deploy_task_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData);
|
||||
dfp_read_field_def_deploy(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_deploy_service_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData);
|
||||
dfp_read_field_def_deploy(<<26, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_deploy_tar_url(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData);
|
||||
dfp_read_field_def_deploy(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #deploy{task_id = F@_1, service_id = F@_2, tar_url = F@_3};
|
||||
dfp_read_field_def_deploy(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dg_read_field_def_deploy(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData).
|
||||
dfp_read_field_def_deploy(<<8, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_deploy_task_id(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData);
|
||||
dfp_read_field_def_deploy(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_deploy_config(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData);
|
||||
dfp_read_field_def_deploy(<<>>, 0, 0, _, F@_1, F@_2, _) -> #deploy{task_id = F@_1, config = F@_2};
|
||||
dfp_read_field_def_deploy(Other, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dg_read_field_def_deploy(Other, Z1, Z2, F, F@_1, F@_2, TrUserData).
|
||||
|
||||
dg_read_field_def_deploy(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 32 - 7 -> dg_read_field_def_deploy(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData);
|
||||
dg_read_field_def_deploy(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, TrUserData) ->
|
||||
dg_read_field_def_deploy(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 32 - 7 -> dg_read_field_def_deploy(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData);
|
||||
dg_read_field_def_deploy(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, TrUserData) ->
|
||||
Key = X bsl N + Acc,
|
||||
case Key of
|
||||
8 -> d_field_deploy_task_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData);
|
||||
18 -> d_field_deploy_service_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData);
|
||||
26 -> d_field_deploy_tar_url(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData);
|
||||
8 -> d_field_deploy_task_id(Rest, 0, 0, 0, F@_1, F@_2, TrUserData);
|
||||
18 -> d_field_deploy_config(Rest, 0, 0, 0, F@_1, F@_2, TrUserData);
|
||||
_ ->
|
||||
case Key band 7 of
|
||||
0 -> skip_varint_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData);
|
||||
1 -> skip_64_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData);
|
||||
2 -> skip_length_delimited_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData);
|
||||
3 -> skip_group_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData);
|
||||
5 -> skip_32_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData)
|
||||
0 -> skip_varint_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData);
|
||||
1 -> skip_64_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData);
|
||||
2 -> skip_length_delimited_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData);
|
||||
3 -> skip_group_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData);
|
||||
5 -> skip_32_deploy(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData)
|
||||
end
|
||||
end;
|
||||
dg_read_field_def_deploy(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #deploy{task_id = F@_1, service_id = F@_2, tar_url = F@_3}.
|
||||
dg_read_field_def_deploy(<<>>, 0, 0, _, F@_1, F@_2, _) -> #deploy{task_id = F@_1, config = F@_2}.
|
||||
|
||||
d_field_deploy_task_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_deploy_task_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData);
|
||||
d_field_deploy_task_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, TrUserData) ->
|
||||
d_field_deploy_task_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_deploy_task_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData);
|
||||
d_field_deploy_task_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, TrUserData) ->
|
||||
{NewFValue, RestF} = {id((X bsl N + Acc) band 4294967295, TrUserData), Rest},
|
||||
dfp_read_field_def_deploy(RestF, 0, 0, F, NewFValue, F@_2, F@_3, TrUserData).
|
||||
dfp_read_field_def_deploy(RestF, 0, 0, F, NewFValue, F@_2, TrUserData).
|
||||
|
||||
d_field_deploy_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_deploy_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData);
|
||||
d_field_deploy_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, F@_3, TrUserData) ->
|
||||
d_field_deploy_config(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_deploy_config(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData);
|
||||
d_field_deploy_config(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, TrUserData) ->
|
||||
{NewFValue, RestF} = begin Len = X bsl N + Acc, <<Bytes:Len/binary, Rest2/binary>> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end,
|
||||
dfp_read_field_def_deploy(RestF, 0, 0, F, F@_1, NewFValue, F@_3, TrUserData).
|
||||
dfp_read_field_def_deploy(RestF, 0, 0, F, F@_1, NewFValue, TrUserData).
|
||||
|
||||
d_field_deploy_tar_url(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_deploy_tar_url(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData);
|
||||
d_field_deploy_tar_url(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, _, TrUserData) ->
|
||||
{NewFValue, RestF} = begin Len = X bsl N + Acc, <<Bytes:Len/binary, Rest2/binary>> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end,
|
||||
dfp_read_field_def_deploy(RestF, 0, 0, F, F@_1, F@_2, NewFValue, TrUserData).
|
||||
skip_varint_deploy(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> skip_varint_deploy(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData);
|
||||
skip_varint_deploy(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData).
|
||||
|
||||
skip_varint_deploy(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> skip_varint_deploy(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData);
|
||||
skip_varint_deploy(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData).
|
||||
|
||||
skip_length_delimited_deploy(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> skip_length_delimited_deploy(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData);
|
||||
skip_length_delimited_deploy(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) ->
|
||||
skip_length_delimited_deploy(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> skip_length_delimited_deploy(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData);
|
||||
skip_length_delimited_deploy(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) ->
|
||||
Length = X bsl N + Acc,
|
||||
<<_:Length/binary, Rest2/binary>> = Rest,
|
||||
dfp_read_field_def_deploy(Rest2, 0, 0, F, F@_1, F@_2, F@_3, TrUserData).
|
||||
dfp_read_field_def_deploy(Rest2, 0, 0, F, F@_1, F@_2, TrUserData).
|
||||
|
||||
skip_group_deploy(Bin, _, Z2, FNum, F@_1, F@_2, F@_3, TrUserData) ->
|
||||
skip_group_deploy(Bin, _, Z2, FNum, F@_1, F@_2, TrUserData) ->
|
||||
{_, Rest} = read_group(Bin, FNum),
|
||||
dfp_read_field_def_deploy(Rest, 0, Z2, FNum, F@_1, F@_2, F@_3, TrUserData).
|
||||
dfp_read_field_def_deploy(Rest, 0, Z2, FNum, F@_1, F@_2, TrUserData).
|
||||
|
||||
skip_32_deploy(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData).
|
||||
skip_32_deploy(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData).
|
||||
|
||||
skip_64_deploy(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData).
|
||||
skip_64_deploy(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData).
|
||||
|
||||
decode_msg_fetch_task_log(Bin, TrUserData) -> dfp_read_field_def_fetch_task_log(Bin, 0, 0, 0, id(0, TrUserData), TrUserData).
|
||||
|
||||
@ -1880,18 +1863,14 @@ merge_msg_async_call_reply(#async_call_reply{code = PFcode, result = PFresult, m
|
||||
end}.
|
||||
|
||||
-compile({nowarn_unused_function,merge_msg_deploy/3}).
|
||||
merge_msg_deploy(#deploy{task_id = PFtask_id, service_id = PFservice_id, tar_url = PFtar_url}, #deploy{task_id = NFtask_id, service_id = NFservice_id, tar_url = NFtar_url}, _) ->
|
||||
merge_msg_deploy(#deploy{task_id = PFtask_id, config = PFconfig}, #deploy{task_id = NFtask_id, config = NFconfig}, _) ->
|
||||
#deploy{task_id =
|
||||
if NFtask_id =:= undefined -> PFtask_id;
|
||||
true -> NFtask_id
|
||||
end,
|
||||
service_id =
|
||||
if NFservice_id =:= undefined -> PFservice_id;
|
||||
true -> NFservice_id
|
||||
end,
|
||||
tar_url =
|
||||
if NFtar_url =:= undefined -> PFtar_url;
|
||||
true -> NFtar_url
|
||||
config =
|
||||
if NFconfig =:= undefined -> PFconfig;
|
||||
true -> NFconfig
|
||||
end}.
|
||||
|
||||
-compile({nowarn_unused_function,merge_msg_fetch_task_log/3}).
|
||||
@ -2147,15 +2126,12 @@ v_msg_async_call_reply(X, Path, _TrUserData) -> mk_type_error({expected_msg, asy
|
||||
|
||||
-compile({nowarn_unused_function,v_msg_deploy/3}).
|
||||
-dialyzer({nowarn_function,v_msg_deploy/3}).
|
||||
v_msg_deploy(#deploy{task_id = F1, service_id = F2, tar_url = F3}, Path, TrUserData) ->
|
||||
v_msg_deploy(#deploy{task_id = F1, config = F2}, Path, TrUserData) ->
|
||||
if F1 == undefined -> ok;
|
||||
true -> v_type_uint32(F1, [task_id | Path], TrUserData)
|
||||
end,
|
||||
if F2 == undefined -> ok;
|
||||
true -> v_type_string(F2, [service_id | Path], TrUserData)
|
||||
end,
|
||||
if F3 == undefined -> ok;
|
||||
true -> v_type_string(F3, [tar_url | Path], TrUserData)
|
||||
true -> v_type_string(F2, [config | Path], TrUserData)
|
||||
end,
|
||||
ok;
|
||||
v_msg_deploy(X, Path, _TrUserData) -> mk_type_error({expected_msg, deploy}, X, Path).
|
||||
@ -2401,10 +2377,7 @@ get_msg_defs() ->
|
||||
[#field{name = code, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []},
|
||||
#field{name = result, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
|
||||
#field{name = message, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]},
|
||||
{{msg, deploy},
|
||||
[#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []},
|
||||
#field{name = service_id, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
|
||||
#field{name = tar_url, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]},
|
||||
{{msg, deploy}, [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = config, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]},
|
||||
{{msg, fetch_task_log}, [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}]},
|
||||
{{msg, invoke},
|
||||
[#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []},
|
||||
@ -2480,10 +2453,7 @@ find_msg_def(async_call_reply) ->
|
||||
[#field{name = code, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []},
|
||||
#field{name = result, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
|
||||
#field{name = message, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}];
|
||||
find_msg_def(deploy) ->
|
||||
[#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []},
|
||||
#field{name = service_id, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
|
||||
#field{name = tar_url, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}];
|
||||
find_msg_def(deploy) -> [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = config, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}];
|
||||
find_msg_def(fetch_task_log) -> [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}];
|
||||
find_msg_def(invoke) ->
|
||||
[#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []},
|
||||
|
||||
@ -34,8 +34,8 @@ message AsyncCallReply {
|
||||
// 部署逻辑
|
||||
message Deploy {
|
||||
uint32 task_id = 1;
|
||||
string service_id = 2;
|
||||
string tar_url = 3;
|
||||
// json
|
||||
string config = 2;
|
||||
}
|
||||
|
||||
// 获取task的logs
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user