From 280a90b019ffcbf9b54dd276c8bd340b9a18387a Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 14 Nov 2025 15:44:41 +0800 Subject: [PATCH] remove event --- README.md | 17 ------------- apps/efka/src/channel/ws_channel.erl | 9 ++----- apps/efka/src/efka_remote_agent.erl | 38 +++++----------------------- apps/efka/src/efka_service.erl | 19 +++++--------- apps/efka/src/efka_stream.erl | 2 +- 5 files changed, 16 insertions(+), 69 deletions(-) diff --git a/README.md b/README.md index c225d43..089a028 100644 --- a/README.md +++ b/README.md @@ -84,7 +84,6 @@ { "method": "metric_data", "params": { - "device_uuid": <二进制,设备唯一标识,必填>, "route_key": <二进制,路由键,必填>, "metric": <指标数据,必填> } @@ -94,22 +93,6 @@ #### 响应处理 服务端接收后无返回消息(处理逻辑:`efka_service:metric_data(ServicePid, DeviceUUID, RouteKey, Metric)`) - -### 4. 事件发送(event) -#### 功能 -向服务进程发送事件消息。 - -#### 请求格式 -```json -{ - "method": "event", - "params": { - "event_type": <二进制,事件类型,必填>, - "body": <事件内容,必填> - } -} -``` - ## 五、基础交互协议 1. **Ping/Pong 心跳**: - 客户端发送 `ping` 消息 diff --git a/apps/efka/src/channel/ws_channel.erl b/apps/efka/src/channel/ws_channel.erl index d1fbdb3..839a62d 100644 --- a/apps/efka/src/channel/ws_channel.erl +++ b/apps/efka/src/channel/ws_channel.erl @@ -193,13 +193,8 @@ handle_request(#{<<"method">> := <<"stream_chunk">>, %% 数据项 handle_request(#{<<"method">> := <<"metric_data">>, - <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) -> - efka_service:metric_data(ServicePid, DeviceUUID, RouteKey, Metric), - {ok, State}; - -%% Event事件 -handle_request(#{<<"method">> := <<"event">>, <<"params">> := #{<<"event_type">> := EventType, <<"body">> := Body}}, State = #state{service_pid = ServicePid, is_registered = true}) -> - efka_service:send_event(ServicePid, EventType, Body), + <<"params">> := #{<<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) -> + efka_service:metric_data(ServicePid, RouteKey, Metric), {ok, State}. -spec json_result(Id :: integer(), Result :: term()) -> binary(). diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index 20f4663..7c31c07 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -15,7 +15,7 @@ %% API -export([start_link/0]). --export([metric_data/4, event/3, ping/13, task_event_stream/3, close_task_event_stream/2]). +-export([metric_data/2, ping/13, task_event_stream/3, close_task_event_stream/2]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -41,13 +41,9 @@ %%%=================================================================== %% 发送数据 --spec metric_data(ServiceId :: binary(), DeviceUUID::binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). -metric_data(ServiceId, DeviceUUID, RouteKey, Metric) when is_binary(ServiceId), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> - gen_statem:cast(?SERVER, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}). - --spec event(ServiceId :: binary(), EventType :: integer(), Params :: binary()) -> no_return(). -event(ServiceId, EventType, Params) when is_binary(ServiceId), is_integer(EventType), is_binary(Params) -> - gen_statem:cast(?SERVER, {event, ServiceId, EventType, Params}). +-spec metric_data(RouteKey :: binary(), Metric :: binary()) -> no_return(). +metric_data(RouteKey, Metric) when is_binary(RouteKey), is_binary(Metric) -> + gen_statem:cast(?SERVER, {metric_data, RouteKey, Metric}). -spec task_event_stream(TaskId :: integer(), Type :: binary(), Stream :: binary()) -> no_return(). task_event_stream(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) -> @@ -90,44 +86,22 @@ callback_mode() -> %% process message, this function is called. %% 异步发送数据, 连接存在时候直接发送;否则缓存到mnesia -handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> +handle_event(cast, {metric_data, RouteKey, Metric}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> Packet = message_codec:encode(?MESSAGE_DATA, #data{ - service_id = ServiceId, - device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric }), efka_transport:send(TransportPid, Packet), {keep_state, State}; -handle_event(cast, {metric_data, ServiceId, DeviceUUID, RouteKey, Metric}, _, State) -> +handle_event(cast, {metric_data, RouteKey, Metric}, _, State) -> Packet = message_codec:encode(?MESSAGE_DATA, #data{ - service_id = ServiceId, - device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric }), ok = cache_model:insert(Packet), {keep_state, State}; -%% 异步发送事件 -handle_event(cast, {event, ServiceId, EventType, Params}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> - EventPacket = message_codec:encode(?MESSAGE_EVENT, #event{ - service_id = ServiceId, - event_type = EventType, - params = Params - }), - efka_transport:send(TransportPid, EventPacket), - {keep_state, State}; -handle_event(cast, {event, ServiceId, EventType, Params}, _, State) -> - EventPacket = message_codec:encode(?MESSAGE_EVENT, #event{ - service_id = ServiceId, - event_type = EventType, - params = Params - }), - ok = cache_model:insert(EventPacket), - {keep_state, State}; - handle_event(cast, {task_event_stream, TaskId, Type, Stream}, ?STATE_ACTIVATED, State = #state{transport_pid = TransportPid}) -> lager:debug("[efka_remote_agent] event_stream task_id: ~p, stream: ~ts", [TaskId, Stream]), EventPacket = message_codec:encode(?MESSAGE_EVENT_STREAM, #task_event_stream{ diff --git a/apps/efka/src/efka_service.erl b/apps/efka/src/efka_service.erl index 56fb6fd..3ee08d8 100644 --- a/apps/efka/src/efka_service.erl +++ b/apps/efka/src/efka_service.erl @@ -17,7 +17,7 @@ %% API -export([start_link/2]). -export([get_name/1, get_pid/1, attach_channel/2]). --export([metric_data/4, send_event/3]). +-export([metric_data/3, send_event/3]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -40,9 +40,9 @@ get_name(ServiceId) when is_binary(ServiceId) -> get_pid(ServiceId) when is_binary(ServiceId) -> whereis(get_name(ServiceId)). --spec metric_data(Pid :: pid(), DeviceUUID :: binary(), RouteKey :: binary(), Metric :: binary()) -> no_return(). -metric_data(Pid, DeviceUUID, RouteKey, Metric) when is_pid(Pid), is_binary(DeviceUUID), is_binary(RouteKey), is_binary(Metric) -> - gen_server:cast(Pid, {metric_data, DeviceUUID, RouteKey, Metric}). +-spec metric_data(Pid :: pid(), RouteKey :: binary(), Metric :: binary()) -> no_return(). +metric_data(Pid, RouteKey, Metric) when is_pid(Pid), is_binary(RouteKey), is_binary(Metric) -> + gen_server:cast(Pid, {metric_data, RouteKey, Metric}). -spec send_event(Pid :: pid(), EventType :: integer(), Params :: binary()) -> no_return(). send_event(Pid, EventType, Params) when is_pid(Pid), is_integer(EventType), is_binary(Params) -> @@ -102,14 +102,9 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({metric_data, DeviceUUID, RouteKey, Metric}, State = #state{service_id = ServiceId}) -> - lager:debug("[efka_service] metric_data service_id: ~p, device_uuid: ~p, route_key: ~p, metric data: ~p", [ServiceId, DeviceUUID, RouteKey, Metric]), - efka_remote_agent:metric_data(ServiceId, DeviceUUID, RouteKey, Metric), - {noreply, State}; - -handle_cast({send_event, EventType, Params}, State = #state{service_id = ServiceId}) -> - efka_remote_agent:event(ServiceId, EventType, Params), - lager:debug("[efka_service] send_event, service_id: ~p, event_type: ~p, params: ~p", [ServiceId, EventType, Params]), +handle_cast({metric_data, RouteKey, Metric}, State = #state{service_id = ServiceId}) -> + lager:debug("[efka_service] metric_data service_id: ~p, route_key: ~p, metric data: ~p", [ServiceId, RouteKey, Metric]), + efka_remote_agent:metric_data(RouteKey, Metric), {noreply, State}; handle_cast(_Request, State = #state{}) -> diff --git a/apps/efka/src/efka_stream.erl b/apps/efka/src/efka_stream.erl index b3541ae..fdb7308 100644 --- a/apps/efka/src/efka_stream.erl +++ b/apps/efka/src/efka_stream.erl @@ -33,7 +33,7 @@ %%% API %%%=================================================================== --spec setup(StreamPid :: pid(), FileName :: string(), FileSize :: integer()) -> Path :: string(). +-spec setup(StreamPid :: pid(), FileName :: string(), FileSize :: integer()) -> {ok, Path :: string()}. setup(StreamPid, FileName, FileSize) when is_pid(StreamPid), is_list(FileName), is_integer(FileSize) -> gen_server:call(StreamPid, {setup, FileName, FileSize}).