diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 9732b78..66d2b1f 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -107,6 +107,7 @@ handle_event({call, From}, {request_service_config, ReceiverPid, ServiceId}, ?ST handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _, State) -> {keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]}; +%% 异步发送数据, 连接存在时候直接发送;否则缓存到mnesia handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> Packet = message_pb:encode_msg(#data{ service_id = ServiceId, @@ -115,6 +116,7 @@ handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, ?STAT }), efka_transport:send(TransportPid, ?METHOD_DATA, Packet), {keep_state, State}; + handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, State) -> Packet = message_pb:encode_msg(#data{ service_id = ServiceId, @@ -124,6 +126,7 @@ handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, St ok = cache_model:insert(?METHOD_DATA, Packet), {keep_state, State}; +%% 异步发送事件 handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> EventPacket = message_pb:encode_msg(#event{ service_id = ServiceId, @@ -162,6 +165,7 @@ handle_event(cast, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelA efka_transport:send(TransportPid, ?METHOD_PING, Ping), {keep_state, State}; +%% 异步建立到服务器的连接 handle_event(info, {timeout, _, create_transport}, ?STATE_DENIED, State) -> {ok, Props} = application:get_env(efka, tls_server), Host = proplists:get_value(host, Props), @@ -180,7 +184,7 @@ handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{tra {error, Reason} -> lager:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]), efka_transport:stop(TransportPid), - {next_state, ?STATE_DENIED, State} + {next_state, ?STATE_DENIED, State#state{transport_pid = undefined}} end; handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) -> @@ -190,13 +194,7 @@ handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pi case Code of 0 -> lager:debug("[efka_agent] auth success, message: ~p", [Message]), - %% 上传缓冲区里面的所有数据 - CacheItems = cache_model:get_all_cache(), - lists:foreach(fun(#cache{id = Id, method = Method, data = Packet}) -> - efka_transport:send(TransportPid, Method, Packet), - cache_model:delete(Id) - end, CacheItems), - {next_state, ?STATE_ACTIVATED, State}; + {next_state, ?STATE_ACTIVATED, State, [{next_event, info, flush_cache}]}; 1 -> %% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent %% socket的连接状态需要维持 @@ -219,6 +217,19 @@ handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pi {next_state, ?STATE_DENIED, State#state{transport_pid = undefined}} end; +%% 将缓存中的数据推送到服务器端 +handle_event(info, flush_cache, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> + case cache_model:fetch_next() of + {ok, #cache{id = Id, method = Method, data = Packet}} -> + efka_transport:send(TransportPid, Method, Packet), + cache_model:delete(Id), + {keep_state, State, [{next_event, info, flush_cache}]}; + error -> + {keep_state, State} + end; +handle_event(info, flush_cache, _, State) -> + {keep_state, State}; + %% 云端服务器推送了消息 %% 激活消息 diff --git a/apps/efka/src/mnesia/cache_model.erl b/apps/efka/src/mnesia/cache_model.erl index 0718b2e..28fb6f1 100644 --- a/apps/efka/src/mnesia/cache_model.erl +++ b/apps/efka/src/mnesia/cache_model.erl @@ -16,6 +16,7 @@ %% API -export([create_table/0]). -export([insert/2, get_all_cache/0, fetch_next/0, delete/1, next_id/0]). +-export([first_key/0]). create_table() -> %% id生成器 @@ -62,10 +63,12 @@ get_all_cache() -> Q = qlc:q([E || E <- mnesia:table(?TAB)]), qlc:e(Q) end, - case mnesia:transaction(Fun) of {'atomic', Res} -> Res; {'aborted', _} -> [] - end. \ No newline at end of file + end. + +first_key() -> + mnesia:dirty_first(?TAB).