This commit is contained in:
anlicheng 2025-05-08 23:53:30 +08:00
parent 7e2f164459
commit 2e34e76d52
7 changed files with 141 additions and 69 deletions

View File

@ -57,7 +57,8 @@
-define('INVOKE_PB_H', true).
-record(invoke,
{service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
payload = <<>> :: unicode:chardata() | undefined % = 2, optional
payload = <<>> :: unicode:chardata() | undefined, % = 2, optional
timeout = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits
}).
-endif.

View File

@ -120,13 +120,14 @@ init([ServiceId, Host, Port]) ->
{ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]),
ok = gen_tcp:controlling_process(Socket, self()),
Packet = <<1:32, ?PACKET_REGISTER:8, ServiceId/binary>>,
PacketId = 1,
Packet = <<?PACKET_REGISTER:8, PacketId:32, ServiceId/binary>>,
ok = gen_tcp:send(Socket, Packet),
lager:debug("[efka_client] will send packet: ~p", [Packet]),
receive
{tcp, Socket, <<1:32, ?PACKET_RESPONSE, 1:8>>} ->
{ok, #state{packet_id = 2, host = Host, port = Port, socket = Socket}};
{tcp, Socket, <<1:32, ?PACKET_RESPONSE, 0:8, Error/binary>>} ->
{tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, 1:8>>} ->
{ok, #state{packet_id = PacketId + 1, host = Host, port = Port, socket = Socket}};
{tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, Error/binary>>} ->
{stop, Error}
after
?EFKA_REQUEST_TIMEOUT ->
@ -149,7 +150,7 @@ handle_call({controller_process, ControllerPid}, _From, State) ->
%% done
handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = <<PacketId:32, ?PACKET_REQUEST_CONFIG:8>>,
Packet = <<?PACKET_REQUEST_CONFIG:8, PacketId:32>>,
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
@ -168,14 +169,14 @@ handle_cast({send_metric_data, DeviceUUID, Measurement, Tags, Fields}, State = #
Body = efka_point:normalized(Point),
Len = byte_size(DeviceUUID),
Packet = <<0:32, ?PACKET_METRIC_DATA, Len:8, DeviceUUID/binary, Body/binary>>,
Packet = <<?PACKET_METRIC_DATA, 0:32, Len:8, DeviceUUID/binary, Body/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%% done
handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) ->
Packet = <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>,
Packet = <<?PACKET_EVENT:8, 0:32, EventType:16, Params/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
@ -191,7 +192,7 @@ handle_cast(_Info, State = #state{}) ->
{stop, Reason :: term(), NewState :: #state{}}).
%%
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_RESPONSE:8, Message/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
handle_info({tcp, Socket, <<?PACKET_RESPONSE:8, PacketId:32, Message/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
case maps:take(PacketId, Inflight) of
error ->
{noreply, State};
@ -206,7 +207,7 @@ handle_info({tcp, Socket, <<PacketId:32, ?PACKET_RESPONSE:8, Message/binary>>},
end;
%% efka推送的参数设置
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_PUSH_CONFIG:8, ConfigJson/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
handle_info({tcp, Socket, <<?PACKET_PUSH_CONFIG:8, PacketId:32, ConfigJson/binary>>}, State = #state{socket = Socket, controller_process = ControllerPid}) ->
Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of
true ->
Ref = make_ref(),
@ -222,7 +223,7 @@ handle_info({tcp, Socket, <<PacketId:32, ?PACKET_PUSH_CONFIG:8, ConfigJson/binar
false ->
<<0:8, "处理进程异常"/utf8>>
end,
Packet = <<PacketId:32, ?PACKET_RESPONSE:8, Message/binary>>,
Packet = <<?PACKET_RESPONSE:8, PacketId:32, Message/binary>>,
ok = gen_tcp:send(Socket, Packet),
{noreply, State};

View File

@ -265,7 +265,9 @@ handle_info({server_push, PacketId, <<?PUSH_SERVICE_CONFIG:8, ConfigBin/binary>>
{noreply, State};
%%
handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId}}, State = #state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}) ->
handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId, payload = Payload, timeout = Timeout}},
State = #state{status = ?STATE_ACTIVATED, inflight = Inflight}) ->
efka_logger:debug("[efka_agent] get invoke: ~p", [Invoke]),
%%
case efka_service:get_pid(ServiceId) of
@ -273,7 +275,12 @@ handle_info({server_push, PacketId, Invoke = #invoke{service_id = ServiceId}}, S
Reply = #push_reply{code = 0, message = <<"micro_service not run">>, result = <<>>},
safe_response(PacketId, message_pb:encode_msg(Reply), State);
ServicePid when is_pid(ServicePid) ->
ok
Ref = make_ref(),
efka_service:invoke(ServicePid, Ref, Payload),
%%
erlang:start_timer(Timeout * 1000, self(), {request_timeout, Ref}),
{noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
end,
{noreply, State};

View File

@ -70,6 +70,9 @@ get_pid(ServiceId) when is_binary(ServiceId) ->
push_config(Pid, Ref, ConfigJson) when is_pid(Pid), is_binary(ConfigJson) ->
gen_server:cast(Pid, {push_config, Ref, self(), ConfigJson}).
invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload) ->
gen_server:cast(Pid, {invoke, Ref, self(), Payload}).
request_config(Pid) when is_pid(Pid) ->
gen_server:call(Pid, request_config).
@ -228,6 +231,17 @@ handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{running_
{reply, State}
end;
%%
handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) ->
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
true ->
efka_tcp_channel:invoke(ChannelPid, Ref, self(), Payload),
{noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
false ->
ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}},
{reply, State}
end;
handle_cast(_Request, State = #state{}) ->
{noreply, State}.

View File

@ -13,7 +13,7 @@
%% API
-export([start_link/1]).
-export([push_config/4]).
-export([push_config/4, invoke/4]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -32,7 +32,8 @@
%% efka下发给微服务配置
-define(PACKET_PUSH_CONFIG, 5).
%% efka获取自身的采集项
-define(PACKET_INVOKE, 6).
-define(PACKET_REQUEST_CONFIG, 10).
%%
@ -57,6 +58,10 @@
push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(ConfigJson), is_reference(Ref) ->
gen_server:cast(ChannelPid, {push_config, Ref, ReceiverPid, ConfigJson}).
-spec invoke(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Payload :: binary()) -> no_return().
invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) ->
gen_server:cast(ChannelPid, {invoke, Ref, ReceiverPid, Payload}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Socket :: gen_tcp:socket()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
@ -96,10 +101,14 @@ handle_call(_Request, _From, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%%
handle_cast({push_params, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_PUSH_CONFIG:8, ConfigJson/binary>>),
{noreply, State#state{inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
%%
handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
ok = gen_tcp:send(Socket, <<?PACKET_PUSH_CONFIG:8, PacketId:32, ConfigJson/binary>>),
{noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
ok = gen_tcp:send(Socket, <<?PACKET_INVOKE:8, PacketId:32, Payload/binary>>),
{noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
@ -111,39 +120,42 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%%
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REGISTER:8, ServiceId/binary>>}, State = #state{socket = Socket}) ->
handle_info({tcp, Socket, <<?PACKET_REGISTER:8, PacketId:32, ServiceId/binary>>}, State = #state{socket = Socket}) ->
case efka_service:get_pid(ServiceId) of
undefined ->
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, 0:8, "service not running">>);
lager:warning("[efka_tcp_channel] service_id: ~p, not running", [ServiceId]),
ok = gen_tcp:send(Socket, <<?PACKET_RESPONSE:8, PacketId:32, 0:8, "service not running">>),
{stop, normal, State};
Pid when is_pid(Pid) ->
case efka_service:attach_channel(Pid, self()) of
ok ->
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, 1:8>>),
ok = gen_tcp:send(Socket, <<?PACKET_RESPONSE:8, PacketId:32, 1:8>>),
{noreply, State#state{service_id = ServiceId, service_pid = Pid, is_registered = true}};
{error, Reason} ->
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, 10:8, Reason/binary>>),
{error, Error} ->
lager:warning("[efka_tcp_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]),
ok = gen_tcp:send(Socket, <<?PACKET_RESPONSE:8, PacketId:32, 0:8, Error/binary>>),
{stop, normal, State}
end
end;
%%
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_REQUEST_CONFIG:8>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
handle_info({tcp, Socket, <<?PACKET_REQUEST_CONFIG:8, PacketId:32>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
{ok, ConfigJson} = efka_service:request_config(ServicePid),
ok = gen_tcp:send(Socket, <<PacketId:32, ?PACKET_RESPONSE:8, ConfigJson/binary>>),
ok = gen_tcp:send(Socket, <<?PACKET_RESPONSE:8, PacketId:32, ConfigJson/binary>>),
{noreply, State};
%%
handle_info({tcp, Socket, <<0:32, ?PACKET_METRIC_DATA:8, Len:8, DeviceUUID:Len/binary, Data/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
handle_info({tcp, Socket, <<?PACKET_METRIC_DATA:8, 0:32, Len:8, DeviceUUID:Len/binary, Data/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
efka_service:metric_data(ServicePid, DeviceUUID, Data),
{noreply, State};
%% Event事件
handle_info({tcp, Socket, <<0:32, ?PACKET_EVENT:8, EventType:16, Params/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
handle_info({tcp, Socket, <<?PACKET_EVENT:8, 0:32, EventType:16, Params/binary>>}, State = #state{socket = Socket, service_pid = ServicePid, is_registered = true}) ->
efka_service:send_event(ServicePid, EventType, Params),
{noreply, State};
%%
handle_info({tcp, Socket, <<PacketId:32, ?PACKET_RESPONSE:8, Response/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
handle_info({tcp, Socket, <<?PACKET_RESPONSE:8, PacketId:32, Response/binary>>}, State = #state{socket = Socket, inflight = Inflight}) ->
case maps:take(PacketId, Inflight) of
error ->
lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Response, PacketId]),
@ -188,3 +200,10 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% 32
-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer().
next_packet_id(PacketId) when PacketId >= 4294967295 ->
1;
next_packet_id(PacketId) ->
PacketId + 1.

View File

@ -297,7 +297,7 @@ encode_msg_deploy(#deploy{task_id = F1, service_id = F2, tar_url = F3}, Bin, TrU
encode_msg_invoke(Msg, TrUserData) -> encode_msg_invoke(Msg, <<>>, TrUserData).
encode_msg_invoke(#invoke{service_id = F1, payload = F2}, Bin, TrUserData) ->
encode_msg_invoke(#invoke{service_id = F1, payload = F2, timeout = F3}, Bin, TrUserData) ->
B1 = if F1 == undefined -> Bin;
true ->
begin
@ -308,13 +308,22 @@ encode_msg_invoke(#invoke{service_id = F1, payload = F2}, Bin, TrUserData) ->
end
end
end,
if F2 == undefined -> B1;
B2 = if F2 == undefined -> B1;
true ->
begin
TrF2 = id(F2, TrUserData),
case is_empty_string(TrF2) of
true -> B1;
false -> e_type_string(TrF2, <<B1/binary, 18>>, TrUserData)
end
end
end,
if F3 == undefined -> B2;
true ->
begin
TrF2 = id(F2, TrUserData),
case is_empty_string(TrF2) of
true -> B1;
false -> e_type_string(TrF2, <<B1/binary, 18>>, TrUserData)
TrF3 = id(F3, TrUserData),
if TrF3 =:= 0 -> B2;
true -> e_varint(TrF3, <<B2/binary, 24>>, TrUserData)
end
end
end.
@ -1091,56 +1100,63 @@ skip_32_deploy(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -
skip_64_deploy(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_deploy(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData).
decode_msg_invoke(Bin, TrUserData) -> dfp_read_field_def_invoke(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData).
decode_msg_invoke(Bin, TrUserData) -> dfp_read_field_def_invoke(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(0, TrUserData), TrUserData).
dfp_read_field_def_invoke(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_invoke_service_id(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData);
dfp_read_field_def_invoke(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_invoke_payload(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData);
dfp_read_field_def_invoke(<<>>, 0, 0, _, F@_1, F@_2, _) -> #invoke{service_id = F@_1, payload = F@_2};
dfp_read_field_def_invoke(Other, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dg_read_field_def_invoke(Other, Z1, Z2, F, F@_1, F@_2, TrUserData).
dfp_read_field_def_invoke(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_invoke_service_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData);
dfp_read_field_def_invoke(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_invoke_payload(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData);
dfp_read_field_def_invoke(<<24, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_invoke_timeout(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData);
dfp_read_field_def_invoke(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #invoke{service_id = F@_1, payload = F@_2, timeout = F@_3};
dfp_read_field_def_invoke(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dg_read_field_def_invoke(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData).
dg_read_field_def_invoke(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 32 - 7 -> dg_read_field_def_invoke(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData);
dg_read_field_def_invoke(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, TrUserData) ->
dg_read_field_def_invoke(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 32 - 7 -> dg_read_field_def_invoke(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData);
dg_read_field_def_invoke(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, TrUserData) ->
Key = X bsl N + Acc,
case Key of
10 -> d_field_invoke_service_id(Rest, 0, 0, 0, F@_1, F@_2, TrUserData);
18 -> d_field_invoke_payload(Rest, 0, 0, 0, F@_1, F@_2, TrUserData);
10 -> d_field_invoke_service_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData);
18 -> d_field_invoke_payload(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData);
24 -> d_field_invoke_timeout(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData);
_ ->
case Key band 7 of
0 -> skip_varint_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData);
1 -> skip_64_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData);
2 -> skip_length_delimited_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData);
3 -> skip_group_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData);
5 -> skip_32_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData)
0 -> skip_varint_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData);
1 -> skip_64_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData);
2 -> skip_length_delimited_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData);
3 -> skip_group_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData);
5 -> skip_32_invoke(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData)
end
end;
dg_read_field_def_invoke(<<>>, 0, 0, _, F@_1, F@_2, _) -> #invoke{service_id = F@_1, payload = F@_2}.
dg_read_field_def_invoke(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #invoke{service_id = F@_1, payload = F@_2, timeout = F@_3}.
d_field_invoke_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_invoke_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData);
d_field_invoke_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, TrUserData) ->
d_field_invoke_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_invoke_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData);
d_field_invoke_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, TrUserData) ->
{NewFValue, RestF} = begin Len = X bsl N + Acc, <<Bytes:Len/binary, Rest2/binary>> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end,
dfp_read_field_def_invoke(RestF, 0, 0, F, NewFValue, F@_2, TrUserData).
dfp_read_field_def_invoke(RestF, 0, 0, F, NewFValue, F@_2, F@_3, TrUserData).
d_field_invoke_payload(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_invoke_payload(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData);
d_field_invoke_payload(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, TrUserData) ->
d_field_invoke_payload(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_invoke_payload(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData);
d_field_invoke_payload(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, F@_3, TrUserData) ->
{NewFValue, RestF} = begin Len = X bsl N + Acc, <<Bytes:Len/binary, Rest2/binary>> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end,
dfp_read_field_def_invoke(RestF, 0, 0, F, F@_1, NewFValue, TrUserData).
dfp_read_field_def_invoke(RestF, 0, 0, F, F@_1, NewFValue, F@_3, TrUserData).
skip_varint_invoke(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> skip_varint_invoke(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData);
skip_varint_invoke(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData).
d_field_invoke_timeout(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_invoke_timeout(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData);
d_field_invoke_timeout(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, _, TrUserData) ->
{NewFValue, RestF} = {id((X bsl N + Acc) band 4294967295, TrUserData), Rest},
dfp_read_field_def_invoke(RestF, 0, 0, F, F@_1, F@_2, NewFValue, TrUserData).
skip_length_delimited_invoke(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> skip_length_delimited_invoke(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData);
skip_length_delimited_invoke(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) ->
skip_varint_invoke(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> skip_varint_invoke(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData);
skip_varint_invoke(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData).
skip_length_delimited_invoke(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> skip_length_delimited_invoke(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData);
skip_length_delimited_invoke(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) ->
Length = X bsl N + Acc,
<<_:Length/binary, Rest2/binary>> = Rest,
dfp_read_field_def_invoke(Rest2, 0, 0, F, F@_1, F@_2, TrUserData).
dfp_read_field_def_invoke(Rest2, 0, 0, F, F@_1, F@_2, F@_3, TrUserData).
skip_group_invoke(Bin, _, Z2, FNum, F@_1, F@_2, TrUserData) ->
skip_group_invoke(Bin, _, Z2, FNum, F@_1, F@_2, F@_3, TrUserData) ->
{_, Rest} = read_group(Bin, FNum),
dfp_read_field_def_invoke(Rest, 0, Z2, FNum, F@_1, F@_2, TrUserData).
dfp_read_field_def_invoke(Rest, 0, Z2, FNum, F@_1, F@_2, F@_3, TrUserData).
skip_32_invoke(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData).
skip_32_invoke(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData).
skip_64_invoke(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData).
skip_64_invoke(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_invoke(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData).
decode_msg_service_config(Bin, TrUserData) -> dfp_read_field_def_service_config(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(0, TrUserData), TrUserData).
@ -1815,7 +1831,7 @@ merge_msg_deploy(#deploy{task_id = PFtask_id, service_id = PFservice_id, tar_url
end}.
-compile({nowarn_unused_function,merge_msg_invoke/3}).
merge_msg_invoke(#invoke{service_id = PFservice_id, payload = PFpayload}, #invoke{service_id = NFservice_id, payload = NFpayload}, _) ->
merge_msg_invoke(#invoke{service_id = PFservice_id, payload = PFpayload, timeout = PFtimeout}, #invoke{service_id = NFservice_id, payload = NFpayload, timeout = NFtimeout}, _) ->
#invoke{service_id =
if NFservice_id =:= undefined -> PFservice_id;
true -> NFservice_id
@ -1823,6 +1839,10 @@ merge_msg_invoke(#invoke{service_id = PFservice_id, payload = PFpayload}, #invok
payload =
if NFpayload =:= undefined -> PFpayload;
true -> NFpayload
end,
timeout =
if NFtimeout =:= undefined -> PFtimeout;
true -> NFtimeout
end}.
-compile({nowarn_unused_function,merge_msg_service_config/3}).
@ -2070,13 +2090,16 @@ v_msg_deploy(X, Path, _TrUserData) -> mk_type_error({expected_msg, deploy}, X, P
-compile({nowarn_unused_function,v_msg_invoke/3}).
-dialyzer({nowarn_function,v_msg_invoke/3}).
v_msg_invoke(#invoke{service_id = F1, payload = F2}, Path, TrUserData) ->
v_msg_invoke(#invoke{service_id = F1, payload = F2, timeout = F3}, Path, TrUserData) ->
if F1 == undefined -> ok;
true -> v_type_string(F1, [service_id | Path], TrUserData)
end,
if F2 == undefined -> ok;
true -> v_type_string(F2, [payload | Path], TrUserData)
end,
if F3 == undefined -> ok;
true -> v_type_uint32(F3, [timeout | Path], TrUserData)
end,
ok;
v_msg_invoke(X, Path, _TrUserData) -> mk_type_error({expected_msg, invoke}, X, Path).
@ -2295,7 +2318,10 @@ get_msg_defs() ->
[#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []},
#field{name = service_id, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
#field{name = tar_url, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]},
{{msg, invoke}, [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = payload, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]},
{{msg, invoke},
[#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []},
#field{name = payload, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
#field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]},
{{msg, service_config},
[#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []},
#field{name = config_json, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
@ -2372,7 +2398,10 @@ find_msg_def(deploy) ->
[#field{name = task_id, fnum = 1, rnum = 2, type = uint32, occurrence = optional, opts = []},
#field{name = service_id, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
#field{name = tar_url, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}];
find_msg_def(invoke) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = payload, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}];
find_msg_def(invoke) ->
[#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []},
#field{name = payload, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},
#field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}];
find_msg_def(service_config) ->
[#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []},
#field{name = config_json, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []},

View File

@ -40,6 +40,7 @@ message Deploy {
message Invoke {
string service_id = 1;
string payload = 2;
uint32 timeout = 3;
}
//