diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 109ea90..d6b7182 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -27,6 +27,7 @@ -define(STATE_DENIED, denied). -define(STATE_CONNECTING, connecting). -define(STATE_AUTH, auth). + %% 不能推送消息到服务,但是可以接受服务器的部分指令 -define(STATE_RESTRICTED, restricted). %% 激活状态下 @@ -117,7 +118,6 @@ handle_cast({event, ServiceId, EventType, Params}, State) -> event_type = EventType, params = Params }), - safe_send(?METHOD_EVENT, EventPacket, State), {noreply, State}; @@ -191,42 +191,44 @@ handle_info({connect_reply, Reply}, State = #state{status = ?STATE_CONNECTING, t end; %% 收到auth回复 -handle_info({auth_reply, {ok, ReplyBin}}, State = #state{status = ?STATE_AUTH, transport_pid = TransportPid}) when is_pid(TransportPid) -> - #auth_reply{code = Code, message = Message} = message_pb:decode_msg(ReplyBin, auth_reply), - case Code of - 0 -> - lager:debug("[efka_agent] auth failed, message: ~p", [Message]), +handle_info({auth_reply, Reply}, State = #state{status = ?STATE_AUTH, transport_pid = TransportPid}) when is_pid(TransportPid) -> + case Reply of + {ok, ReplyBin} -> + #auth_reply{code = Code, message = Message} = message_pb:decode_msg(ReplyBin, auth_reply), + case Code of + 0 -> + lager:debug("[efka_agent] auth failed, message: ~p", [Message]), - %% 上传缓冲区里面的所有数据 - CacheItems = cache_model:get_all_cache(), - lists:foreach(fun(#cache{id = Id, method = Method, data = Packet}) -> - efka_transport:send(TransportPid, Method, Packet), - cache_model:delete(Id) - end, CacheItems), + %% 上传缓冲区里面的所有数据 + CacheItems = cache_model:get_all_cache(), + lists:foreach(fun(#cache{id = Id, method = Method, data = Packet}) -> + efka_transport:send(TransportPid, Method, Packet), + cache_model:delete(Id) + end, CacheItems), - {noreply, State#state{status = ?STATE_ACTIVATED}}; - 1 -> - %% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent - %% socket的连接状态需要维持 - lager:debug("[efka_agent] auth denied, message: ~p", [Message]), - {noreply, State#state{status = ?STATE_RESTRICTED}}; - 2 -> - % 其他类型的错误,需要间隔时间重试 - efka_logger:debug("[efka_agent] auth failed, message: ~p", [Message]), - efka_transport:stop(TransportPid), - {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; - _ -> - % 其他类型的错误,需要间隔时间重试 - lager:debug("[efka_agent] auth failed, invalid message"), + {noreply, State#state{status = ?STATE_ACTIVATED}}; + 1 -> + %% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent + %% socket的连接状态需要维持 + lager:debug("[efka_agent] auth denied, message: ~p", [Message]), + {noreply, State#state{status = ?STATE_RESTRICTED}}; + 2 -> + % 其他类型的错误,需要间隔时间重试 + efka_logger:debug("[efka_agent] auth failed, message: ~p", [Message]), + efka_transport:stop(TransportPid), + {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; + _ -> + % 其他类型的错误,需要间隔时间重试 + lager:debug("[efka_agent] auth failed, invalid message"), + efka_transport:stop(TransportPid), + {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}} + end; + {error, Reason} -> + lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), efka_transport:stop(TransportPid), {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}} end; -handle_info({auth_reply, {error, Reason}}, State = #state{transport_pid = TransportPid, status = ?STATE_AUTH}) -> - lager:debug("[efka_agent] auth_request failed, error: ~p", [Reason]), - efka_transport:stop(TransportPid), - {noreply, State#state{transport_pid = undefined, status = ?STATE_DENIED}}; - %% 云端服务器推送了消息 %% 激活消息