调整缓存的刷新策略
This commit is contained in:
parent
2e2865a62f
commit
3c8a232054
@ -107,6 +107,7 @@ handle_event({call, From}, {request_service_config, ReceiverPid, ServiceId}, ?ST
|
|||||||
handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _, State) ->
|
handle_event({call, From}, {request_service_config, _ReceiverPid, _ServiceId}, _, State) ->
|
||||||
{keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]};
|
{keep_state, State, [{reply, From, {error, <<"transport is not alive">>}}]};
|
||||||
|
|
||||||
|
%% 异步发送数据, 连接存在时候直接发送;否则缓存到mnesia
|
||||||
handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||||
Packet = message_pb:encode_msg(#data{
|
Packet = message_pb:encode_msg(#data{
|
||||||
service_id = ServiceId,
|
service_id = ServiceId,
|
||||||
@ -115,6 +116,7 @@ handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, ?STAT
|
|||||||
}),
|
}),
|
||||||
efka_transport:send(TransportPid, ?METHOD_DATA, Packet),
|
efka_transport:send(TransportPid, ?METHOD_DATA, Packet),
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, State) ->
|
handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, State) ->
|
||||||
Packet = message_pb:encode_msg(#data{
|
Packet = message_pb:encode_msg(#data{
|
||||||
service_id = ServiceId,
|
service_id = ServiceId,
|
||||||
@ -124,6 +126,7 @@ handle_event(cast, {metric_data, ServiceId, DeviceUUID, LineProtocolData}, _, St
|
|||||||
ok = cache_model:insert(?METHOD_DATA, Packet),
|
ok = cache_model:insert(?METHOD_DATA, Packet),
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
|
%% 异步发送事件
|
||||||
handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||||
EventPacket = message_pb:encode_msg(#event{
|
EventPacket = message_pb:encode_msg(#event{
|
||||||
service_id = ServiceId,
|
service_id = ServiceId,
|
||||||
@ -162,6 +165,7 @@ handle_event(cast, {ping, AdCode, BootTime, Province, City, EfkaVersion, KernelA
|
|||||||
efka_transport:send(TransportPid, ?METHOD_PING, Ping),
|
efka_transport:send(TransportPid, ?METHOD_PING, Ping),
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
|
|
||||||
|
%% 异步建立到服务器的连接
|
||||||
handle_event(info, {timeout, _, create_transport}, ?STATE_DENIED, State) ->
|
handle_event(info, {timeout, _, create_transport}, ?STATE_DENIED, State) ->
|
||||||
{ok, Props} = application:get_env(efka, tls_server),
|
{ok, Props} = application:get_env(efka, tls_server),
|
||||||
Host = proplists:get_value(host, Props),
|
Host = proplists:get_value(host, Props),
|
||||||
@ -180,7 +184,7 @@ handle_event(info, {connect_reply, Reply}, ?STATE_CONNECTING, State = #state{tra
|
|||||||
{error, Reason} ->
|
{error, Reason} ->
|
||||||
lager:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]),
|
lager:debug("[efka_agent] connect failed, error: ~p, pid: ~p", [Reason, TransportPid]),
|
||||||
efka_transport:stop(TransportPid),
|
efka_transport:stop(TransportPid),
|
||||||
{next_state, ?STATE_DENIED, State}
|
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) ->
|
handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pid = TransportPid}) ->
|
||||||
@ -190,13 +194,7 @@ handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pi
|
|||||||
case Code of
|
case Code of
|
||||||
0 ->
|
0 ->
|
||||||
lager:debug("[efka_agent] auth success, message: ~p", [Message]),
|
lager:debug("[efka_agent] auth success, message: ~p", [Message]),
|
||||||
%% 上传缓冲区里面的所有数据
|
{next_state, ?STATE_ACTIVATED, State, [{next_event, info, flush_cache}]};
|
||||||
CacheItems = cache_model:get_all_cache(),
|
|
||||||
lists:foreach(fun(#cache{id = Id, method = Method, data = Packet}) ->
|
|
||||||
efka_transport:send(TransportPid, Method, Packet),
|
|
||||||
cache_model:delete(Id)
|
|
||||||
end, CacheItems),
|
|
||||||
{next_state, ?STATE_ACTIVATED, State};
|
|
||||||
1 ->
|
1 ->
|
||||||
%% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent
|
%% 主机在后台的授权未通过;此时agent不能推送数据给云端服务器,但是云端服务器可以推送命令给agent
|
||||||
%% socket的连接状态需要维持
|
%% socket的连接状态需要维持
|
||||||
@ -219,6 +217,19 @@ handle_event(info, {auth_reply, Reply}, ?STATE_AUTH, State = #state{transport_pi
|
|||||||
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}
|
{next_state, ?STATE_DENIED, State#state{transport_pid = undefined}}
|
||||||
end;
|
end;
|
||||||
|
|
||||||
|
%% 将缓存中的数据推送到服务器端
|
||||||
|
handle_event(info, flush_cache, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
|
||||||
|
case cache_model:fetch_next() of
|
||||||
|
{ok, #cache{id = Id, method = Method, data = Packet}} ->
|
||||||
|
efka_transport:send(TransportPid, Method, Packet),
|
||||||
|
cache_model:delete(Id),
|
||||||
|
{keep_state, State, [{next_event, info, flush_cache}]};
|
||||||
|
error ->
|
||||||
|
{keep_state, State}
|
||||||
|
end;
|
||||||
|
handle_event(info, flush_cache, _, State) ->
|
||||||
|
{keep_state, State};
|
||||||
|
|
||||||
%% 云端服务器推送了消息
|
%% 云端服务器推送了消息
|
||||||
%% 激活消息
|
%% 激活消息
|
||||||
|
|
||||||
|
|||||||
@ -16,6 +16,7 @@
|
|||||||
%% API
|
%% API
|
||||||
-export([create_table/0]).
|
-export([create_table/0]).
|
||||||
-export([insert/2, get_all_cache/0, fetch_next/0, delete/1, next_id/0]).
|
-export([insert/2, get_all_cache/0, fetch_next/0, delete/1, next_id/0]).
|
||||||
|
-export([first_key/0]).
|
||||||
|
|
||||||
create_table() ->
|
create_table() ->
|
||||||
%% id生成器
|
%% id生成器
|
||||||
@ -62,10 +63,12 @@ get_all_cache() ->
|
|||||||
Q = qlc:q([E || E <- mnesia:table(?TAB)]),
|
Q = qlc:q([E || E <- mnesia:table(?TAB)]),
|
||||||
qlc:e(Q)
|
qlc:e(Q)
|
||||||
end,
|
end,
|
||||||
|
|
||||||
case mnesia:transaction(Fun) of
|
case mnesia:transaction(Fun) of
|
||||||
{'atomic', Res} ->
|
{'atomic', Res} ->
|
||||||
Res;
|
Res;
|
||||||
{'aborted', _} ->
|
{'aborted', _} ->
|
||||||
[]
|
[]
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
first_key() ->
|
||||||
|
mnesia:dirty_first(?TAB).
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user