This commit is contained in:
anlicheng 2025-05-06 10:59:46 +08:00
parent b0d42ef2c8
commit c0254c5f93
5 changed files with 144 additions and 3 deletions

View File

@ -2,7 +2,7 @@
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% , 1: topic的pub/sub机制; 2. target的单点通讯和广播
%%% @end
%%% Created : 21. 4 2025 17:28
%%%-------------------------------------------------------------------
@ -11,6 +11,7 @@
%%
-define(PACKET_REQUEST, 16#01).
-define(PACKET_RESPONSE, 16#02).
%%
-define(PACKET_PUBLISH, 16#03).
-define(PACKET_PUBLISH_RESPONSE, 16#04).
@ -25,6 +26,7 @@
-define(METHOD_INFORM, 16#04).
-define(METHOD_FEEDBACK_STEP, 16#05).
-define(METHOD_FEEDBACK_RESULT, 16#06).
-define(METHOD_EVENT, 16#07).
%% ai识别的事件上报
-define(METHOD_AI_EVENT, 16#08).
@ -33,7 +35,6 @@
-define(EVENT_DEVICE, 16#01).
%%
-define(EVENT_HOST, 16#02).
%% ai相关的事件
-define(EVENT_AI, 16#03).

View File

@ -20,3 +20,19 @@
%% 0: , 1:
status = 0
}).
%%
-record(micro_data, {
id = 0 :: integer(),
device_uuid :: binary(),
service_name :: binary(),
at :: integer(),
tags :: map(),
fields :: binary()
}).
%% id生成器
-record(id_generator, {
id,
value = 1
}).

View File

@ -2,18 +2,21 @@
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% 线
%%% @end
%%% Created : 06. 5 2025 00:01
%%%-------------------------------------------------------------------
-module(efka_agent).
-author("anlicheng").
-include("message_pb.hrl").
-include("efka_tables.hrl").
-include("efka.hrl").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([data/5]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -38,6 +41,10 @@
%%% API
%%%===================================================================
%%
data(ServiceName, DeviceUUID, At, Tags, Fields) when is_binary(ServiceName), is_binary(DeviceUUID), is_integer(At), is_map(Tags), is_binary(Fields) ->
gen_server:cast(?SERVER, {data, ServiceName, DeviceUUID, At, Tags, Fields}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
@ -77,6 +84,19 @@ handle_call(_Request, _From, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({data, ServiceName, DeviceUUID, At, Tags, Fields}, State = #state{status = Status}) ->
ok = data_model:insert(#micro_data{
device_uuid = DeviceUUID,
service_name = ServiceName,
at = At,
tags = Tags,
fields = Fields
}),
%%
Status =:= ?STATE_ACTIVATED andalso fetch_next(),
{noreply, State};
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
@ -170,6 +190,28 @@ handle_info({server_push_message, PacketId, <<16:8, Directive>>}, State = #state
{noreply, State};
handle_info(fetch_next, State = #state{status = ?STATE_ACTIVATED, transport_pid = TransportPid}) ->
case data_model:fetch_next() of
error ->
ok;
{ok, #micro_data{device_uuid = DeviceUUID, service_name = ServiceName, at = At, tags = Tags, fields = Fields}} ->
DataPacket = message_pb:encode_msg(#data{
device_uuid = DeviceUUID,
service_name = ServiceName,
at = At,
tags = Tags,
fields = Fields
}),
efka_transport:send(TransportPid, ?METHOD_DATA, DataPacket),
%%
fetch_next()
end,
{noreply, State};
%%
handle_info(fetch_next, State) ->
{noreply, State};
%% transport进程退出
handle_info({'EXIT', TransportPid, Reason}, State = #state{transport_pid = TransportPid}) ->
efka_logger:debug("[efka_agent] transport pid: ~p, exit with reason: ~p", [TransportPid, Reason]),
@ -200,3 +242,6 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%%===================================================================
%%% Internal functions
%%%===================================================================
fetch_next() ->
self() ! fetch_next.

View File

@ -0,0 +1,54 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 04. 7 2023 12:31
%%%-------------------------------------------------------------------
-module(data_model).
-author("aresei").
-include("efka_tables.hrl").
-include_lib("stdlib/include/qlc.hrl").
-define(TAB, micro_data).
%% API
-export([create_table/0]).
-export([insert/1, get_all_data/0, fetch_next/0]).
create_table() ->
%% id生成器
mnesia:create_table(micro_data, [
{attributes, record_info(fields, micro_data)},
{record_name, micro_data},
{disc_copies, [node()]},
{type, bag}
]).
-spec insert(MicroData0 :: #micro_data{}) -> ok | {error, Reason :: any()}.
insert(MicroData0 = #micro_data{}) ->
MicroData = MicroData0#micro_data{id = id_model:next_id(?TAB)},
case mnesia:transaction(fun() -> mnesia:write(?TAB, MicroData, write) end) of
{'atomic', ok} ->
ok;
{'aborted', Reason} ->
{error, Reason}
end.
fetch_next() ->
ok.
-spec get_all_data() -> [#micro_data{}].
get_all_data() ->
Fun = fun() ->
Q = qlc:q([E || E <- mnesia:table(?TAB)]),
qlc:e(Q)
end,
case mnesia:transaction(Fun) of
{'atomic', Res} ->
Res;
{'aborted', _} ->
[]
end.

View File

@ -0,0 +1,25 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 5 2025 10:32
%%%-------------------------------------------------------------------
-module(id_model).
-author("anlicheng").
%% API
-export([create_table/0, next_id/1]).
create_table() ->
%% id生成器
mnesia:create_table(id_generator, [
{attributes, record_info(fields, id_generator)},
{record_name, id_generator},
{disc_copies, [node()]},
{type, ordered_set}
]).
next_id(Tab) when is_atom(Tab) ->
mnesia:dirty_update_counter(id_generator, Tab, 1).