diff --git a/apps/efka/src/efka_subscription.erl b/apps/efka/src/efka_subscription.erl new file mode 100644 index 0000000..ab5eb42 --- /dev/null +++ b/apps/efka/src/efka_subscription.erl @@ -0,0 +1,170 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 28. 4月 2025 23:50 +%%%------------------------------------------------------------------- +-module(efka_subscription). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([match_components/2, match_topic/2, is_valid_components/1, of_components/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +%% 定义订阅者 +-record(subscriber, { + topic :: binary(), + subscriber_pid :: pid(), + components = [] +}). + +-record(state, { + subscribers = [] +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) -> + gen_server:cast(?SERVER, {subscribe, Topic, SubscriberPid}). + +publish(Topic, Content) when is_binary(Topic), is_binary(Content) -> + gen_server:cast(?SERVER, {publish, Topic, Content}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% 同一个SubscriberPid只能订阅同一个topic一次 +handle_cast({subscribe, Topic, SubscriberPid}, State = #state{subscribers = Subscribers}) -> + Components = of_components(Topic), + case is_valid_components(Components) of + true -> + Sub = #subscriber{topic = Topic, subscriber_pid = SubscriberPid, components = Components}, + {noreply, State#state{subscribers = [Sub|Subscribers]}}; + false -> + {noreply, State} + end; + +handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}) -> + AccPids = dispatch(Subscribers, Topic, Content), + lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, AccPids]), + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec dispatch(Subscribers :: list(), Topic :: binary(), Content :: binary()) -> [pid()]. +dispatch(Subscribers, Topic, Content) when is_list(Subscribers), is_binary(Topic), is_binary(Content) -> + Components = of_components(Topic), + lists:foldl(fun(#subscriber{components = Components0, subscriber_pid = SubscriberPid}, AccPids) -> + case match_components(Components0, Components) andalso not lists:member(SubscriberPid, AccPids) of + true -> + erlang:send(SubscriberPid, {xyz, Content}), + [SubscriberPid|AccPids]; + false -> + AccPids + end + end, [], Subscribers). + +match_topic(Topic0, Topic1) -> + match_components(of_components(Topic0), of_components(Topic1)). + +%% 开始对比订阅的topic和发布的topic的Components信息 +%% *表示单级匹配,+表示多级匹配;+只能出现一次,并且只能在末尾 + +-spec match_components(list(), list()) -> boolean(). +match_components([<<$+>>], [_|_]) -> + true; +match_components([], []) -> + true; +match_components([<<$*>>|T0], [_|T1]) -> + match_components(T0, T1); +match_components([C0|T0], [C0|T1]) -> + match_components(T0, T1); +match_components(_, _) -> + false. + +-spec of_components(Topic :: binary()) -> [binary()]. +of_components(Topic) when is_binary(Topic) -> + binary:split(Topic, <<$/>>, [global]). + +is_valid_components([]) -> + true; +is_valid_components([<<$+>>|T]) -> + length(T) =:= 0; +is_valid_components([<<$*>>|T]) -> + is_valid_components(T); +is_valid_components([_|T]) -> + is_valid_components(T). \ No newline at end of file