remove event

This commit is contained in:
anlicheng 2025-11-14 15:44:41 +08:00
parent 894cba6c02
commit 280a90b019
5 changed files with 16 additions and 69 deletions

View File

@ -84,7 +84,6 @@
{
"method": "metric_data",
"params": {
"device_uuid": <二进制设备唯一标识必填>,
"route_key": <二进制路由键必填>,
"metric": <指标数据必填>
}
@ -94,22 +93,6 @@
#### 响应处理
服务端接收后无返回消息(处理逻辑:`efka_service:metric_data(ServicePid, DeviceUUID, RouteKey, Metric)`
### 4. 事件发送event
#### 功能
向服务进程发送事件消息。
#### 请求格式
```json
{
"method": "event",
"params": {
"event_type": <二进制事件类型必填>,
"body": <事件内容必填>
}
}
```
## 五、基础交互协议
1. **Ping/Pong 心跳**
- 客户端发送 `ping` 消息

View File

@ -193,13 +193,8 @@ handle_request(#{<<"method">> := <<"stream_chunk">>,
%%
handle_request(#{<<"method">> := <<"metric_data">>,
<<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) ->
efka_service:metric_data(ServicePid, DeviceUUID, RouteKey, Metric),
{ok, State};
%% Event事件
handle_request(#{<<"method">> := <<"event">>, <<"params">> := #{<<"event_type">> := EventType, <<"body">> := Body}}, State = #state{service_pid = ServicePid, is_registered = true}) ->
efka_service:send_event(ServicePid, EventType, Body),
<<"params">> := #{<<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) ->
efka_service:metric_data(ServicePid, RouteKey, Metric),
{ok, State}.
-spec json_result(Id :: integer(), Result :: term()) -> binary().

View File

@ -15,7 +15,7 @@
%% API
-export([start_link/0]).
-export([metric_data/4, event/3, ping/13, task_event_stream/3, close_task_event_stream/2]).
-export([metric_data/2, ping/13, task_event_stream/3, close_task_event_stream/2]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
@ -41,13 +41,9 @@
%%%===================================================================
%%
-spec metric_data(ServiceId :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Metric :: binary()) -> no_return().
metric_data(ServiceId, DeviceUUID, RouteKey, Metric) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) ->
gen_statem:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}).
-spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> no_return().
event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) ->
gen_statem:cast(?SERVER, {event, ServiceId, EventType, Params}).
-spec metric_data(RouteKey :: binary(), Metric :: binary()) -> no_return().
metric_data(RouteKey, Metric) when is_binary(RouteKey), is_binary(Metric) ->
gen_statem:cast(?SERVER, {metric_data, RouteKey, Metric}).
-spec task_event_stream(TaskId :: integer(), Type :: binary(), Stream :: binary()) -> no_return().
task_event_stream(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) ->
@ -90,44 +86,22 @@ callback_mode() ->
%% process message, this function is called.
%% , mnesia
handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
handle_event(cast, {metric_data, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
Packet = message_codec:encode(?MESSAGE_DATA, #data{
service_id = ServiceId,
device_uuid = DeviceUUID,
route_key = RouteKey,
metric = Metric
}),
efka_transport:send(TransportPid, Packet),
{keep_state, State};
handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, _, State) ->
handle_event(cast, {metric_data, RouteKey, Metric}, _, State) ->
Packet = message_codec:encode(?MESSAGE_DATA, #data{
service_id = ServiceId,
device_uuid = DeviceUUID,
route_key = RouteKey,
metric = Metric
}),
ok = cache_model:insert(Packet),
{keep_state, State};
%%
handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
EventPacket = message_codec:encode(?MESSAGE_EVENT, #event{
service_id = ServiceId,
event_type = EventType,
params = Params
}),
efka_transport:send(TransportPid, EventPacket),
{keep_state, State};
handle_event(cast, {event, ServiceId, EventType, Params}, _, State) ->
EventPacket = message_codec:encode(?MESSAGE_EVENT, #event{
service_id = ServiceId,
event_type = EventType,
params = Params
}),
ok = cache_model:insert(EventPacket),
{keep_state, State};
handle_event(cast, {task_event_stream, TaskId, Type, Stream}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) ->
lager:debug("[efka_remote_agent] event_stream task_id: ~p, stream: ~ts", [TaskId, Stream]),
EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #task_event_stream{

View File

@ -17,7 +17,7 @@
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, attach_channel/2]).
-export([metric_data/4, send_event/3]).
-export([metric_data/3, send_event/3]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -40,9 +40,9 @@ get_name(ServiceId) when is_binary(ServiceId) ->
get_pid(ServiceId) when is_binary(ServiceId) ->
whereis(get_name(ServiceId)).
-spec metric_data(Pid :: pid(), DeviceUUID :: binary(), RouteKey :: binary(), Metric :: binary()) -> no_return().
metric_data(Pid, DeviceUUID, RouteKey, Metric) when is_pid(Pid), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) ->
gen_server:cast(Pid, {metric_data, DeviceUUID, RouteKey, Metric}).
-spec metric_data(Pid :: pid(), RouteKey :: binary(), Metric :: binary()) -> no_return().
metric_data(Pid, RouteKey, Metric) when is_pid(Pid), is_binary(RouteKey), is_binary(Metric) ->
gen_server:cast(Pid, {metric_data, RouteKey, Metric}).
-spec send_event(Pid :: pid(), EventType :: integer(), Params :: binary()) -> no_return().
send_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) ->
@ -102,14 +102,9 @@ handle_call(_Request, _From, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{service_id = ServiceId}) ->
lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, route_key: ~p, metric data: ~p", [ServiceId, DeviceUUID, RouteKey, Metric]),
efka_remote_agent:metric_data(ServiceId, DeviceUUID, RouteKey, Metric),
{noreply, State};
handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) ->
efka_remote_agent:event(ServiceId, EventType, Params),
lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]),
handle_cast({metric_data, RouteKey, Metric}, State = #state{service_id = ServiceId}) ->
lager:debug("[efka_service] metric_data service_id: ~p, route_key: ~p, metric data: ~p", [ServiceId, RouteKey, Metric]),
efka_remote_agent:metric_data(RouteKey, Metric),
{noreply, State};
handle_cast(_Request, State = #state{}) ->

View File

@ -33,7 +33,7 @@
%%% API
%%%===================================================================
-spec setup(StreamPid :: pid(), FileName :: string(), FileSize :: integer()) -> Path :: string().
-spec setup(StreamPid :: pid(), FileName :: string(), FileSize :: integer()) -> {ok, Path :: string()}.
setup(StreamPid, FileName, FileSize) when is_pid(StreamPid), is_list(FileName), is_integer(FileSize) ->
gen_server:call(StreamPid, {setup, FileName, FileSize}).