From d7da93b7e8f83ed11de7a6fbcd3e19e98575e479 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 6 May 2025 00:12:55 +0800 Subject: [PATCH] fix --- apps/efka/src/efka_agent.erl | 2 +- apps/efka/src/efka_agent1.erl | 190 ++++++++++++++++++++++++++++++++++ apps/efka/src/efka_client.erl | 7 +- 3 files changed, 195 insertions(+), 4 deletions(-) create mode 100644 apps/efka/src/efka_agent1.erl diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 96f4ce2..c7d4b64 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -106,7 +106,7 @@ handle_event(info, {auth_reply, {ok, #auth_reply{code = -1, message = Message}}} %% socket的连接状态需要维持 efka_logger:debug("[efka_agent] auth denied, message: ~p", [Message]), {next_state, ?STATE_RESTRICTED, State}; -handle_event(info, {auth_reply, {ok, #auth_reply{code = -2, message = Message}}}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) -> +handle_event(info, {auth_reply, {ok, #auth_reply{code = -2, message = Message}}}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) -> %% 其他类型的错误,需要间隔时间重试 efka_logger:debug("[efka_agent] auth failed, message: ~p", [Message]), efka_transport:stop(TransportPid), diff --git a/apps/efka/src/efka_agent1.erl b/apps/efka/src/efka_agent1.erl new file mode 100644 index 0000000..94c40a1 --- /dev/null +++ b/apps/efka/src/efka_agent1.erl @@ -0,0 +1,190 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 06. 5月 2025 00:01 +%%%------------------------------------------------------------------- +-module(efka_agent1). +-author("anlicheng"). +-include("message_pb.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +%% 标记当前agent的状态,只有在 activated 状态下才可以正常的发送数据 +-define(STATE_DENIED, denied). +-define(STATE_CONNECTING, connecting). +-define(STATE_AUTH, auth). +%% 不能推送消息到服务,但是可以接受服务器的部分指令 +-define(STATE_RESTRICTED, restricted). +%% 激活状态下 +-define(STATE_ACTIVATED, activated). + +-record(state, { + transport_pid :: undefined | pid(), + status = ?STATE_DENIED +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + erlang:process_flag(trap_exit, true), + erlang:start_timer(0, self(), create_transport), + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({timeout, _, create_transport}, State = #state{status = ?STATE_DENIED}) -> + {ok, Props} = application:get_env(efka, tls_server), + Host = proplists:get_value(host, Props), + Port = proplists:get_value(port, Props), + {ok, TransportPid} = efka_transport:start_link(self(), Host, Port), + efka_transport:connect(TransportPid), + + {noreply, State#state{status = ?STATE_CONNECTING, transport_pid = TransportPid}}; + +handle_info({connect_reply, Reply}, State = #state{status = ?STATE_CONNECTING, transport_pid = TransportPid}) -> + case Reply of + ok -> + efka_transport:auth_request(TransportPid, 5000), + {noreply, State#state{status = ?STATE_AUTH}}; + {error, Reason} -> + efka_logger:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]), + efka_transport:stop(TransportPid), + {noreply, ?STATE_DENIED, State#state{status = ?STATE_DENIED}} + end; + +handle_info({auth_reply, {ok, #auth_reply{code = 1, message = Message, repository_url = RepositoryUrl}}}, State = #state{status = ?STATE_AUTH}) -> + efka_logger:debug("[efka_agent] auth failed, message: ~p, repository_url: ~p", [Message, RepositoryUrl]), + {noreply, State#state{status = ?STATE_ACTIVATED}}; +handle_info({auth_reply, {ok, #auth_reply{code = -1, message = Message}}}, State = #state{status = ?STATE_AUTH}) -> + %% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent + %% socket的连接状态需要维持 + efka_logger:debug("[efka_agent] auth denied, message: ~p", [Message]), + {noreply, State#state{status = ?STATE_RESTRICTED}}; +handle_info({auth_reply, {ok, #auth_reply{code = -2, message = Message}}}, State = #state{transport_pid = TransportPid, status = ?STATE_AUTH}) -> + %% 其他类型的错误,需要间隔时间重试 + efka_logger:debug("[efka_agent] auth failed, message: ~p", [Message]), + efka_transport:stop(TransportPid), + {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; +handle_info({auth_reply, {error, Reason}}, State = #state{transport_pid = TransportPid, status = ?STATE_AUTH}) -> + efka_logger:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), + efka_transport:stop(TransportPid), + {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; + + +%% 云端服务器推送了消息 +%% 激活消息 + +%% TODO +handle_info({server_push_message, <<8:8, ActivatePush>>}, State = #state{transport_pid = TransportPid, status = Status}) -> + #activate_push{auth = Auth} = message_pb:decode_msg(ActivatePush, activate_push), + case {Auth, Status} of + {true, ?STATE_ACTIVATED} -> + {noreply, State}; + {true, ?STATE_DENIED} -> + %% 重新激活, 需要重新校验 + efka_transport:auth_request(TransportPid, 5000), + {noreply, State#state{status = ?STATE_AUTH}}; + {false, _} -> + %% 这个时候的主机应该是受限制的状态,不允许发送消息;但是能够接受服务器推送的消息 + {noreply, State#state{status = ?STATE_RESTRICTED}} + end; + +%% 收到需要回复的指令 +handle_info({server_push_message, PacketId, <<16:8, Directive>>}, State = #state{transport_pid = TransportPid, status = ?STATE_ACTIVATED}) -> + #topic_message{topic = Topic, content = Content} = message_pb:decode_msg(Directive, directive), + efka_logger:debug("[efka_agent] get directive with packet_id: ~p, to device_uuid: ~p, content: ~p", [PacketId, Topic, Content]), + %% 消息发送到订阅系统 + case PacketId > 0 of + true -> + CallbackFun = fun(Response) -> is_process_alive(TransportPid) andalso efka_transport:response(TransportPid, PacketId, Response) end, + efka_subscription:publish(PacketId, Topic, Content, CallbackFun); + false -> + efka_subscription:publish(Topic, Content) + end, + + {noreply, State}; + +%% transport进程退出 +handle_info({'EXIT', TransportPid, Reason}, State = #state{transport_pid = TransportPid}) -> + efka_logger:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]), + erlang:start_timer(500000, self(), create_transport), + {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; + +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/efka/src/efka_client.erl b/apps/efka/src/efka_client.erl index 7f33671..fa4b34e 100644 --- a/apps/efka/src/efka_client.erl +++ b/apps/efka/src/efka_client.erl @@ -46,7 +46,7 @@ %% API -export([start_link/3]). -export([device_offline/1, device_online/1]). --export([send_metric_data/2, invoke_service/3, send_log/1, request_metric/0, request_param/0, send_event/2]). +-export([send_metric_data/2, invoke_service/3, send_log/1, request_metric/0, request_param/0, send_event/2, controller_process/1]). -export([test/0]). @@ -66,6 +66,7 @@ test() -> start_link(<<"test">>, "localhost", 18080). +-spec controller_process(ControllerPid :: pid()) -> ok. controller_process(ControllerPid) when is_pid(ControllerPid) -> gen_server:call(?MODULE, {controller_process, ControllerPid}). @@ -256,7 +257,7 @@ handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) when is_map(Params) -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of true -> Ref = make_ref(), @@ -278,7 +279,7 @@ handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) when is_list(Metrics) -> +handle_info({tcp, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> Message = case is_pid(ControllerPid) andalso is_process_alive(ControllerPid) of true -> Ref = make_ref(),