fix subscription

This commit is contained in:
anlicheng 2025-04-29 21:44:39 +08:00
parent 3cdb93ae7c
commit 184a5cff94

View File

@ -14,7 +14,7 @@
%% API %% API
-export([start_link/0]). -export([start_link/0]).
-export([subscribe/2, publish/4, publish/2, response/2]). -export([subscribe/2, publish/4, publish/2, response/2]).
-export([match_components/2, match_topic/2, is_valid_components/1, of_components/1]). -export([match_components/2, is_valid_components/1, of_components/1]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -25,7 +25,12 @@
-record(subscriber, { -record(subscriber, {
topic :: binary(), topic :: binary(),
subscriber_pid :: pid(), subscriber_pid :: pid(),
components = [] components = [],
%%
%% 1. topic优先级别最高
%% 2. *
%% 3. +
order :: integer()
}). }).
-record(state, { -record(state, {
@ -97,23 +102,40 @@ handle_cast({subscribe, Topic, SubscriberPid}, State = #state{subscribers = Subs
Components = of_components(Topic), Components = of_components(Topic),
case is_valid_components(Components) of case is_valid_components(Components) of
true -> true ->
Sub = #subscriber{topic = Topic, subscriber_pid = SubscriberPid, components = Components}, Sub = #subscriber{topic = Topic, subscriber_pid = SubscriberPid, components = Components, order = order_num(Components)},
{noreply, State#state{subscribers = [Sub|Subscribers]}}; %% SubscriberPid的monitor退
erlang:monitor(process, SubscriberPid),
{noreply, State#state{subscribers = Subscribers ++ [Sub]}};
false -> false ->
{noreply, State} {noreply, State}
end; end;
%% %%
handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}) -> handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}) ->
AccPids = dispatch(Subscribers, <<"">>, Topic, Content), %% 广
lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, AccPids]), MatchedSubscribers = match_subscribers(Subscribers, Topic),
lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) ->
erlang:send(SubscriberPid, {topic_broadcast, Content})
end, MatchedSubscribers),
lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, MatchedSubscribers]),
{noreply, State}; {noreply, State};
%% %%
handle_cast({publish, PacketId, Topic, Content, CallbackFun}, State = #state{subscribers = Subscribers, callbacks = Callbacks}) when PacketId > 0 -> handle_cast({publish, PacketId, Topic, Content, CallbackFun}, State = #state{subscribers = Subscribers, callbacks = Callbacks}) when PacketId > 0 ->
ShaUUID = sha_uuid(), ShaUUID = sha_uuid(),
AccPids = dispatch(Subscribers, ShaUUID, Topic, Content),
lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, AccPids]), %%
MatchedSubscribers = match_subscribers(Subscribers, Topic),
SortedSubscribers = lists:sort(fun(#subscriber{order = Order0}, #subscriber{order = Order1}) -> Order0 < Order1 end, MatchedSubscribers),
case SortedSubscribers of
[] ->
ok;
[S = #subscriber{subscriber_pid = SubscriberPid} | _] ->
lager:debug("[efka_subscription] topic_sink topic: ~p, content: ~p, match subscriber: ~p", [Topic, Content, S]),
erlang:send(SubscriberPid, {topic_sink, Content})
end,
{noreply, State#state{callbacks = maps:put(ShaUUID, CallbackFun, Callbacks)}}; {noreply, State#state{callbacks = maps:put(ShaUUID, CallbackFun, Callbacks)}};
@ -133,7 +155,13 @@ handle_cast({response, ShaUUID, Response}, State = #state{callbacks = Callbacks}
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) -> handle_info({'DOWN', _Ref, process, SubscriberPid, Reason}, State = #state{subscribers = Subscribers}) ->
lager:debug("[efka_subscription] subscriber: ~p, down with reason: ~p", [SubscriberPid, Reason]),
NSubscribers = lists:filter(fun(#subscriber{subscriber_pid = Pid0}) -> SubscriberPid /= Pid0 end, Subscribers),
{noreply, State#state{subscribers = NSubscribers}};
handle_info(Info, State = #state{}) ->
lager:debug("[efka_subscription] get unknown info: ~p", [Info]),
{noreply, State}. {noreply, State}.
%% @private %% @private
@ -171,12 +199,25 @@ dispatch(Subscribers, ShaUUID, Topic, Content) when is_list(Subscribers), is_bin
end end
end, [], Subscribers). end, [], Subscribers).
match_topic(Topic0, Topic1) -> %%
match_components(of_components(Topic0), of_components(Topic1)). -spec match_subscribers(Subscribers :: [#subscriber{}], Topic :: binary()) -> [#subscriber{}].
match_subscribers(Subscribers, Topic) when is_list(Subscribers), is_binary(Topic) ->
Components = of_components(Topic),
lists:foldl(fun(S = #subscriber{components = Components0, subscriber_pid = Pid0}, Acc) ->
case match_components(Components0, Components) andalso not contain_channel(Pid0, Acc) of
true ->
[S|Acc];
false ->
Acc
end
end, [], Subscribers).
-spec contain_channel(Pid :: pid(), Subscribers :: list()) -> boolean().
contain_channel(Pid, Subscribers) when is_pid(Pid), is_list(Subscribers) ->
lists:search(fun(#subscriber{subscriber_pid = Pid0}) -> Pid == Pid0 end, Subscribers) /= false.
%% topic和发布的topic的Components信息 %% topic和发布的topic的Components信息
%% *++ %% *++
-spec match_components(list(), list()) -> boolean(). -spec match_components(list(), list()) -> boolean().
match_components(A, B) when is_list(A), is_list(B) -> match_components(A, B) when is_list(A), is_list(B) ->
match_components(A, B, false). match_components(A, B, false).
@ -204,10 +245,19 @@ is_valid_components([<<$*>>|T]) ->
is_valid_components([_|T]) -> is_valid_components([_|T]) ->
is_valid_components(T). is_valid_components(T).
-spec order_num(Components :: list()) -> integer().
order_num([]) ->
1;
order_num([<<$*>>|_]) ->
2;
order_num([<<$+>>|_]) ->
3;
order_num([_|Tail]) ->
order_num(Tail).
%% uuid %% uuid
-spec sha_uuid() -> binary(). -spec sha_uuid() -> binary().
sha_uuid() -> sha_uuid() ->
Salt = crypto:strong_rand_bytes(32), Salt = crypto:strong_rand_bytes(32),
Str = string:lowercase(binary:encode_hex(crypto:hash(sha256, Salt))), Str = string:lowercase(binary:encode_hex(crypto:hash(sha256, Salt))),
binary:part(Str, 1, 32). binary:part(Str, 1, 32).