This commit is contained in:
anlicheng 2025-04-29 21:11:07 +08:00
parent a117d81fc6
commit 3cdb93ae7c
2 changed files with 61 additions and 11 deletions

View File

@ -132,16 +132,28 @@ handle_event(info, {server_push_message, <<8:8, ActivatePush>>}, StateName, Stat
{true, ?STATE_ACTIVATED} ->
{keep_state, State};
{true, ?STATE_DENIED} ->
%% ,
efka_transport:auth_request(TransportPid, 5000),
{next_state, ?STATE_AUTH, State};
{false, _} ->
{next_state, ?STATE_DENIED, State}
%%
{next_state, ?STATE_RESTRICTED, State}
end;
%% TODO
handle_event(info, {server_push_message, PacketId, <<16:8, Directive>>}, _StateName, State = #state{}) ->
%%
handle_event(info, {server_push_message, PacketId, <<16:8, Directive>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
#topic_message{topic = Topic, content = Content} = message_pb:decode_msg(Directive, directive),
lager:debug("[efka_agent] get directive with packet_id: ~p, to device_uuid: ~p, content: ~p", [PacketId, Topic, Content]),
%%
case PacketId > 0 of
true ->
CallbackFun = fun(Response) -> is_process_alive(TransportPid) andalso efka_transport:response(TransportPid, PacketId, Response) end,
efka_subscription:publish(PacketId, Topic, Content, CallbackFun);
false ->
efka_subscription:publish(Topic, Content)
end,
{keep_state, State};
%% transport进程退出

View File

@ -13,7 +13,7 @@
%% API
-export([start_link/0]).
-export([subscribe/2, publish/2]).
-export([subscribe/2, publish/4, publish/2, response/2]).
-export([match_components/2, match_topic/2, is_valid_components/1, of_components/1]).
%% gen_server callbacks
@ -29,7 +29,10 @@
}).
-record(state, {
subscribers = []
subscribers = [],
%% #{sha_uuid => fun(Response)};
%% packet_id来作为映射关系是因为 efka_agent建立新的efka_transport进程的时候packet_id会重置;
callbacks = #{}
}).
%%%===================================================================
@ -44,6 +47,14 @@ subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) ->
publish(Topic, Content) when is_binary(Topic), is_binary(Content) ->
gen_server:cast(?SERVER, {publish, Topic, Content}).
-spec publish(PacketId :: integer(), Topic :: binary(), Content :: binary(), CallbackFun :: fun((A :: binary()) -> term()) ) -> no_return().
publish(PacketId, Topic, Content, CallbackFun) when is_integer(PacketId), is_binary(Topic), is_binary(Content) ->
gen_server:cast(?SERVER, {publish, PacketId, Topic, Content, CallbackFun}).
-spec response(ShaUUID :: binary(), Response :: binary()) -> no_return().
response(ShaUUID, Response) when is_binary(ShaUUID), is_binary(Response) ->
gen_server:cast(?SERVER, {response, ShaUUID, Response}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
@ -92,10 +103,29 @@ handle_cast({subscribe, Topic, SubscriberPid}, State = #state{subscribers = Subs
{noreply, State}
end;
%%
handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}) ->
AccPids = dispatch(Subscribers, Topic, Content),
AccPids = dispatch(Subscribers, <<"">>, Topic, Content),
lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, AccPids]),
{noreply, State}.
{noreply, State};
%%
handle_cast({publish, PacketId, Topic, Content, CallbackFun}, State = #state{subscribers = Subscribers, callbacks = Callbacks}) when PacketId > 0 ->
ShaUUID = sha_uuid(),
AccPids = dispatch(Subscribers, ShaUUID, Topic, Content),
lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, AccPids]),
{noreply, State#state{callbacks = maps:put(ShaUUID, CallbackFun, Callbacks)}};
%%
handle_cast({response, ShaUUID, Response}, State = #state{callbacks = Callbacks}) ->
case maps:take(ShaUUID, Callbacks) of
{F, NCallbacks} ->
F(Response),
{noreply, State#state{callbacks = NCallbacks}};
error ->
{noreply, State}
end.
%% @private
%% @doc Handling all non call/cast messages
@ -128,13 +158,13 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions
%%%===================================================================
-spec dispatch(Subscribers :: list(), Topic :: binary(), Content :: binary()) -> [pid()].
dispatch(Subscribers, Topic, Content) when is_list(Subscribers), is_binary(Topic), is_binary(Content) ->
-spec dispatch(Subscribers :: list(), ShaUUID :: binary(), Topic :: binary(), Content :: binary()) -> [pid()].
dispatch(Subscribers, ShaUUID, Topic, Content) when is_list(Subscribers), is_binary(Topic), is_binary(Content) ->
Components = of_components(Topic),
lists:foldl(fun(#subscriber{components = Components0, subscriber_pid = SubscriberPid}, AccPids) ->
case match_components(Components0, Components) andalso not lists:member(SubscriberPid, AccPids) of
true ->
erlang:send(SubscriberPid, {xyz, Content}),
erlang:send(SubscriberPid, {topic_sink, ShaUUID, Content}),
[SubscriberPid|AccPids];
false ->
AccPids
@ -172,4 +202,12 @@ is_valid_components([<<$+>>|T]) ->
is_valid_components([<<$*>>|T]) ->
is_valid_components(T);
is_valid_components([_|T]) ->
is_valid_components(T).
is_valid_components(T).
%% uuid
-spec sha_uuid() -> binary().
sha_uuid() ->
Salt = crypto:strong_rand_bytes(32),
Str = string:lowercase(binary:encode_hex(crypto:hash(sha256, Salt))),
binary:part(Str, 1, 32).