From 3cdb93ae7c23f1d128ae4c1ff16c9024eca3a2dd Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 29 Apr 2025 21:11:07 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_agent.erl | 18 ++++++++-- apps/efka/src/efka_subscription.erl | 54 ++++++++++++++++++++++++----- 2 files changed, 61 insertions(+), 11 deletions(-) diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 235c3bf..0f28637 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -132,16 +132,28 @@ handle_event(info, {server_push_message, <<8:8, ActivatePush>>}, StateName, Stat {true, ?STATE_ACTIVATED} -> {keep_state, State}; {true, ?STATE_DENIED} -> + %% 重新激活, 需要重新校验 efka_transport:auth_request(TransportPid, 5000), {next_state, ?STATE_AUTH, State}; {false, _} -> - {next_state, ?STATE_DENIED, State} + %% 这个时候的主机应该是受限制的状态,不允许发送消息;但是能够接受服务器推送的消息 + {next_state, ?STATE_RESTRICTED, State} end; -%% TODO 收到需要回复的指令 -handle_event(info, {server_push_message, PacketId, <<16:8, Directive>>}, _StateName, State = #state{}) -> +%% 收到需要回复的指令 +handle_event(info, {server_push_message, PacketId, <<16:8, Directive>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> #topic_message{topic = Topic, content = Content} = message_pb:decode_msg(Directive, directive), lager:debug("[efka_agent] get directive with packet_id: ~p, to device_uuid: ~p, content: ~p", [PacketId, Topic, Content]), + + %% 消息发送到订阅系统 + case PacketId > 0 of + true -> + CallbackFun = fun(Response) -> is_process_alive(TransportPid) andalso efka_transport:response(TransportPid, PacketId, Response) end, + efka_subscription:publish(PacketId, Topic, Content, CallbackFun); + false -> + efka_subscription:publish(Topic, Content) + end, + {keep_state, State}; %% transport进程退出 diff --git a/apps/efka/src/efka_subscription.erl b/apps/efka/src/efka_subscription.erl index c16add0..992755c 100644 --- a/apps/efka/src/efka_subscription.erl +++ b/apps/efka/src/efka_subscription.erl @@ -13,7 +13,7 @@ %% API -export([start_link/0]). --export([subscribe/2, publish/2]). +-export([subscribe/2, publish/4, publish/2, response/2]). -export([match_components/2, match_topic/2, is_valid_components/1, of_components/1]). %% gen_server callbacks @@ -29,7 +29,10 @@ }). -record(state, { - subscribers = [] + subscribers = [], + %% #{sha_uuid => fun(Response)}; + %% 为什么不采用packet_id来作为映射关系是因为 efka_agent建立新的efka_transport进程的时候packet_id会重置; 会导致映射关系错误 + callbacks = #{} }). %%%=================================================================== @@ -44,6 +47,14 @@ subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) -> publish(Topic, Content) when is_binary(Topic), is_binary(Content) -> gen_server:cast(?SERVER, {publish, Topic, Content}). +-spec publish(PacketId :: integer(), Topic :: binary(), Content :: binary(), CallbackFun :: fun((A :: binary()) -> term()) ) -> no_return(). +publish(PacketId, Topic, Content, CallbackFun) when is_integer(PacketId), is_binary(Topic), is_binary(Content) -> + gen_server:cast(?SERVER, {publish, PacketId, Topic, Content, CallbackFun}). + +-spec response(ShaUUID :: binary(), Response :: binary()) -> no_return(). +response(ShaUUID, Response) when is_binary(ShaUUID), is_binary(Response) -> + gen_server:cast(?SERVER, {response, ShaUUID, Response}). + %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -92,10 +103,29 @@ handle_cast({subscribe, Topic, SubscriberPid}, State = #state{subscribers = Subs {noreply, State} end; +%% 不需要响应的推送 handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}) -> - AccPids = dispatch(Subscribers, Topic, Content), + AccPids = dispatch(Subscribers, <<"">>, Topic, Content), lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, AccPids]), - {noreply, State}. + {noreply, State}; + +%% 需要响应的推送 +handle_cast({publish, PacketId, Topic, Content, CallbackFun}, State = #state{subscribers = Subscribers, callbacks = Callbacks}) when PacketId > 0 -> + ShaUUID = sha_uuid(), + AccPids = dispatch(Subscribers, ShaUUID, Topic, Content), + lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, AccPids]), + + {noreply, State#state{callbacks = maps:put(ShaUUID, CallbackFun, Callbacks)}}; + +%% 消息回复 +handle_cast({response, ShaUUID, Response}, State = #state{callbacks = Callbacks}) -> + case maps:take(ShaUUID, Callbacks) of + {F, NCallbacks} -> + F(Response), + {noreply, State#state{callbacks = NCallbacks}}; + error -> + {noreply, State} + end. %% @private %% @doc Handling all non call/cast messages @@ -128,13 +158,13 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== --spec dispatch(Subscribers :: list(), Topic :: binary(), Content :: binary()) -> [pid()]. -dispatch(Subscribers, Topic, Content) when is_list(Subscribers), is_binary(Topic), is_binary(Content) -> +-spec dispatch(Subscribers :: list(), ShaUUID :: binary(), Topic :: binary(), Content :: binary()) -> [pid()]. +dispatch(Subscribers, ShaUUID, Topic, Content) when is_list(Subscribers), is_binary(Topic), is_binary(Content) -> Components = of_components(Topic), lists:foldl(fun(#subscriber{components = Components0, subscriber_pid = SubscriberPid}, AccPids) -> case match_components(Components0, Components) andalso not lists:member(SubscriberPid, AccPids) of true -> - erlang:send(SubscriberPid, {xyz, Content}), + erlang:send(SubscriberPid, {topic_sink, ShaUUID, Content}), [SubscriberPid|AccPids]; false -> AccPids @@ -172,4 +202,12 @@ is_valid_components([<<$+>>|T]) -> is_valid_components([<<$*>>|T]) -> is_valid_components(T); is_valid_components([_|T]) -> - is_valid_components(T). \ No newline at end of file + is_valid_components(T). + +%% 生产唯一的uuid +-spec sha_uuid() -> binary(). +sha_uuid() -> + Salt = crypto:strong_rand_bytes(32), + Str = string:lowercase(binary:encode_hex(crypto:hash(sha256, Salt))), + binary:part(Str, 1, 32). +