改成异步

This commit is contained in:
anlicheng 2025-05-07 00:19:26 +08:00
parent 7c7fcf329b
commit 862a3c2a06
3 changed files with 61 additions and 73 deletions

View File

@ -35,7 +35,9 @@
-record(state, {
transport_pid :: undefined | pid(),
status = ?STATE_DENIED
status = ?STATE_DENIED,
%% #{Ref => PacketId}
inflight = #{}
}).
%%%===================================================================
@ -293,7 +295,7 @@ handle_info({server_push_message, PacketId, <<?METHOD_DEPLOY:8, DeployBin/binary
{noreply, State};
%%
handle_info({server_push_message, PacketId, <<?METHOD_PRAMAS:8, ParamsBin/binary>>}, State = #state{transport_pid = TransportPid}) ->
handle_info({server_push_message, PacketId, <<?METHOD_PRAMAS:8, ParamsBin/binary>>}, State = #state{transport_pid = TransportPid, inflight = Inflight}) ->
#service_params{service_id = ServiceId, params = Params} = message_pb:decode_msg(ParamsBin, service_params),
case efka_micro_service:get_pid(ServiceId) of
@ -302,27 +304,17 @@ handle_info({server_push_message, PacketId, <<?METHOD_PRAMAS:8, ParamsBin/binary
code = 0,
message = <<"service not run">>
},
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply));
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{noreply, State};
ServicePid when is_pid(ServicePid) ->
case efka_micro_service:push_params(ServicePid, Params, 15000) of
ok ->
Reply = #efka_response{
code = 1,
message = <<"">>
},
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply));
{error, Reason} ->
Reply = #efka_response{
code = 0,
message = Reason
},
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply))
end
Ref = make_ref(),
efka_micro_service:push_params(ServicePid, Ref, self(), Params),
{noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
end,
{noreply, State};
%%
handle_info({server_push_message, PacketId, <<?METHOD_METRICS:8, MetricsBin/binary>>}, State = #state{transport_pid = TransportPid}) ->
handle_info({server_push_message, PacketId, <<?METHOD_METRICS:8, MetricsBin/binary>>}, State = #state{transport_pid = TransportPid, inflight = Inflight}) ->
#service_metrics{service_id = ServiceId, metrics = Metrics} = message_pb:decode_msg(MetricsBin, service_metrics),
case efka_micro_service:get_pid(ServiceId) of
@ -331,9 +323,21 @@ handle_info({server_push_message, PacketId, <<?METHOD_METRICS:8, MetricsBin/bina
code = 0,
message = <<"service not run">>
},
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply));
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply)),
{noreply, State};
ServicePid when is_pid(ServicePid) ->
case efka_micro_service:push_metrics(ServicePid, Metrics, 15000) of
Ref = make_ref(),
efka_micro_service:push_metrics(ServicePid, Ref, self(), Metrics),
{noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
end;
%% channel的回复
handle_info({channel_reply, Ref, Reply}, State = #state{transport_pid = TransportPid, inflight = Inflight}) ->
case maps:take(Ref, Inflight) of
error ->
{noreply, State};
{PacketId, NInflight} ->
case Reply of
ok ->
Reply = #efka_response {
code = 1,
@ -346,9 +350,9 @@ handle_info({server_push_message, PacketId, <<?METHOD_METRICS:8, MetricsBin/bina
message = Reason
},
efka_transport:response(TransportPid, PacketId, message_pb:encode_msg(Reply))
end
end,
{noreply, State};
end,
{noreply, State#state{inflight = NInflight}}
end;
%% TODO
handle_info({server_push_message, <<8:8, ActivatePush/binary>>}, State = #state{transport_pid = TransportPid, status = Status}) ->

View File

@ -20,7 +20,7 @@
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]).
-export([push_params/3, push_metrics/3, request_params/1, request_metrics/1]).
-export([push_params/4, push_metrics/4, request_params/1, request_metrics/1]).
-export([metric_data/2, send_event/3, send_ai_event/3]).
%% gen_server callbacks
@ -65,11 +65,11 @@ get_name(ServiceId) when is_binary(ServiceId) ->
get_pid(ServiceId) when is_binary(ServiceId) ->
whereis(get_name(ServiceId)).
push_params(Pid, Params, Timeout) when is_pid(Pid), is_binary(Params), is_integer(Timeout) ->
gen_server:call(Pid, {push_params, Params, Timeout - 1000}, Timeout).
push_params(Pid, Ref, ReceiverPid, Params) when is_pid(Pid), is_binary(Params) ->
gen_server:cast(Pid, {push_params, Ref, ReceiverPid, Params}).
push_metrics(Pid, Metrics, Timeout) when is_pid(Pid), is_binary(Metrics), is_integer(Timeout) ->
gen_server:call(Pid, {push_metrics, Metrics, Timeout - 1000}, Timeout).
push_metrics(Pid, Ref, ReceiverPid, Metrics) when is_pid(Pid), is_binary(Metrics) ->
gen_server:cast(Pid, {push_metrics, Ref, ReceiverPid, Metrics}).
request_params(Pid) when is_pid(Pid) ->
gen_server:call(Pid, request_params).
@ -176,40 +176,6 @@ handle_call(request_metrics, _From, State = #state{service_id = ServiceId, runni
Metrics = micro_service_model:get_metrics(ServiceId),
{reply, {ok, Metrics}, State};
%%
handle_call({push_params, Params, Timeout}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) ->
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
true ->
{ok, Ref} = efka_tcp_channel:push_params(ChannelPid, self(), Params),
receive
{channel_reply, Ref, {ok, Result}} ->
{reply, {ok, Result}, State};
{channel_reply, Ref, {error, Error}} ->
{reply, {error, Error}, State}
after Timeout ->
{reply, {error, <<"push_params timeout">>}, State}
end;
false ->
{reply, {error, <<"channel is not alive">>}, State}
end;
%%
handle_call({push_metrics, Metrics, Timeout}, _From, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) ->
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
true ->
{ok, Ref} = efka_tcp_channel:push_metrics(ChannelPid, self(), Metrics),
receive
{channel_reply, Ref, {ok, Result}} ->
{reply, {ok, Result}, State};
{channel_reply, Ref, {error, Error}} ->
{reply, {error, Error}, State}
after Timeout ->
{reply, {error, <<"push_metrics timeout">>}, State}
end;
false ->
{reply, {error, <<"channel is not alive">>}, State}
end;
%% :
handle_call(start_service, _From, State = #state{running_status = ?STATUS_RUNNING}) ->
{reply, {error, <<"service is running">>}, State};
@ -266,6 +232,28 @@ handle_cast({send_ai_event, EventType, Params}, State = #state{service_id = Serv
efka_agent:ai_event(ServiceId, EventType, Params),
{noreply, State};
%%
handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) ->
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
true ->
efka_tcp_channel:push_params(ChannelPid, Ref, ReceiverPid, Params),
{noreply, State};
false ->
ReceiverPid ! {channel_reply, Ref, {error, <<"channel is not alive">>}},
{reply, State}
end;
%%
handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) ->
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
true ->
efka_tcp_channel:push_metrics(ChannelPid, Ref, ReceiverPid, Metrics),
{noreply, State};
false ->
ReceiverPid ! {channel_reply, Ref, {error, <<"channel is not alive">>}},
{noreply, State}
end;
handle_cast(_Request, State = #state{}) ->
{noreply, State}.

View File

@ -13,7 +13,7 @@
%% API
-export([start_link/1]).
-export([push_metrics/3, push_params/3]).
-export([push_metrics/4, push_params/4]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -56,17 +56,13 @@
%%% API
%%%===================================================================
-spec push_params(ChannelPid :: pid(), ReceiverPid :: pid(), Params :: binary()) -> {ok, Ref :: reference()}.
push_params(ChannelPid, ReceiverPid, Params) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Params) ->
Ref = make_ref(),
gen_server:cast(ChannelPid, {push_params, Ref, ReceiverPid, Params}),
{ok, Ref}.
-spec push_params(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Params :: binary()) -> no_return().
push_params(ChannelPid, Ref, ReceiverPid, Params) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Params), is_reference(Ref) ->
gen_server:cast(ChannelPid, {push_params, Ref, ReceiverPid, Params}).
-spec push_metrics(ChannelPid :: pid(), ReceiverPid :: pid(), Metrics :: binary()) -> {ok, Ref :: reference()}.
push_metrics(ChannelPid, ReceiverPid, Metrics) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Metrics) ->
Ref = make_ref(),
gen_server:cast(ChannelPid, {push_metrics, Ref, ReceiverPid, Metrics}),
{ok, Ref}.
-spec push_metrics(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Metrics :: binary()) -> no_return().
push_metrics(ChannelPid, Ref, ReceiverPid, Metrics) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Metrics) ->
gen_server:cast(ChannelPid, {push_metrics, Ref, ReceiverPid, Metrics}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Socket :: gen_tcp:socket()) ->