ekfa/apps/efka/src/efka_subscription.erl
2025-09-17 16:01:58 +08:00

201 lines
7.5 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 28. 4月 2025 23:50
%%%-------------------------------------------------------------------
-module(efka_subscription).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([subscribe/2, publish/2]).
-export([match_components/2, is_valid_components/1, of_components/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
%% A single subscription: one subscriber process bound to one topic pattern.
-record(subscriber, {
%% Topic pattern exactly as it was passed to subscribe/2.
topic :: binary(),
%% Process that receives the {topic_broadcast, Topic, Content} messages.
subscriber_pid :: pid(),
%% The topic pattern split on "/" (result of of_components/1).
components = [],
%% Priority class, computed by order_num/1 when the subscription is created:
%% 1. topic without any wildcard component (exact match) - highest priority
%% 2. the first wildcard component is *
%% 3. the first wildcard component is +
%% NOTE(review): no code visible in this file reads this field when matching - confirm it is still needed.
order :: integer()
}).
%% gen_server state: the list of active #subscriber{} records; new
%% subscriptions are appended at the tail.
-record(state, {
subscribers = []
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Subscribes `SubscriberPid' to every message published on a topic that
%% matches the pattern `Topic'. This is a synchronous call to the registered
%% server: it returns `ok' once the subscription is stored, or
%% `{error, Reason}' when the pattern is rejected as invalid.
-spec subscribe(Topic :: binary(), SubscriberPid :: pid()) -> ok | {error, Reason :: binary()}.
subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) ->
    Request = {subscribe, Topic, SubscriberPid},
    gen_server:call(?SERVER, Request).
%% @doc Publishes `Content' on `Topic'. The request is sent to the registered
%% server as an asynchronous cast, so this function returns `ok' immediately
%% and gives no indication of whether any subscriber matched.
%%
%% The return type is `ok' (the result of gen_server:cast/2), not
%% `no_return()': the function does return to its caller.
-spec publish(Topic :: binary(), Content :: binary()) -> ok.
publish(Topic, Content) when is_binary(Topic), is_binary(Content) ->
    gen_server:cast(?SERVER, {publish, Topic, Content}).
%% @doc Starts the subscription server linked to the calling process and
%% registers it locally under the module name, so only one instance can run
%% per node.
-spec start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}.
start_link() ->
    ServerName = {local, ?SERVER},
    gen_server:start_link(ServerName, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc gen_server callback: the server starts with no subscribers.
-spec init(Args :: term()) ->
    {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term()} | ignore.
init([]) ->
    InitialState = #state{},
    {ok, InitialState}.
%% @private
%% @doc Handling call messages.
%%
%% `{subscribe, Topic, SubscriberPid}': validates the topic pattern and stores
%% a subscription for the pid. Replies `ok' on success (including when the
%% very same subscription already exists) and
%% `{error, <<"invalid topic name">>}' when the pattern is rejected by
%% is_valid_components/1.
-spec handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) ->
    {reply, Reply :: term(), NewState :: #state{}} |
    {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
    {stop, Reason :: term(), NewState :: #state{}}.
%% The same SubscriberPid may subscribe to the same topic only once.
handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers = Subscribers}) ->
    Components = of_components(Topic),
    case is_valid_components(Components) of
        true ->
            NSubscribers = add_subscriber(Topic, SubscriberPid, Components, Subscribers),
            {reply, ok, State#state{subscribers = NSubscribers}};
        false ->
            {reply, {error, <<"invalid topic name">>}, State}
    end.

%% @private
%% @doc Returns the subscriber list extended with a subscription of
%% `SubscriberPid' to `Topic'. The list is returned unchanged when that exact
%% (topic, pid) pair is already present, so repeated subscribe calls cannot
%% grow the state.
-spec add_subscriber(Topic :: binary(), SubscriberPid :: pid(),
                     Components :: [binary()], Subscribers :: [#subscriber{}]) -> [#subscriber{}].
add_subscriber(Topic, SubscriberPid, Components, Subscribers) ->
    IsSame = fun(#subscriber{topic = Topic0, subscriber_pid = Pid0}) ->
                 Topic0 =:= Topic andalso Pid0 =:= SubscriberPid
             end,
    case lists:any(IsSame, Subscribers) of
        true ->
            Subscribers;
        false ->
            ok = ensure_monitored(SubscriberPid, Subscribers),
            Sub = #subscriber{topic = Topic, subscriber_pid = SubscriberPid,
                              components = Components, order = order_num(Components)},
            Subscribers ++ [Sub]
    end.

%% @private
%% @doc Monitors `SubscriberPid' so its subscriptions are cleaned up when the
%% process exits. A pid that already owns a subscription was monitored when
%% that subscription was added, and one 'DOWN' message removes all of its
%% subscriptions, so a second monitor would only produce a redundant message.
-spec ensure_monitored(SubscriberPid :: pid(), Subscribers :: [#subscriber{}]) -> ok.
ensure_monitored(SubscriberPid, Subscribers) ->
    case lists:keymember(SubscriberPid, #subscriber.subscriber_pid, Subscribers) of
        true ->
            ok;
        false ->
            _Ref = erlang:monitor(process, SubscriberPid),
            ok
    end.
%% @private
%% @doc Handling cast messages.
%%
%% `{publish, Topic, Content}': sends `{topic_broadcast, Topic, Content}' to
%% every subscriber process whose pattern matches `Topic'; each process
%% receives the message at most once (see match_subscribers/2).
-spec handle_cast(Request :: term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
%% Publish a message.
handle_cast({publish, Topic, Content}, #state{subscribers = Subscribers} = State) ->
    Receivers = match_subscribers(Subscribers, Topic),
    Message = {topic_broadcast, Topic, Content},
    Deliver = fun(#subscriber{subscriber_pid = Pid}) -> Pid ! Message end,
    lists:foreach(Deliver, Receivers),
    lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, Receivers]),
    {noreply, State}.
%% @private
%% @doc Handling all non call/cast messages.
%%
%% A monitor 'DOWN' message drops every subscription owned by the dead
%% process; any other message is logged and ignored.
-spec handle_info(Info :: timeout() | term(), State :: #state{}) ->
    {noreply, NewState :: #state{}} |
    {noreply, NewState :: #state{}, timeout() | hibernate} |
    {stop, Reason :: term(), NewState :: #state{}}.
handle_info({'DOWN', _Ref, process, SubscriberPid, Reason}, #state{subscribers = Subscribers} = State) ->
    lager:debug("[efka_subscription] subscriber: ~p, down with reason: ~p", [SubscriberPid, Reason]),
    Remaining = [Sub || Sub <- Subscribers, Sub#subscriber.subscriber_pid =/= SubscriberPid],
    {noreply, State#state{subscribers = Remaining}};
handle_info(Info, #state{} = State) ->
    lager:debug("[efka_subscription] get unknown info: ~p", [Info]),
    {noreply, State}.
%% @private
%% @doc gen_server callback invoked when the server is about to terminate.
%% The server holds no resources that need releasing, so this only returns
%% `ok'; the return value is ignored by gen_server.
-spec terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
                State :: #state{}) -> term().
terminate(_Reason, #state{}) ->
    ok.
%% @private
%% @doc gen_server callback for code upgrades: the state is passed through
%% unchanged.
-spec code_change(OldVsn :: term() | {down, term()}, State :: #state{}, Extra :: term()) ->
    {ok, NewState :: #state{}} | {error, Reason :: term()}.
code_change(_OldVsn, #state{} = State, _Extra) ->
    {ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% Finds the subscribers whose pattern matches the published topic.
%% A process that owns several matching subscriptions is returned only once
%% (the first matching subscription in list order wins). Because matches are
%% prepended to the accumulator, the result is in reverse subscription order.
-spec match_subscribers(Subscribers :: [#subscriber{}], Topic :: binary()) -> [#subscriber{}].
match_subscribers(Subscribers, Topic) when is_list(Subscribers), is_binary(Topic) ->
    TopicParts = of_components(Topic),
    Collect =
        fun(#subscriber{components = Pattern, subscriber_pid = Pid} = Sub, Matched) ->
            Wanted = match_components(Pattern, TopicParts) andalso not contain_channel(Pid, Matched),
            if
                Wanted -> [Sub | Matched];
                true -> Matched
            end
        end,
    lists:foldl(Collect, [], Subscribers).
%% Returns true when at least one subscriber record in `Subscribers' belongs
%% to the process `Pid'.
-spec contain_channel(Pid :: pid(), Subscribers :: list()) -> boolean().
contain_channel(Pid, Subscribers) when is_pid(Pid), is_list(Subscribers) ->
    OwnedByPid = fun(#subscriber{subscriber_pid = Owner}) -> Owner == Pid end,
    lists:any(OwnedByPid, Subscribers).
%% Compares the components of a subscribed pattern (first argument) with the
%% components of a published topic (second argument).
%% "*" matches exactly one level; "+" as the last pattern component matches
%% one or more remaining levels. "+" may appear only once and only at the end.
-spec match_components(list(), list()) -> boolean().
match_components(A, B) when is_list(A), is_list(B) ->
    walk_components(A, B).

%% Walks pattern and topic in lock-step, one level per step.
walk_components([], []) ->
    %% Both exhausted at the same time: every level matched.
    true;
walk_components([<<"+">>], [_ | _]) ->
    %% Trailing "+" swallows whatever is left, but needs at least one level.
    true;
walk_components([<<"*">> | PatternRest], [_ | TopicRest]) ->
    walk_components(PatternRest, TopicRest);
walk_components([Same | PatternRest], [Same | TopicRest]) ->
    walk_components(PatternRest, TopicRest);
walk_components(_, _) ->
    false.
%% @doc Splits a topic into its "/"-separated components. Empty components
%% are kept, so an empty topic yields a single empty component.
-spec of_components(Topic :: binary()) -> [binary()].
of_components(Topic) when is_binary(Topic) ->
    Separator = <<"/">>,
    binary:split(Topic, Separator, [global]).
%% @doc Checks whether a list of topic components forms a valid subscription
%% pattern. The only restriction is on the multi-level wildcard "+": it must
%% be the last component. Every other component, including the single-level
%% wildcard "*", is accepted at any position.
-spec is_valid_components(Components :: [binary()]) -> boolean().
is_valid_components([]) ->
    true;
is_valid_components([<<"+">> | Rest]) ->
    %% "+" is valid only when nothing follows it; comparing with [] avoids
    %% walking the whole tail just to test for emptiness.
    Rest =:= [];
is_valid_components([_ | Rest]) ->
    is_valid_components(Rest).
%% Classifies a pattern by its first wildcard component:
%% 1 when it contains no wildcard, 2 when the first wildcard is "*",
%% 3 when the first wildcard is "+".
-spec order_num(Components :: list()) -> integer().
order_num(Components) ->
    IsLiteral = fun(Part) -> Part =/= <<"*">> andalso Part =/= <<"+">> end,
    %% Skip the leading literal components; what remains starts at the first
    %% wildcard, if there is one.
    case lists:dropwhile(IsLiteral, Components) of
        [] -> 1;
        [<<"*">> | _] -> 2;
        [<<"+">> | _] -> 3
    end.