From 53d0ee9f3a51fd2d9f2595d2ac186b11fdf77c44 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Wed, 17 Sep 2025 11:41:02 +0800 Subject: [PATCH] fix channel --- apps/efka/src/efka_service.erl | 53 ++-------------------------------- 1 file changed, 3 insertions(+), 50 deletions(-) diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index e8e6500..56fb6fd 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -17,7 +17,6 @@ %% API -export([start_link/2]). -export([get_name/1, get_pid/1, attach_channel/2]). --export([invoke/3]). -export([metric_data/4, send_event/3]). %% gen_server callbacks @@ -26,11 +25,7 @@ -record(state, { service_id :: binary(), %% 通道id信息 - channel_pid :: pid() | undefined, - - inflight = #{}, - %% 映射关系: #{Ref => Fun} - callbacks = #{} + channel_pid :: pid() | undefined }). %%%=================================================================== @@ -45,10 +40,6 @@ get_name(ServiceId) when is_binary(ServiceId) -> get_pid(ServiceId) when is_binary(ServiceId) -> whereis(get_name(ServiceId)). --spec invoke(Pid :: pid(), Ref :: reference(), Payload :: binary()) -> no_return(). -invoke(Pid, Ref, Payload) when is_pid(Pid), is_reference(Ref), is_binary(Payload) -> - gen_server:cast(Pid, {invoke, Ref, self(), Payload}). - -spec metric_data(Pid :: pid(), DeviceUUID :: binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). metric_data(Pid, DeviceUUID, RouteKey, Metric) when is_pid(Pid), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> gen_server:cast(Pid, {metric_data, DeviceUUID, RouteKey, Metric}). @@ -78,7 +69,6 @@ start_link(Name, ServiceId) when is_atom(Name), is_binary(ServiceId) -> {stop, Reason :: term()} | ignore). init([ServiceId]) -> %% supervisor进程通过exit(ChildPid, shutdown)调用的时候,确保terminate函数被调用 - erlang:process_flag(trap_exit, true), lager:debug("[efka_service] service_id: ~p, started", [ServiceId]), {ok, #state{service_id = ServiceId}}. @@ -122,17 +112,6 @@ handle_cast({send_event, EventType, Params}, State = #state{service_id = Service lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]), {noreply, State}; -%% 推送配置项目 -handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{channel_pid = ChannelPid, inflight = Inflight}) -> - case is_pid(ChannelPid) andalso is_process_alive(ChannelPid) of - true -> - gen_channel:invoke(ChannelPid, Ref, self(), Payload), - {noreply, State#state{inflight = maps:put(Ref, ReceiverPid, Inflight)}}; - false -> - ReceiverPid ! {service_reply, Ref, {error, <<"channel is not alive">>}}, - {reply, State} - end; - handle_cast(_Request, State = #state{}) -> {noreply, State}. @@ -142,26 +121,10 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). - -%% 处理channel的回复 -handle_info({channel_reply, Ref, Reply}, State = #state{inflight = Inflight, callbacks = Callbacks}) -> - case maps:take(Ref, Inflight) of - error -> - {noreply, State}; - {ReceiverPid, NInflight} -> - ReceiverPid ! {service_reply, Ref, Reply}, - - {noreply, State#state{inflight = NInflight, callbacks = trigger_callback(Ref, Callbacks)}} - end; - -handle_info({Port, {data, Data}}, State = #state{service_id = ServiceId}) when is_port(Port) -> - lager:debug("[efka_service] service_id: ~p, port data: ~p", [ServiceId, Data]), - {noreply, State}; - %% 处理channel进程的退出 handle_info({'DOWN', _Ref, process, ChannelPid, Reason}, State = #state{channel_pid = ChannelPid, service_id = ServiceId}) -> lager:debug("[efka_service] service_id: ~p, channel exited: ~p", [ServiceId, Reason]), - {noreply, State#state{channel_pid = undefined, inflight = #{}}}. + {noreply, State#state{channel_pid = undefined}}. %% @private %% @doc This function is called by a gen_server when it is about to @@ -184,14 +147,4 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions -%%%=================================================================== - --spec trigger_callback(Ref :: reference(), Callbacks :: map()) -> NewCallbacks :: map(). -trigger_callback(Ref, Callbacks) -> - case maps:take(Ref, Callbacks) of - error -> - Callbacks; - {Fun, NCallbacks} -> - catch Fun(), - NCallbacks - end. \ No newline at end of file +%%%=================================================================== \ No newline at end of file