From 184a5cff9408e2bc9409ba93d21854b674b62ced Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 29 Apr 2025 21:44:39 +0800 Subject: [PATCH] fix subscription --- apps/efka/src/efka_subscription.erl | 78 +++++++++++++++++++++++------ 1 file changed, 64 insertions(+), 14 deletions(-) diff --git a/apps/efka/src/efka_subscription.erl b/apps/efka/src/efka_subscription.erl index 992755c..d640f7b 100644 --- a/apps/efka/src/efka_subscription.erl +++ b/apps/efka/src/efka_subscription.erl @@ -14,7 +14,7 @@ %% API -export([start_link/0]). -export([subscribe/2, publish/4, publish/2, response/2]). --export([match_components/2, match_topic/2, is_valid_components/1, of_components/1]). +-export([match_components/2, is_valid_components/1, of_components/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -25,7 +25,12 @@ -record(subscriber, { topic :: binary(), subscriber_pid :: pid(), - components = [] + components = [], + %% 优先级 + %% 1. 完全匹配的topic优先级别最高 + %% 2. 带 * 的订阅 + %% 3. 带 + 的订阅 + order :: integer() }). -record(state, { @@ -97,23 +102,40 @@ handle_cast({subscribe, Topic, SubscriberPid}, State = #state{subscribers = Subs Components = of_components(Topic), case is_valid_components(Components) of true -> - Sub = #subscriber{topic = Topic, subscriber_pid = SubscriberPid, components = Components}, - {noreply, State#state{subscribers = [Sub|Subscribers]}}; + Sub = #subscriber{topic = Topic, subscriber_pid = SubscriberPid, components = Components, order = order_num(Components)}, + %% 建立到SubscriberPid的monitor,进程退出需要清理订阅 + erlang:monitor(process, SubscriberPid), + + {noreply, State#state{subscribers = Subscribers ++ [Sub]}}; false -> {noreply, State} end; %% 不需要响应的推送 handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}) -> - AccPids = dispatch(Subscribers, <<"">>, Topic, Content), - lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, AccPids]), + %% 不需要回复的消息体采用广播的信息 + MatchedSubscribers = match_subscribers(Subscribers, Topic), + lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) -> + erlang:send(SubscriberPid, {topic_broadcast, Content}) + end, MatchedSubscribers), + + lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, MatchedSubscribers]), {noreply, State}; %% 需要响应的推送 handle_cast({publish, PacketId, Topic, Content, CallbackFun}, State = #state{subscribers = Subscribers, callbacks = Callbacks}) when PacketId > 0 -> ShaUUID = sha_uuid(), - AccPids = dispatch(Subscribers, ShaUUID, Topic, Content), - lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, AccPids]), + + %% 基于优先级 + MatchedSubscribers = match_subscribers(Subscribers, Topic), + SortedSubscribers = lists:sort(fun(#subscriber{order = Order0}, #subscriber{order = Order1}) -> Order0 < Order1 end, MatchedSubscribers), + case SortedSubscribers of + [] -> + ok; + [S = #subscriber{subscriber_pid = SubscriberPid} | _] -> + lager:debug("[efka_subscription] topic_sink topic: ~p, content: ~p, match subscriber: ~p", [Topic, Content, S]), + erlang:send(SubscriberPid, {topic_sink, Content}) + end, {noreply, State#state{callbacks = maps:put(ShaUUID, CallbackFun, Callbacks)}}; @@ -133,7 +155,13 @@ handle_cast({response, ShaUUID, Response}, State = #state{callbacks = Callbacks} {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info(_Info, State = #state{}) -> +handle_info({'DOWN', _Ref, process, SubscriberPid, Reason}, State = #state{subscribers = Subscribers}) -> + lager:debug("[efka_subscription] subscriber: ~p, down with reason: ~p", [SubscriberPid, Reason]), + NSubscribers = lists:filter(fun(#subscriber{subscriber_pid = Pid0}) -> SubscriberPid /= Pid0 end, Subscribers), + {noreply, State#state{subscribers = NSubscribers}}; + +handle_info(Info, State = #state{}) -> + lager:debug("[efka_subscription] get unknown info: ~p", [Info]), {noreply, State}. %% @private @@ -171,12 +199,25 @@ dispatch(Subscribers, ShaUUID, Topic, Content) when is_list(Subscribers), is_bin end end, [], Subscribers). -match_topic(Topic0, Topic1) -> - match_components(of_components(Topic0), of_components(Topic1)). +%% 查找满足条件订阅者 +-spec match_subscribers(Subscribers :: [#subscriber{}], Topic :: binary()) -> [#subscriber{}]. +match_subscribers(Subscribers, Topic) when is_list(Subscribers), is_binary(Topic) -> + Components = of_components(Topic), + lists:foldl(fun(S = #subscriber{components = Components0, subscriber_pid = Pid0}, Acc) -> + case match_components(Components0, Components) andalso not contain_channel(Pid0, Acc) of + true -> + [S|Acc]; + false -> + Acc + end + end, [], Subscribers). + +-spec contain_channel(Pid :: pid(), Subscribers :: list()) -> boolean(). +contain_channel(Pid, Subscribers) when is_pid(Pid), is_list(Subscribers) -> + lists:search(fun(#subscriber{subscriber_pid = Pid0}) -> Pid == Pid0 end, Subscribers) /= false. %% 开始对比订阅的topic和发布的topic的Components信息 %% *表示单级匹配,+表示多级匹配;+只能出现一次,并且只能在末尾 - -spec match_components(list(), list()) -> boolean(). match_components(A, B) when is_list(A), is_list(B) -> match_components(A, B, false). @@ -204,10 +245,19 @@ is_valid_components([<<$*>>|T]) -> is_valid_components([_|T]) -> is_valid_components(T). +-spec order_num(Components :: list()) -> integer(). +order_num([]) -> + 1; +order_num([<<$*>>|_]) -> + 2; +order_num([<<$+>>|_]) -> + 3; +order_num([_|Tail]) -> + order_num(Tail). + %% 生产唯一的uuid -spec sha_uuid() -> binary(). sha_uuid() -> Salt = crypto:strong_rand_bytes(32), Str = string:lowercase(binary:encode_hex(crypto:hash(sha256, Salt))), - binary:part(Str, 1, 32). - + binary:part(Str, 1, 32). \ No newline at end of file