This commit is contained in:
anlicheng 2025-05-07 11:18:43 +08:00
parent 862a3c2a06
commit 05111807ca
2 changed files with 28 additions and 16 deletions

View File

@ -308,7 +308,7 @@ handle_info({server_push_message, PacketId, <<?METHOD_PRAMAS:8, ParamsBin/binary
{noreply, State};
ServicePid when is_pid(ServicePid) ->
Ref = make_ref(),
efka_micro_service:push_params(ServicePid, Ref, self(), Params),
efka_micro_service:push_params(ServicePid, Ref, Params),
{noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
end,
{noreply, State};
@ -327,12 +327,12 @@ handle_info({server_push_message, PacketId, <<?METHOD_METRICS:8, MetricsBin/bina
{noreply, State};
ServicePid when is_pid(ServicePid) ->
Ref = make_ref(),
efka_micro_service:push_metrics(ServicePid, Ref, self(), Metrics),
efka_micro_service:push_metrics(ServicePid, Ref, Metrics),
{noreply, State#state{inflight = maps:put(Ref, PacketId, Inflight)}}
end;
%% channel的回复
handle_info({channel_reply, Ref, Reply}, State = #state{transport_pid = TransportPid, inflight = Inflight}) ->
%% efka_micro_service的回复
handle_info({ems_reply, Ref, Reply}, State = #state{transport_pid = TransportPid, inflight = Inflight}) ->
case maps:take(Ref, Inflight) of
error ->
{noreply, State};

View File

@ -20,7 +20,7 @@
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, start_service/1, stop_service/1, attach_channel/2]).
-export([push_params/4, push_metrics/4, request_params/1, request_metrics/1]).
-export([push_params/3, push_metrics/3, request_params/1, request_metrics/1]).
-export([metric_data/2, send_event/3, send_ai_event/3]).
%% gen_server callbacks
@ -41,6 +41,8 @@
%%
manifest :: undefined | efka_manifest:manifest(),
inflight = #{},
%%
running_status = ?STATUS_STOPPED
}).
@ -65,11 +67,11 @@ get_name(ServiceId) when is_binary(ServiceId) ->
get_pid(ServiceId) when is_binary(ServiceId) ->
whereis(get_name(ServiceId)).
push_params(Pid, Ref, ReceiverPid, Params) when is_pid(Pid), is_binary(Params) ->
gen_server:cast(Pid, {push_params, Ref, ReceiverPid, Params}).
push_params(Pid, Ref, Params) when is_pid(Pid), is_binary(Params) ->
gen_server:cast(Pid, {push_params, Ref, self(), Params}).
push_metrics(Pid, Ref, ReceiverPid, Metrics) when is_pid(Pid), is_binary(Metrics) ->
gen_server:cast(Pid, {push_metrics, Ref, ReceiverPid, Metrics}).
push_metrics(Pid, Ref, Metrics) when is_pid(Pid), is_binary(Metrics) ->
gen_server:cast(Pid, {push_metrics, Ref, self(), Metrics}).
request_params(Pid) when is_pid(Pid) ->
gen_server:call(Pid, request_params).
@ -233,22 +235,22 @@ handle_cast({send_ai_event, EventType, Params}, State = #state{service_id = Serv
{noreply, State};
%%
handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) ->
handle_cast({push_params, Ref, ReceiverPid, Params}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) ->
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
true ->
efka_tcp_channel:push_params(ChannelPid, Ref, ReceiverPid, Params),
{noreply, State};
efka_tcp_channel:push_params(ChannelPid, Ref, self(), Params),
{noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
false ->
ReceiverPid ! {channel_reply, Ref, {error, <<"channel is not alive">>}},
ReceiverPid ! {ems_reply, Ref, {error, <<"channel is not alive">>}},
{reply, State}
end;
%%
handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid}) ->
handle_cast({push_metrics, Ref, ReceiverPid, Metrics}, State = #state{running_status = ?STATUS_RUNNING, channel_pid = ChannelPid, inflight = Inflight}) ->
case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of
true ->
efka_tcp_channel:push_metrics(ChannelPid, Ref, ReceiverPid, Metrics),
{noreply, State};
efka_tcp_channel:push_metrics(ChannelPid, Ref, self(), Metrics),
{noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}};
false ->
ReceiverPid ! {channel_reply, Ref, {error, <<"channel is not alive">>}},
{noreply, State}
@ -281,6 +283,16 @@ handle_info({timeout, _, reboot_service}, State = #state{service_id = ServiceId,
end
end;
%% channel的回复
handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight}) ->
case maps:take(Ref, Inflight) of
error ->
{noreply, State};
{ReceiverPid, NInflight} ->
ReceiverPid ! {ems_reply, Ref, Reply},
{noreply, State#state{inflight = NInflight}}
end;
handle_info({Port, {data, Data}}, State = #state{port = Port, service_id = ServiceId}) ->
lager:debug("[efka_micro_service] service_id: ~p, port data: ~p", [ServiceId, Data]),
{noreply, State};