diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 5df9832..4770a5a 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -4,20 +4,19 @@ %%% @doc %%% %%% @end -%%% Created : 20. 4月 2025 18:47 +%%% Created : 29. 4月 2025 17:47 %%%------------------------------------------------------------------- -module(efka_agent). -author("anlicheng"). -include("message_pb.hrl"). --include("efka.hrl"). --behaviour(gen_server). +-behaviour(gen_statem). %% API -export([start_link/0]). -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). +%% gen_statem callbacks +-export([init/1, format_status/2, handle_event/4, terminate/3, code_change/4, callback_mode/0]). -define(SERVER, ?MODULE). @@ -25,165 +24,148 @@ -define(STATE_DENIED, denied). -define(STATE_CONNECTING, connecting). -define(STATE_AUTH, auth). +%% 不能推送消息到服务,但是可以接受服务器的部分指令 +-define(STATE_RESTRICTED, restricted). +%% 激活状态下 -define(STATE_ACTIVATED, activated). -record(state, { - transport_pid :: undefined | pid(), - status = ?STATE_DENIED + transport_pid :: undefined | pid() }). %%%=================================================================== %%% API %%%=================================================================== -%% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + gen_statem:start_link({local, ?SERVER}, ?MODULE, [], []). %%%=================================================================== -%%% gen_server callbacks +%%% gen_statem callbacks %%%=================================================================== %% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. init([]) -> erlang:process_flag(trap_exit, true), erlang:start_timer(0, self(), create_transport), - {ok, #state{}}. + {ok, ?STATE_DENIED, #state{}}. %% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. +%% @doc This function is called by a gen_statem when it needs to find out +%% the callback mode of the callback module. +callback_mode() -> + handle_event_function. %% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Request, State = #state{}) -> - {noreply, State}. +%% @doc Called (1) whenever sys:get_status/1,2 is called by gen_statem or +%% (2) when gen_statem terminates abnormally. +%% This callback is optional. +format_status(_Opt, [_PDict, _StateName, _State]) -> + Status = some_term, + Status. %% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, create_transport}, State = #state{}) -> +%% @doc If callback_mode is handle_event_function, then whenever a +%% gen_statem receives an event from call/2, cast/2, or as a normal +%% process message, this function is called. + +%% 异步建立到目标服务器的连接 +handle_event(info, {timeout, _, create_transport}, ?STATE_DENIED, State = #state{}) -> {ok, Props} = application:get_env(efka, tls_server), Host = proplists:get_value(host, Props), Port = proplists:get_value(port, Props), - {ok, TransportPid} = efka_transport:start_link(self(), Host, Port), efka_transport:connect(TransportPid), - {noreply, State#state{transport_pid = TransportPid, status = ?STATE_CONNECTING}}; + {next_state, ?STATE_CONNECTING, State}; -%% 连接的消息确认 -handle_info({connect_reply, Reply}, State = #state{transport_pid = TransportPid}) -> +%% 连接的回复 +handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{transport_pid = TransportPid}) -> case Reply of ok -> efka_transport:auth_request(TransportPid, 5000), - {noreply, State#state{status = ?STATE_AUTH}}; + {next_state, ?STATE_AUTH, State}; {error, Reason} -> lager:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]), % efka_transport:stop(TransportPid), - {noreply, State#state{status = ?STATE_DENIED}} + {next_state, ?STATE_DENIED, State} end; -%% 验证的消息确认 -handle_info({auth_reply, Reply}, State = #state{transport_pid = TransportPid}) -> +%% 权限校验的回复 +handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) -> case Reply of {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}} -> lager:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), - {noreply, State#state{status = ?STATE_ACTIVATED}}; + {next_state, ?STATE_ACTIVATED, State}; + %% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent %% socket的连接状态需要维持 {ok, #auth_reply{code = -1, message = Message}} -> lager:debug("[efka_agent] auth denied, message: ~p", [Message]), - {noreply, State#state{status = ?STATE_DENIED}}; + {next_state, ?STATE_RESTRICTED, State}; %% 其他类型的错误,需要间隔时间重试 {ok, #auth_reply{code = -2, message = Message}} -> lager:debug("[efka_agent] auth failed, message: ~p", [Message]), efka_transport:stop(TransportPid), - {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; + {next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}; {error, Reason} -> lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), efka_transport:stop(TransportPid), - {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}} + {next_state, ?STATE_DENIED, State#state{transport_pid = undefined}} end; %% 云端服务器推送了消息 - %% 激活消息 -handle_info({server_push_message, <<8:8, ActivatePush>>}, State = #state{transport_pid = TransportPid, status = Status}) -> + +%% TODO +handle_event(info, {server_push_message, <<8:8, ActivatePush>>}, StateName, State = #state{transport_pid = TransportPid}) -> #activate_push{auth = Auth} = message_pb:decode_msg(ActivatePush, activate_push), - case {Auth, Status} of + case {Auth, StateName} of {true, ?STATE_ACTIVATED} -> - {noreply, State}; + {keep_state, State}; {true, ?STATE_DENIED} -> efka_transport:auth_request(TransportPid, 5000), - {noreply, State}; + {next_state, ?STATE_AUTH, State}; {false, _} -> - {noreply, State#state{status = ?STATE_DENIED}} + {next_state, ?STATE_DENIED, State} end; -%% TODO 收到指令 -handle_info({server_push_message, <<16:8, Directive>>}, State = #state{}) -> - #topic_message{topic = Topic, content = Content} = message_pb:decode_msg(Directive, directive), - lager:debug("[efka_agent] get directive to device_uuid: ~p, content: ~p", [Topic, Content]), - {noreply, State}; - %% TODO 收到需要回复的指令 -handle_info({server_push_message, PacketId, <<16:8, Directive>>}, State = #state{}) -> +handle_event(info, {server_push_message, PacketId, <<16:8, Directive>>}, _StateName, State = #state{}) -> #topic_message{topic = Topic, content = Content} = message_pb:decode_msg(Directive, directive), lager:debug("[efka_agent] get directive with packet_id: ~p, to device_uuid: ~p, content: ~p", [PacketId, Topic, Content]), - {noreply, State}; + {keep_state, State}; -%% 不管transport因为什么原因退出,间隔一定时间后重试 -handle_info({'EXIT', TransportPid, Reason}, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {'EXIT', TransportPid, Reason}, _StateName, State = #state{transport_pid = TransportPid}) -> lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]), erlang:start_timer(5000, self(), create_transport), - {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; + {next_status, ?STATE_DENIED, State#state{transport_pid = undefined}}; -handle_info(Info, State = #state{}) -> - lager:debug("[efka_agent] get unknown info: ~p", [Info]), - {noreply, State}. +handle_event(_EventType, _EventContent, _StateName, State = #state{}) -> + NextStateName = the_next_state_name, + {next_state, NextStateName, State}. %% @private -%% @doc This function is called by a gen_server when it is about to +%% @doc This function is called by a gen_statem when it is about to %% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> +%% necessary cleaning up. When it returns, the gen_statem terminates with +%% Reason. The return value is ignored. +terminate(_Reason, _StateName, _State = #state{}) -> ok. %% @private %% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. +code_change(_OldVsn, StateName, State = #state{}, _Extra) -> + {ok, StateName, State}. %%%=================================================================== %%% Internal functions -%%%=================================================================== \ No newline at end of file +%%%===================================================================