From 3f5a13aa652eff5e1905c60b54a8ba130a7afea4 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 6 May 2025 00:13:24 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_agent.erl | 140 +++++++++++++------------ apps/efka/src/efka_agent1.erl | 190 ---------------------------------- 2 files changed, 76 insertions(+), 254 deletions(-) delete mode 100644 apps/efka/src/efka_agent1.erl diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index c7d4b64..a1648d1 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -2,21 +2,21 @@ %%% @author anlicheng %%% @copyright (C) 2025, %%% @doc -%%% 负责和云端服务器的通讯 +%%% %%% @end -%%% Created : 29. 4月 2025 17:47 +%%% Created : 06. 5月 2025 00:01 %%%------------------------------------------------------------------- -module(efka_agent). -author("anlicheng"). -include("message_pb.hrl"). --behaviour(gen_statem). +-behaviour(gen_server). %% API -export([start_link/0]). -%% gen_statem callbacks --export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(SERVER, ?MODULE). @@ -30,115 +30,123 @@ -define(STATE_ACTIVATED, activated). -record(state, { - transport_pid :: undefined | pid() + transport_pid :: undefined | pid(), + status = ?STATE_DENIED }). %%%=================================================================== %%% API %%%=================================================================== -%% @doc Creates a gen_statem process which calls Module:init/1 to -%% initialize. To ensure a synchronized start-up procedure, this -%% function does not return until Module:init/1 has returned. +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). start_link() -> - gen_statem:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). %%%=================================================================== -%%% gen_statem callbacks +%%% gen_server callbacks %%%=================================================================== %% @private -%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or -%% gen_statem:start_link/[3,4], this function is called by the new -%% process to initialize. +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). init([]) -> erlang:process_flag(trap_exit, true), erlang:start_timer(0, self(), create_transport), - {ok, ?STATE_DENIED, #state{}}. + {ok, #state{}}. %% @private -%% @doc This function is called by a gen_statem when it needs to find out -%% the callback mode of the callback module. -callback_mode() -> - handle_event_function. +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. %% @private -%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or -%% (2) when gen_statem terminates abnormally. -%% This callback is optional. -format_status(_Opt, [_PDict, _StateName, _State]) -> - Status = some_term, - Status. +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. %% @private -%% @doc If callback_mode is handle_event_function, then whenever a -%% gen_statem receives an event from call/2, cast/2, or as a normal -%% process message, this function is called. - -%% 异步建立到目标服务器的连接 -handle_event(info, {timeout, _, create_transport}, ?STATE_DENIED, State = #state{}) -> +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, create_transport}, State = #state{status = ?STATE_DENIED}) -> {ok, Props} = application:get_env(efka, tls_server), Host = proplists:get_value(host, Props), Port = proplists:get_value(port, Props), {ok, TransportPid} = efka_transport:start_link(self(), Host, Port), efka_transport:connect(TransportPid), - {next_state, ?STATE_CONNECTING, State#state{transport_pid = TransportPid}}; + {noreply, State#state{status = ?STATE_CONNECTING, transport_pid = TransportPid}}; -%% 连接的回复 -handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{transport_pid = TransportPid}) -> +handle_info({connect_reply, Reply}, State = #state{status = ?STATE_CONNECTING, transport_pid = TransportPid}) -> case Reply of ok -> efka_transport:auth_request(TransportPid, 5000), - {next_state, ?STATE_AUTH, State}; + {noreply, State#state{status = ?STATE_AUTH}}; {error, Reason} -> efka_logger:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]), efka_transport:stop(TransportPid), - {next_state, ?STATE_DENIED, State} + {noreply, ?STATE_DENIED, State#state{status = ?STATE_DENIED}} end; -%% 权限校验的回复 -handle_event(info, {auth_reply, {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}}}, ?STATE_AUTH, State) -> +handle_info({auth_reply, {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}}}, State = #state{status = ?STATE_AUTH}) -> efka_logger:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), - {next_state, ?STATE_ACTIVATED, State}; -handle_event(info, {auth_reply, {ok, #auth_reply{code = -1, message = Message}}}, ?STATE_AUTH, State) -> + {noreply, State#state{status = ?STATE_ACTIVATED}}; +handle_info({auth_reply, {ok, #auth_reply{code = -1, message = Message}}}, State = #state{status = ?STATE_AUTH}) -> %% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent %% socket的连接状态需要维持 efka_logger:debug("[efka_agent] auth denied, message: ~p", [Message]), - {next_state, ?STATE_RESTRICTED, State}; -handle_event(info, {auth_reply, {ok, #auth_reply{code = -2, message = Message}}}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) -> + {noreply, State#state{status = ?STATE_RESTRICTED}}; +handle_info({auth_reply, {ok, #auth_reply{code = -2, message = Message}}}, State = #state{transport_pid = TransportPid, status = ?STATE_AUTH}) -> %% 其他类型的错误,需要间隔时间重试 efka_logger:debug("[efka_agent] auth failed, message: ~p", [Message]), efka_transport:stop(TransportPid), - {next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}; -handle_event(info, {auth_reply, {error, Reason}}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) -> + {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; +handle_info({auth_reply, {error, Reason}}, State = #state{transport_pid = TransportPid, status = ?STATE_AUTH}) -> efka_logger:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), efka_transport:stop(TransportPid), - {next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}; + {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; + %% 云端服务器推送了消息 %% 激活消息 %% TODO -handle_event(info, {server_push_message, <<8:8, ActivatePush>>}, StateName, State = #state{transport_pid = TransportPid}) -> +handle_info({server_push_message, <<8:8, ActivatePush>>}, State = #state{transport_pid = TransportPid, status = Status}) -> #activate_push{auth = Auth} = message_pb:decode_msg(ActivatePush, activate_push), - case {Auth, StateName} of + case {Auth, Status} of {true, ?STATE_ACTIVATED} -> - {keep_state, State}; + {noreply, State}; {true, ?STATE_DENIED} -> %% 重新激活, 需要重新校验 efka_transport:auth_request(TransportPid, 5000), - {next_state, ?STATE_AUTH, State}; + {noreply, State#state{status = ?STATE_AUTH}}; {false, _} -> %% 这个时候的主机应该是受限制的状态,不允许发送消息;但是能够接受服务器推送的消息 - {next_state, ?STATE_RESTRICTED, State} + {noreply, State#state{status = ?STATE_RESTRICTED}} end; %% 收到需要回复的指令 -handle_event(info, {server_push_message, PacketId, <<16:8, Directive>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_info({server_push_message, PacketId, <<16:8, Directive>>}, State = #state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}) -> #topic_message{topic = Topic, content = Content} = message_pb:decode_msg(Directive, directive), efka_logger:debug("[efka_agent] get directive with packet_id: ~p, to device_uuid: ~p, content: ~p", [PacketId, Topic, Content]), - %% 消息发送到订阅系统 case PacketId > 0 of true -> @@ -148,30 +156,34 @@ handle_event(info, {server_push_message, PacketId, <<16:8, Directive>>}, ?STATE_ efka_subscription:publish(Topic, Content) end, - {keep_state, State}; + {noreply, State}; %% transport进程退出 -handle_event(info, {'EXIT', TransportPid, Reason}, _StateName, State = #state{transport_pid = TransportPid}) -> +handle_info({'EXIT', TransportPid, Reason}, State = #state{transport_pid = TransportPid}) -> efka_logger:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]), erlang:start_timer(500000, self(), create_transport), - {next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}; + {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; -handle_event(_EventType, _EventContent, _StateName, State = #state{}) -> - NextStateName = the_next_state_name, - {next_state, NextStateName, State}. +handle_info(_Info, State = #state{}) -> + {noreply, State}. %% @private -%% @doc This function is called by a gen_statem when it is about to +%% @doc This function is called by a gen_server when it is about to %% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_statem terminates with -%% Reason. The return value is ignored. -terminate(_Reason, _StateName, _State = #state{}) -> +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> ok. %% @private %% @doc Convert process state when code is changed -code_change(_OldVsn, StateName, State = #state{}, _Extra) -> - {ok, StateName, State}. +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. %%%=================================================================== %%% Internal functions diff --git a/apps/efka/src/efka_agent1.erl b/apps/efka/src/efka_agent1.erl deleted file mode 100644 index 94c40a1..0000000 --- a/apps/efka/src/efka_agent1.erl +++ /dev/null @@ -1,190 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 06. 5月 2025 00:01 -%%%------------------------------------------------------------------- --module(efka_agent1). --author("anlicheng"). --include("message_pb.hrl"). - --behaviour(gen_server). - -%% API --export([start_link/0]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). - -%% 标记当前agent的状态,只有在 activated 状态下才可以正常的发送数据 --define(STATE_DENIED, denied). --define(STATE_CONNECTING, connecting). --define(STATE_AUTH, auth). -%% 不能推送消息到服务,但是可以接受服务器的部分指令 --define(STATE_RESTRICTED, restricted). -%% 激活状态下 --define(STATE_ACTIVATED, activated). - --record(state, { - transport_pid :: undefined | pid(), - status = ?STATE_DENIED -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). - -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== - -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - erlang:process_flag(trap_exit, true), - erlang:start_timer(0, self(), create_transport), - {ok, #state{}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, create_transport}, State = #state{status = ?STATE_DENIED}) -> - {ok, Props} = application:get_env(efka, tls_server), - Host = proplists:get_value(host, Props), - Port = proplists:get_value(port, Props), - {ok, TransportPid} = efka_transport:start_link(self(), Host, Port), - efka_transport:connect(TransportPid), - - {noreply, State#state{status = ?STATE_CONNECTING, transport_pid = TransportPid}}; - -handle_info({connect_reply, Reply}, State = #state{status = ?STATE_CONNECTING, transport_pid = TransportPid}) -> - case Reply of - ok -> - efka_transport:auth_request(TransportPid, 5000), - {noreply, State#state{status = ?STATE_AUTH}}; - {error, Reason} -> - efka_logger:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]), - efka_transport:stop(TransportPid), - {noreply, ?STATE_DENIED, State#state{status = ?STATE_DENIED}} - end; - -handle_info({auth_reply, {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}}}, State = #state{status = ?STATE_AUTH}) -> - efka_logger:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), - {noreply, State#state{status = ?STATE_ACTIVATED}}; -handle_info({auth_reply, {ok, #auth_reply{code = -1, message = Message}}}, State = #state{status = ?STATE_AUTH}) -> - %% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent - %% socket的连接状态需要维持 - efka_logger:debug("[efka_agent] auth denied, message: ~p", [Message]), - {noreply, State#state{status = ?STATE_RESTRICTED}}; -handle_info({auth_reply, {ok, #auth_reply{code = -2, message = Message}}}, State = #state{transport_pid = TransportPid, status = ?STATE_AUTH}) -> - %% 其他类型的错误,需要间隔时间重试 - efka_logger:debug("[efka_agent] auth failed, message: ~p", [Message]), - efka_transport:stop(TransportPid), - {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; -handle_info({auth_reply, {error, Reason}}, State = #state{transport_pid = TransportPid, status = ?STATE_AUTH}) -> - efka_logger:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), - efka_transport:stop(TransportPid), - {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; - - -%% 云端服务器推送了消息 -%% 激活消息 - -%% TODO -handle_info({server_push_message, <<8:8, ActivatePush>>}, State = #state{transport_pid = TransportPid, status = Status}) -> - #activate_push{auth = Auth} = message_pb:decode_msg(ActivatePush, activate_push), - case {Auth, Status} of - {true, ?STATE_ACTIVATED} -> - {noreply, State}; - {true, ?STATE_DENIED} -> - %% 重新激活, 需要重新校验 - efka_transport:auth_request(TransportPid, 5000), - {noreply, State#state{status = ?STATE_AUTH}}; - {false, _} -> - %% 这个时候的主机应该是受限制的状态,不允许发送消息;但是能够接受服务器推送的消息 - {noreply, State#state{status = ?STATE_RESTRICTED}} - end; - -%% 收到需要回复的指令 -handle_info({server_push_message, PacketId, <<16:8, Directive>>}, State = #state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}) -> - #topic_message{topic = Topic, content = Content} = message_pb:decode_msg(Directive, directive), - efka_logger:debug("[efka_agent] get directive with packet_id: ~p, to device_uuid: ~p, content: ~p", [PacketId, Topic, Content]), - %% 消息发送到订阅系统 - case PacketId > 0 of - true -> - CallbackFun = fun(Response) -> is_process_alive(TransportPid) andalso efka_transport:response(TransportPid, PacketId, Response) end, - efka_subscription:publish(PacketId, Topic, Content, CallbackFun); - false -> - efka_subscription:publish(Topic, Content) - end, - - {noreply, State}; - -%% transport进程退出 -handle_info({'EXIT', TransportPid, Reason}, State = #state{transport_pid = TransportPid}) -> - efka_logger:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]), - erlang:start_timer(500000, self(), create_transport), - {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; - -handle_info(_Info, State = #state{}) -> - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%===================================================================