fix agent
This commit is contained in:
parent
fc96042cae
commit
2e4d5a64c9
@ -6,7 +6,7 @@
|
|||||||
%%% @end
|
%%% @end
|
||||||
%%% Created : 21. 5月 2025 18:38
|
%%% Created : 21. 5月 2025 18:38
|
||||||
%%%-------------------------------------------------------------------
|
%%%-------------------------------------------------------------------
|
||||||
-module(efka_agent).
|
-module(efka_remote_agent).
|
||||||
-author("anlicheng").
|
-author("anlicheng").
|
||||||
-include("message_pb.hrl").
|
-include("message_pb.hrl").
|
||||||
-include("efka.hrl").
|
-include("efka.hrl").
|
||||||
@ -182,7 +182,7 @@ handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{tra
|
|||||||
efka_transport:auth_request(TransportPid, AuthBin),
|
efka_transport:auth_request(TransportPid, AuthBin),
|
||||||
{next_state, ?STATE_AUTH, State};
|
{next_state, ?STATE_AUTH, State};
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]),
|
lager:debug("[efka_remote_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]),
|
||||||
efka_transport:stop(TransportPid),
|
efka_transport:stop(TransportPid),
|
||||||
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}
|
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}
|
||||||
end;
|
end;
|
||||||
@ -193,26 +193,26 @@ handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pi
|
|||||||
#auth_reply{code = Code, message = Message} = message_pb:decode_msg(ReplyBin, auth_reply),
|
#auth_reply{code = Code, message = Message} = message_pb:decode_msg(ReplyBin, auth_reply),
|
||||||
case Code of
|
case Code of
|
||||||
0 ->
|
0 ->
|
||||||
lager:debug("[efka_agent] auth success, message: ~p", [Message]),
|
lager:debug("[efka_remote_agent] auth success, message: ~p", [Message]),
|
||||||
{next_state, ?STATE_ACTIVATED, State, [{next_event, info, flush_cache}]};
|
{next_state, ?STATE_ACTIVATED, State, [{next_event, info, flush_cache}]};
|
||||||
1 ->
|
1 ->
|
||||||
%% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent
|
%% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent
|
||||||
%% socket的连接状态需要维持
|
%% socket的连接状态需要维持
|
||||||
lager:debug("[efka_agent] auth denied, message: ~p", [Message]),
|
lager:debug("[efka_remote_agent] auth denied, message: ~p", [Message]),
|
||||||
{next_state, ?STATE_RESTRICTED, State};
|
{next_state, ?STATE_RESTRICTED, State};
|
||||||
2 ->
|
2 ->
|
||||||
% 其他类型的错误,需要间隔时间重试
|
% 其他类型的错误,需要间隔时间重试
|
||||||
lager:debug("[efka_agent] auth failed, message: ~p", [Message]),
|
lager:debug("[efka_remote_agent] auth failed, message: ~p", [Message]),
|
||||||
efka_transport:stop(TransportPid),
|
efka_transport:stop(TransportPid),
|
||||||
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}};
|
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}};
|
||||||
_ ->
|
_ ->
|
||||||
% 其他类型的错误,需要间隔时间重试
|
% 其他类型的错误,需要间隔时间重试
|
||||||
lager:debug("[efka_agent] auth failed, invalid message"),
|
lager:debug("[efka_remote_agent] auth failed, invalid message"),
|
||||||
efka_transport:stop(TransportPid),
|
efka_transport:stop(TransportPid),
|
||||||
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}
|
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}
|
||||||
end;
|
end;
|
||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]),
|
lager:debug("[efka_remote_agent] auth_request failed, error: ~p", [Reason]),
|
||||||
efka_transport:stop(TransportPid),
|
efka_transport:stop(TransportPid),
|
||||||
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}
|
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}
|
||||||
end;
|
end;
|
||||||
@ -315,7 +315,7 @@ handle_event(info, {server_async_call, PacketId, <<?PUSH_INVOKE:8, InvokeBin/bin
|
|||||||
%% 处理task_log
|
%% 处理task_log
|
||||||
handle_event(info, {server_async_call, PacketId, <<?PUSH_TASK_LOG:8, TaskLogBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
handle_event(info, {server_async_call, PacketId, <<?PUSH_TASK_LOG:8, TaskLogBin/binary>>}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||||
#fetch_task_log{task_id = TaskId} = message_pb:decode_msg(TaskLogBin, fetch_task_log),
|
#fetch_task_log{task_id = TaskId} = message_pb:decode_msg(TaskLogBin, fetch_task_log),
|
||||||
lager:debug("[efka_agent] get task_log request: ~p", [TaskId]),
|
lager:debug("[efka_remote_agent] get task_log request: ~p", [TaskId]),
|
||||||
{ok, Logs} = efka_inetd_task_log:get_logs(TaskId),
|
{ok, Logs} = efka_inetd_task_log:get_logs(TaskId),
|
||||||
Reply = case length(Logs) > 0 of
|
Reply = case length(Logs) > 0 of
|
||||||
true ->
|
true ->
|
||||||
@ -345,7 +345,7 @@ handle_event(info, {server_command, ?COMMAND_AUTH, <<Auth:8>>}, StateName, State
|
|||||||
|
|
||||||
%% 处理Pub/Sub机制
|
%% 处理Pub/Sub机制
|
||||||
handle_event(info, {server_pub, Topic, Content}, ?STATE_ACTIVATED, State) ->
|
handle_event(info, {server_pub, Topic, Content}, ?STATE_ACTIVATED, State) ->
|
||||||
lager:debug("[efka_agent] get pub topic: ~p, content: ~p", [Topic, Content]),
|
lager:debug("[efka_remote_agent] get pub topic: ~p, content: ~p", [Topic, Content]),
|
||||||
%% 消息发送到订阅系统
|
%% 消息发送到订阅系统
|
||||||
efka_subscription:publish(Topic, Content),
|
efka_subscription:publish(Topic, Content),
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
@ -391,7 +391,7 @@ handle_event(info, {timeout, _, {request_timeout, Ref}}, ?STATE_ACTIVATED, State
|
|||||||
|
|
||||||
%% transport进程退出
|
%% transport进程退出
|
||||||
handle_event(info, {'DOWN', MRef, process, TransportPid, Reason}, _, State = #state{transport_ref = MRef}) ->
|
handle_event(info, {'DOWN', MRef, process, TransportPid, Reason}, _, State = #state{transport_ref = MRef}) ->
|
||||||
lager:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
|
lager:debug("[efka_remote_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
|
||||||
erlang:start_timer(5000, self(), create_transport),
|
erlang:start_timer(5000, self(), create_transport),
|
||||||
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined, transport_ref = undefined}}.
|
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined, transport_ref = undefined}}.
|
||||||
|
|
||||||
@ -157,11 +157,11 @@ handle_call(_Request, _From, State = #state{}) ->
|
|||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_cast({metric_data, DeviceUUID, LineProtocolData}, State = #state{service_id = ServiceId}) ->
|
handle_cast({metric_data, DeviceUUID, LineProtocolData}, State = #state{service_id = ServiceId}) ->
|
||||||
lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, metric data: ~p", [ServiceId, DeviceUUID, LineProtocolData]),
|
lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, metric data: ~p", [ServiceId, DeviceUUID, LineProtocolData]),
|
||||||
efka_agent:metric_data(ServiceId, DeviceUUID, LineProtocolData),
|
efka_remote_agent:metric_data(ServiceId, DeviceUUID, LineProtocolData),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
|
handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
|
||||||
efka_agent:event(ServiceId, EventType, Params),
|
efka_remote_agent:event(ServiceId, EventType, Params),
|
||||||
lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]),
|
lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
|
|||||||
@ -157,11 +157,11 @@ handle_call(_Request, _From, State = #state{}) ->
|
|||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_cast({metric_data, DeviceUUID, LineProtocolData}, State = #state{service_id = ServiceId}) ->
|
handle_cast({metric_data, DeviceUUID, LineProtocolData}, State = #state{service_id = ServiceId}) ->
|
||||||
lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, metric data: ~p", [ServiceId, DeviceUUID, LineProtocolData]),
|
lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, metric data: ~p", [ServiceId, DeviceUUID, LineProtocolData]),
|
||||||
efka_agent:metric_data(ServiceId, DeviceUUID, LineProtocolData),
|
efka_remote_agent:metric_data(ServiceId, DeviceUUID, LineProtocolData),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
|
handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
|
||||||
efka_agent:event(ServiceId, EventType, Params),
|
efka_remote_agent:event(ServiceId, EventType, Params),
|
||||||
lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]),
|
lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
|
|||||||
@ -65,12 +65,12 @@ init([]) ->
|
|||||||
},
|
},
|
||||||
|
|
||||||
#{
|
#{
|
||||||
id => 'efka_agent',
|
id => 'efka_remote_agent',
|
||||||
start => {'efka_agent', start_link, []},
|
start => {'efka_remote_agent', start_link, []},
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
shutdown => 2000,
|
shutdown => 2000,
|
||||||
type => worker,
|
type => worker,
|
||||||
modules => ['efka_agent']
|
modules => ['efka_remote_agent']
|
||||||
},
|
},
|
||||||
|
|
||||||
#{
|
#{
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user