diff --git a/apps/efka/include/message_pb.hrl b/apps/efka/include/message_pb.hrl index e0cdfaf..732b47b 100644 --- a/apps/efka/include/message_pb.hrl +++ b/apps/efka/include/message_pb.hrl @@ -57,7 +57,8 @@ -define('INVOKE_PB_H', true). -record(invoke, {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional - payload = <<>> :: unicode:chardata() | undefined % = 2, optional + payload = <<>> :: unicode:chardata() | undefined, % = 2, optional + timeout = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits }). -endif. diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl index 2ed6a02..45201b1 100644 --- a/apps/efka/src/client/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -120,13 +120,14 @@ init([ServiceId, Host, Port]) -> {ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]), ok = gen_tcp:controlling_process(Socket, self()), - Packet = <<1:32, ?PACKET_REGISTER:8, ServiceId/binary>>, + PacketId = 1, + Packet = <>, ok = gen_tcp:send(Socket, Packet), lager:debug("[efka_client] will send packet: ~p", [Packet]), receive - {tcp, Socket, <<1:32, ?PACKET_RESPONSE, 1:8>>} -> - {ok, #state{packet_id = 2, host = Host, port = Port, socket = Socket}}; - {tcp, Socket, <<1:32, ?PACKET_RESPONSE, 0:8, Error/binary>>} -> + {tcp, Socket, <>} -> + {ok, #state{packet_id = PacketId + 1, host = Host, port = Port, socket = Socket}}; + {tcp, Socket, <>} -> {stop, Error} after ?EFKA_REQUEST_TIMEOUT -> @@ -149,7 +150,7 @@ handle_call({controller_process, ControllerPid}, _From, State) -> %% done handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - Packet = <>, + Packet = <>, ok = gen_tcp:send(Socket, Packet), Ref = make_ref(), @@ -168,14 +169,14 @@ handle_cast({send_metric_data, DeviceUUID, Measurement, Tags, Fields}, State = # Body = efka_point:normalized(Point), Len = byte_size(DeviceUUID), - Packet = <<0:32, ?PACKET_METRIC_DATA, Len:8, DeviceUUID/binary, Body/binary>>, + Packet = <>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; %% done handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> - Packet = <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>, + Packet = <>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; @@ -191,7 +192,7 @@ handle_cast(_Info, State = #state{}) -> {stop, Reason :: term(), NewState :: #state{}}). %% 收到请求的响应 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> case maps:take(PacketId, Inflight) of error -> {noreply, State}; @@ -206,7 +207,7 @@ handle_info({tcp, Socket, <>}, end; %% 收到efka推送的参数设置 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of true -> Ref = make_ref(), @@ -222,7 +223,7 @@ handle_info({tcp, Socket, < <<0:8, "处理进程异常"/utf8>> end, - Packet = <>, + Packet = <>, ok = gen_tcp:send(Socket, Packet), {noreply, State}; diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 5a38ef5..109ea90 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -265,7 +265,9 @@ handle_info({server_push, PacketId, <> {noreply, State}; %% 收到需要回复的指令 -handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId}}, State = #state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}) -> +handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, payload = Payload, timeout = Timeout}}, + State = #state{status = ?STATE_ACTIVATED, inflight = Inflight}) -> + efka_logger:debug("[efka_agent] get invoke: ~p", [Invoke]), %% 消息发送到订阅系统 case efka_service:get_pid(ServiceId) of @@ -273,7 +275,12 @@ handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId}}, S Reply = #push_reply{code = 0, message = <<"micro_service not run">>, result = <<>>}, safe_response(PacketId, message_pb:encode_msg(Reply), State); ServicePid when is_pid(ServicePid) -> - ok + Ref = make_ref(), + efka_service:invoke(ServicePid, Ref, Payload), + %% 处理超时逻辑 + erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}), + + {noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}} end, {noreply, State}; diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index 03fbdfc..5a12472 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -70,6 +70,9 @@ get_pid(ServiceId) when is_binary(ServiceId) -> push_config(Pid, Ref, ConfigJson) when is_pid(Pid), is_binary(ConfigJson) -> gen_server:cast(Pid, {push_config, Ref, self(), ConfigJson}). +invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload) -> + gen_server:cast(Pid, {invoke, Ref, self(), Payload}). + request_config(Pid) when is_pid(Pid) -> gen_server:call(Pid, request_config). @@ -228,6 +231,17 @@ handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{running_ {reply, State} end; +%% 推送配置项目 +handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) -> + case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of + true -> + efka_tcp_channel:invoke(ChannelPid, Ref, self(), Payload), + {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; + false -> + ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}}, + {reply, State} + end; + handle_cast(_Request, State = #state{}) -> {noreply, State}. diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index a71c9d8..2dfec37 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -13,7 +13,7 @@ %% API -export([start_link/1]). --export([push_config/4]). +-export([push_config/4, invoke/4]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -32,7 +32,8 @@ %% efka下发给微服务配置 -define(PACKET_PUSH_CONFIG, 5). -%% 微服务从efka获取自身的采集项 +-define(PACKET_INVOKE, 6). + -define(PACKET_REQUEST_CONFIG, 10). %% 微服务事件上报 @@ -57,6 +58,10 @@ push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(ConfigJson), is_reference(Ref) -> gen_server:cast(ChannelPid, {push_config, Ref, ReceiverPid, ConfigJson}). +-spec invoke(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Payload :: binary()) -> no_return(). +invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) -> + gen_server:cast(ChannelPid, {invoke, Ref, ReceiverPid, Payload}). + %% @doc Spawns the server and registers the local name (unique) -spec(start_link(Socket :: gen_tcp:socket()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -96,10 +101,14 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -%% 推送参数项目 -handle_cast({push_params, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - ok = gen_tcp:send(Socket, <>), - {noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; +%% 推送配置项目 +handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + ok = gen_tcp:send(Socket, <>), + {noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + +handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + ok = gen_tcp:send(Socket, <>), + {noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; handle_cast(_Request, State = #state{}) -> {noreply, State}. @@ -111,39 +120,42 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 注册 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> case efka_service:get_pid(ServiceId) of undefined -> - ok = gen_tcp:send(Socket, <>); + lager:warning("[efka_tcp_channel] service_id: ~p, not running", [ServiceId]), + ok = gen_tcp:send(Socket, <>), + {stop, normal, State}; Pid when is_pid(Pid) -> case efka_service:attach_channel(Pid, self()) of ok -> - ok = gen_tcp:send(Socket, <>), + ok = gen_tcp:send(Socket, <>), {noreply, State#state{service_id = ServiceId, service_pid = Pid, is_registered = true}}; - {error, Reason} -> - ok = gen_tcp:send(Socket, <>), + {error, Error} -> + lager:warning("[efka_tcp_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), + ok = gen_tcp:send(Socket, <>), {stop, normal, State} end end; %% 请求参数 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> {ok, ConfigJson} = efka_service:request_config(ServicePid), - ok = gen_tcp:send(Socket, <>), + ok = gen_tcp:send(Socket, <>), {noreply, State}; %% 数据项 -handle_info({tcp, Socket, <<0:32, ?PACKET_METRIC_DATA:8, Len:8, DeviceUUID:Len/binary, Data/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> efka_service:metric_data(ServicePid, DeviceUUID, Data), {noreply, State}; %% Event事件 -handle_info({tcp, Socket, <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) -> efka_service:send_event(ServicePid, EventType, Params), {noreply, State}; %% 收到端上的响应 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> case maps:take(PacketId, Inflight) of error -> lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Response, PacketId]), @@ -188,3 +200,10 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +%% 采用32位编码 +-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). +next_packet_id(PacketId) when PacketId >= 4294967295 -> + 1; +next_packet_id(PacketId) -> + PacketId + 1. \ No newline at end of file diff --git a/apps/efka/src/proto/message_pb.erl b/apps/efka/src/proto/message_pb.erl index 4eead0e..060dcc1 100644 --- a/apps/efka/src/proto/message_pb.erl +++ b/apps/efka/src/proto/message_pb.erl @@ -297,7 +297,7 @@ encode_msg_deploy(#deploy{task_id = F1, service_id = F2, tar_url = F3}, Bin, TrU encode_msg_invoke(Msg, TrUserData) -> encode_msg_invoke(Msg, <<>>, TrUserData). -encode_msg_invoke(#invoke{service_id = F1, payload = F2}, Bin, TrUserData) -> +encode_msg_invoke(#invoke{service_id = F1, payload = F2, timeout = F3}, Bin, TrUserData) -> B1 = if F1 == undefined -> Bin; true -> begin @@ -308,13 +308,22 @@ encode_msg_invoke(#invoke{service_id = F1, payload = F2}, Bin, TrUserData) -> end end end, - if F2 == undefined -> B1; + B2 = if F2 == undefined -> B1; + true -> + begin + TrF2 = id(F2, TrUserData), + case is_empty_string(TrF2) of + true -> B1; + false -> e_type_string(TrF2, <>, TrUserData) + end + end + end, + if F3 == undefined -> B2; true -> begin - TrF2 = id(F2, TrUserData), - case is_empty_string(TrF2) of - true -> B1; - false -> e_type_string(TrF2, <>, TrUserData) + TrF3 = id(F3, TrUserData), + if TrF3 =:= 0 -> B2; + true -> e_varint(TrF3, <>, TrUserData) end end end. @@ -1091,56 +1100,63 @@ skip_32_deploy(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) - skip_64_deploy(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). -decode_msg_invoke(Bin, TrUserData) -> dfp_read_field_def_invoke(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData). +decode_msg_invoke(Bin, TrUserData) -> dfp_read_field_def_invoke(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(0, TrUserData), TrUserData). -dfp_read_field_def_invoke(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_invoke_service_id(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); -dfp_read_field_def_invoke(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_invoke_payload(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); -dfp_read_field_def_invoke(<<>>, 0, 0, _, F@_1, F@_2, _) -> #invoke{service_id = F@_1, payload = F@_2}; -dfp_read_field_def_invoke(Other, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dg_read_field_def_invoke(Other, Z1, Z2, F, F@_1, F@_2, TrUserData). +dfp_read_field_def_invoke(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_invoke_service_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); +dfp_read_field_def_invoke(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_invoke_payload(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); +dfp_read_field_def_invoke(<<24, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_invoke_timeout(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); +dfp_read_field_def_invoke(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #invoke{service_id = F@_1, payload = F@_2, timeout = F@_3}; +dfp_read_field_def_invoke(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dg_read_field_def_invoke(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). -dg_read_field_def_invoke(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 32 - 7 -> dg_read_field_def_invoke(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -dg_read_field_def_invoke(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, TrUserData) -> +dg_read_field_def_invoke(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 32 - 7 -> dg_read_field_def_invoke(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); +dg_read_field_def_invoke(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, TrUserData) -> Key = X bsl N + Acc, case Key of - 10 -> d_field_invoke_service_id(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); - 18 -> d_field_invoke_payload(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); + 10 -> d_field_invoke_service_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); + 18 -> d_field_invoke_payload(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); + 24 -> d_field_invoke_timeout(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); _ -> case Key band 7 of - 0 -> skip_varint_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 1 -> skip_64_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 2 -> skip_length_delimited_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 3 -> skip_group_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 5 -> skip_32_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData) + 0 -> skip_varint_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); + 1 -> skip_64_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); + 2 -> skip_length_delimited_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); + 3 -> skip_group_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); + 5 -> skip_32_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData) end end; -dg_read_field_def_invoke(<<>>, 0, 0, _, F@_1, F@_2, _) -> #invoke{service_id = F@_1, payload = F@_2}. +dg_read_field_def_invoke(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #invoke{service_id = F@_1, payload = F@_2, timeout = F@_3}. -d_field_invoke_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_invoke_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -d_field_invoke_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, TrUserData) -> +d_field_invoke_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_invoke_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); +d_field_invoke_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_invoke(RestF, 0, 0, F, NewFValue, F@_2, TrUserData). + dfp_read_field_def_invoke(RestF, 0, 0, F, NewFValue, F@_2, F@_3, TrUserData). -d_field_invoke_payload(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_invoke_payload(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -d_field_invoke_payload(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, TrUserData) -> +d_field_invoke_payload(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_invoke_payload(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); +d_field_invoke_payload(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, F@_3, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_invoke(RestF, 0, 0, F, F@_1, NewFValue, TrUserData). + dfp_read_field_def_invoke(RestF, 0, 0, F, F@_1, NewFValue, F@_3, TrUserData). -skip_varint_invoke(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> skip_varint_invoke(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); -skip_varint_invoke(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). +d_field_invoke_timeout(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_invoke_timeout(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); +d_field_invoke_timeout(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, _, TrUserData) -> + {NewFValue, RestF} = {id((X bsl N + Acc) band 4294967295, TrUserData), Rest}, + dfp_read_field_def_invoke(RestF, 0, 0, F, F@_1, F@_2, NewFValue, TrUserData). -skip_length_delimited_invoke(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> skip_length_delimited_invoke(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -skip_length_delimited_invoke(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) -> +skip_varint_invoke(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> skip_varint_invoke(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); +skip_varint_invoke(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). + +skip_length_delimited_invoke(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> skip_length_delimited_invoke(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); +skip_length_delimited_invoke(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) -> Length = X bsl N + Acc, <<_:Length/binary, Rest2/binary>> = Rest, - dfp_read_field_def_invoke(Rest2, 0, 0, F, F@_1, F@_2, TrUserData). + dfp_read_field_def_invoke(Rest2, 0, 0, F, F@_1, F@_2, F@_3, TrUserData). -skip_group_invoke(Bin, _, Z2, FNum, F@_1, F@_2, TrUserData) -> +skip_group_invoke(Bin, _, Z2, FNum, F@_1, F@_2, F@_3, TrUserData) -> {_, Rest} = read_group(Bin, FNum), - dfp_read_field_def_invoke(Rest, 0, Z2, FNum, F@_1, F@_2, TrUserData). + dfp_read_field_def_invoke(Rest, 0, Z2, FNum, F@_1, F@_2, F@_3, TrUserData). -skip_32_invoke(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). +skip_32_invoke(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). -skip_64_invoke(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). +skip_64_invoke(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). decode_msg_service_config(Bin, TrUserData) -> dfp_read_field_def_service_config(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(0, TrUserData), TrUserData). @@ -1815,7 +1831,7 @@ merge_msg_deploy(#deploy{task_id = PFtask_id, service_id = PFservice_id, tar_url end}. -compile({nowarn_unused_function,merge_msg_invoke/3}). -merge_msg_invoke(#invoke{service_id = PFservice_id, payload = PFpayload}, #invoke{service_id = NFservice_id, payload = NFpayload}, _) -> +merge_msg_invoke(#invoke{service_id = PFservice_id, payload = PFpayload, timeout = PFtimeout}, #invoke{service_id = NFservice_id, payload = NFpayload, timeout = NFtimeout}, _) -> #invoke{service_id = if NFservice_id =:= undefined -> PFservice_id; true -> NFservice_id @@ -1823,6 +1839,10 @@ merge_msg_invoke(#invoke{service_id = PFservice_id, payload = PFpayload}, #invok payload = if NFpayload =:= undefined -> PFpayload; true -> NFpayload + end, + timeout = + if NFtimeout =:= undefined -> PFtimeout; + true -> NFtimeout end}. -compile({nowarn_unused_function,merge_msg_service_config/3}). @@ -2070,13 +2090,16 @@ v_msg_deploy(X, Path, _TrUserData) -> mk_type_error({expected_msg, deploy}, X, P -compile({nowarn_unused_function,v_msg_invoke/3}). -dialyzer({nowarn_function,v_msg_invoke/3}). -v_msg_invoke(#invoke{service_id = F1, payload = F2}, Path, TrUserData) -> +v_msg_invoke(#invoke{service_id = F1, payload = F2, timeout = F3}, Path, TrUserData) -> if F1 == undefined -> ok; true -> v_type_string(F1, [service_id | Path], TrUserData) end, if F2 == undefined -> ok; true -> v_type_string(F2, [payload | Path], TrUserData) end, + if F3 == undefined -> ok; + true -> v_type_uint32(F3, [timeout | Path], TrUserData) + end, ok; v_msg_invoke(X, Path, _TrUserData) -> mk_type_error({expected_msg, invoke}, X, Path). @@ -2295,7 +2318,10 @@ get_msg_defs() -> [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = service_id, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = tar_url, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]}, - {{msg, invoke}, [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = payload, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]}, + {{msg, invoke}, + [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, + #field{name = payload, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, + #field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]}, {{msg, service_config}, [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = config_json, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, @@ -2372,7 +2398,10 @@ find_msg_def(deploy) -> [#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []}, #field{name = service_id, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = tar_url, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]; -find_msg_def(invoke) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = payload, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]; +find_msg_def(invoke) -> + [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, + #field{name = payload, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, + #field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]; find_msg_def(service_config) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = config_json, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, diff --git a/message_pb.proto b/message_pb.proto index d4232ba..3831139 100644 --- a/message_pb.proto +++ b/message_pb.proto @@ -40,6 +40,7 @@ message Deploy { message Invoke { string service_id = 1; string payload = 2; + uint32 timeout = 3; } // 参数配置