diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 1387ff2..d68f898 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -194,7 +194,7 @@ handle_info(fetch_next, State = #state{status = ?STATE_ACTIVATED, transport_pid case micro_data_model:fetch_next() of error -> ok; - {ok, #micro_data{device_uuid = DeviceUUID, service_name = ServiceName, at = At, tags = Tags, fields = Fields}} -> + {ok, #micro_data{id = Id, device_uuid = DeviceUUID, service_name = ServiceName, at = At, tags = Tags, fields = Fields}} -> DataPacket = message_pb:encode_msg(#data{ device_uuid = DeviceUUID, service_name = ServiceName, @@ -203,6 +203,10 @@ handle_info(fetch_next, State = #state{status = ?STATE_ACTIVATED, transport_pid fields = Fields }), efka_transport:send(TransportPid, ?METHOD_DATA, DataPacket), + + %% 发送后删除记录 + ok = micro_data_model:delete(Id), + %% 触发下一次 fetch_next() end, diff --git a/apps/efka/src/mnesia/micro_data_model.erl b/apps/efka/src/mnesia/micro_data_model.erl index 51eef76..faaf240 100644 --- a/apps/efka/src/mnesia/micro_data_model.erl +++ b/apps/efka/src/mnesia/micro_data_model.erl @@ -15,7 +15,7 @@ %% API -export([create_table/0]). --export([insert/1, get_all_data/0, fetch_next/0]). +-export([insert/1, get_all_data/0, fetch_next/0, delete/1]). create_table() -> %% id生成器 @@ -23,7 +23,7 @@ create_table() -> {attributes, record_info(fields, micro_data)}, {record_name, micro_data}, {disc_copies, [node()]}, - {type, bag} + {type, ordered_set} ]). -spec insert(MicroData0 :: #micro_data{}) -> ok | {error, Reason :: any()}. @@ -37,7 +37,21 @@ insert(MicroData0 = #micro_data{}) -> end. fetch_next() -> - ok. + case mnesia:dirty_first(?TAB) of + '$end_of_table' -> + error; + Id -> + [Entry] = mnesia:dirty_read(?TAB, Id), + {ok, Entry} + end. + +delete(Id) when is_integer(Id) -> + case mnesia:transaction(fun() -> mnesia:delete(?TAB, Id, write) end) of + {'atomic', ok} -> + ok; + {'aborted', Reason} -> + {error, Reason} + end. -spec get_all_data() -> [#micro_data{}]. get_all_data() ->