diff --git a/apps/efka/include/efka.hrl b/apps/efka/include/efka.hrl index ba176e7..472758f 100644 --- a/apps/efka/include/efka.hrl +++ b/apps/efka/include/efka.hrl @@ -2,7 +2,7 @@ %%% @author anlicheng %%% @copyright (C) 2025, %%% @doc -%%% +%%% 扩展部分, 1: 支持基于topic的pub/sub机制; 2. 基于target的单点通讯和广播 %%% @end %%% Created : 21. 4月 2025 17:28 %%%------------------------------------------------------------------- @@ -11,6 +11,7 @@ %% 消息体类型 -define(PACKET_REQUEST, 16#01). -define(PACKET_RESPONSE, 16#02). + %% 服务器端推送消息 -define(PACKET_PUBLISH, 16#03). -define(PACKET_PUBLISH_RESPONSE, 16#04). @@ -25,6 +26,7 @@ -define(METHOD_INFORM, 16#04). -define(METHOD_FEEDBACK_STEP, 16#05). -define(METHOD_FEEDBACK_RESULT, 16#06). + -define(METHOD_EVENT, 16#07). %% ai识别的事件上报 -define(METHOD_AI_EVENT, 16#08). @@ -33,7 +35,6 @@ -define(EVENT_DEVICE, 16#01). %% 主机的相关事件 -define(EVENT_HOST, 16#02). - %% ai相关的事件 -define(EVENT_AI, 16#03). diff --git a/apps/efka/include/efka_tables.hrl b/apps/efka/include/efka_tables.hrl index f31ce01..349b217 100644 --- a/apps/efka/include/efka_tables.hrl +++ b/apps/efka/include/efka_tables.hrl @@ -19,4 +19,20 @@ metrics :: binary(), %% 状态: 0: 停止, 1: 运行中 status = 0 +}). + +%% 数据缓存 +-record(micro_data, { + id = 0 :: integer(), + device_uuid :: binary(), + service_name :: binary(), + at :: integer(), + tags :: map(), + fields :: binary() +}). + +%% id生成器 +-record(id_generator, { + id, + value = 1 }). \ No newline at end of file diff --git a/apps/efka/src/efka_agent.erl b/apps/efka/src/efka_agent.erl index 706f479..687a65b 100644 --- a/apps/efka/src/efka_agent.erl +++ b/apps/efka/src/efka_agent.erl @@ -2,18 +2,21 @@ %%% @author anlicheng %%% @copyright (C) 2025, %%% @doc -%%% +%%% 需要支持 云服务离线时候的数据暂存 %%% @end %%% Created : 06. 5月 2025 00:01 %%%------------------------------------------------------------------- -module(efka_agent). -author("anlicheng"). -include("message_pb.hrl"). +-include("efka_tables.hrl"). +-include("efka.hrl"). -behaviour(gen_server). %% API -export([start_link/0]). +-export([data/5]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -38,6 +41,10 @@ %%% API %%%=================================================================== +%% 发送数据 +data(ServiceName, DeviceUUID, At, Tags, Fields) when is_binary(ServiceName), is_binary(DeviceUUID), is_integer(At), is_map(Tags), is_binary(Fields) -> + gen_server:cast(?SERVER, {data, ServiceName, DeviceUUID, At, Tags, Fields}). + %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -77,6 +84,19 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({data, ServiceName, DeviceUUID, At, Tags, Fields}, State = #state{status = Status}) -> + ok = data_model:insert(#micro_data{ + device_uuid = DeviceUUID, + service_name = ServiceName, + at = At, + tags = Tags, + fields = Fields + }), + %% 触发下一次的数据处理 + Status =:= ?STATE_ACTIVATED andalso fetch_next(), + + {noreply, State}; + handle_cast(_Request, State = #state{}) -> {noreply, State}. @@ -170,6 +190,28 @@ handle_info({server_push_message, PacketId, <<16:8, Directive>>}, State = #state {noreply, State}; +handle_info(fetch_next, State = #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid}) -> + case data_model:fetch_next() of + error -> + ok; + {ok, #micro_data{device_uuid = DeviceUUID, service_name = ServiceName, at = At, tags = Tags, fields = Fields}} -> + DataPacket = message_pb:encode_msg(#data{ + device_uuid = DeviceUUID, + service_name = ServiceName, + at = At, + tags = Tags, + fields = Fields + }), + efka_transport:send(TransportPid, ?METHOD_DATA, DataPacket), + %% 触发下一次 + fetch_next() + end, + {noreply, State}; + +%% 其他情况下,忽略该消息 +handle_info(fetch_next, State) -> + {noreply, State}; + %% transport进程退出 handle_info({'EXIT', TransportPid, Reason}, State = #state{transport_pid = TransportPid}) -> efka_logger:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]), @@ -200,3 +242,6 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +fetch_next() -> + self() ! fetch_next. \ No newline at end of file diff --git a/apps/efka/src/mnesia/data_model.erl b/apps/efka/src/mnesia/data_model.erl new file mode 100644 index 0000000..cea0035 --- /dev/null +++ b/apps/efka/src/mnesia/data_model.erl @@ -0,0 +1,54 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 04. 7月 2023 12:31 +%%%------------------------------------------------------------------- +-module(data_model). +-author("aresei"). +-include("efka_tables.hrl"). +-include_lib("stdlib/include/qlc.hrl"). + +-define(TAB, micro_data). + +%% API +-export([create_table/0]). +-export([insert/1, get_all_data/0, fetch_next/0]). + +create_table() -> + %% id生成器 + mnesia:create_table(micro_data, [ + {attributes, record_info(fields, micro_data)}, + {record_name, micro_data}, + {disc_copies, [node()]}, + {type, bag} + ]). + +-spec insert(MicroData0 :: #micro_data{}) -> ok | {error, Reason :: any()}. +insert(MicroData0 = #micro_data{}) -> + MicroData = MicroData0#micro_data{id = id_model:next_id(?TAB)}, + case mnesia:transaction(fun() -> mnesia:write(?TAB, MicroData, write) end) of + {'atomic', ok} -> + ok; + {'aborted', Reason} -> + {error, Reason} + end. + +fetch_next() -> + ok. + +-spec get_all_data() -> [#micro_data{}]. +get_all_data() -> + Fun = fun() -> + Q = qlc:q([E || E <- mnesia:table(?TAB)]), + qlc:e(Q) + end, + + case mnesia:transaction(Fun) of + {'atomic', Res} -> + Res; + {'aborted', _} -> + [] + end. \ No newline at end of file diff --git a/apps/efka/src/mnesia/id_model.erl b/apps/efka/src/mnesia/id_model.erl new file mode 100644 index 0000000..9dc5c3a --- /dev/null +++ b/apps/efka/src/mnesia/id_model.erl @@ -0,0 +1,25 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 06. 5月 2025 10:32 +%%%------------------------------------------------------------------- +-module(id_model). +-author("anlicheng"). + +%% API +-export([create_table/0, next_id/1]). + +create_table() -> + %% id生成器 + mnesia:create_table(id_generator, [ + {attributes, record_info(fields, id_generator)}, + {record_name, id_generator}, + {disc_copies, [node()]}, + {type, ordered_set} + ]). + +next_id(Tab) when is_atom(Tab) -> + mnesia:dirty_update_counter(id_generator, Tab, 1). \ No newline at end of file