From c1f487959a23226218717a18c1533e4b4374059f Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 14 Nov 2025 15:30:29 +0800 Subject: [PATCH] remove event --- apps/iot/include/message.hrl | 8 -------- apps/iot/src/endpoint/endpoint.erl | 8 ++++---- apps/iot/src/endpoint/endpoint_http.erl | 9 ++++----- apps/iot/src/endpoint/endpoint_kafka.erl | 6 +++--- apps/iot/src/endpoint/endpoint_mqtt.erl | 9 ++++----- apps/iot/src/endpoint/endpoint_subscription.erl | 12 ++++++------ apps/iot/src/iot_host.erl | 10 +++------- apps/iot/src/message/message_codec.erl | 16 +++------------- apps/iot/src/tcp/tcp_channel.erl | 4 +--- 9 files changed, 28 insertions(+), 54 deletions(-) diff --git a/apps/iot/include/message.hrl b/apps/iot/include/message.hrl index 804148c..5bc6e2b 100644 --- a/apps/iot/include/message.hrl +++ b/apps/iot/include/message.hrl @@ -74,18 +74,10 @@ }). -record(data, { - service_id :: binary(), - device_uuid :: binary(), route_key :: binary(), metric :: binary() }). --record(event, { - service_id :: binary(), - event_type :: integer(), - params :: binary() -}). - -record(task_event_stream, { task_id :: integer(), type :: binary(), diff --git a/apps/iot/src/endpoint/endpoint.erl b/apps/iot/src/endpoint/endpoint.erl index 2e45372..e3a098e 100644 --- a/apps/iot/src/endpoint/endpoint.erl +++ b/apps/iot/src/endpoint/endpoint.erl @@ -12,7 +12,7 @@ %% API -export([start_link/1]). --export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]). +-export([get_name/1, get_pid/1, forward/2, reload/2, clean_up/1]). -export([get_alias_pid/1, is_support/1, get_protocol/1]). -export([endpoint_record/1, parse_config/2]). @@ -47,9 +47,9 @@ get_alias_name(Name) when is_binary(Name) -> get_alias_pid(Name) when is_binary(Name) -> gproc:whereis_name({n, l, get_alias_name(Name)}). --spec forward(Pid :: pid(), ServiceId :: binary(), Metric :: binary()) -> no_return(). -forward(Pid, ServiceId, Metric) when is_pid(Pid), is_binary(ServiceId), is_binary(Metric) -> - gen_server:cast(Pid, {forward, ServiceId, Metric}). +-spec forward(Pid :: pid(), Metric :: binary()) -> no_return(). +forward(Pid, Metric) when is_pid(Pid), is_binary(Metric) -> + gen_server:cast(Pid, {forward, Metric}). reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> gen_statem:cast(Pid, {reload, NEndpoint}). diff --git a/apps/iot/src/endpoint/endpoint_http.erl b/apps/iot/src/endpoint/endpoint_http.erl index 585af1e..fe9a8b6 100644 --- a/apps/iot/src/endpoint/endpoint_http.erl +++ b/apps/iot/src/endpoint/endpoint_http.erl @@ -69,8 +69,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> - NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), +handle_cast({forward, Metric}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append(Metric, Buffer), {noreply, State#state{buffer = NBuffer}}; handle_cast(cleanup, State = #state{buffer = Buffer}) -> @@ -83,10 +83,9 @@ handle_cast(cleanup, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) -> +handle_info({next_data, Id, Metric}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) -> Headers = [ - {<<"Content-Type">>, <<"application/octet-stream">>}, - {<<"Service-Id">>, ServiceId} + {<<"Content-Type">>, <<"application/json">>} ], case hackney:request(post, Url, Headers, Metric) of {ok, 200, _, ClientRef} -> diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index 7c16a8e..3bd2aaf 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -82,8 +82,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> - NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), +handle_cast({forward, Metric}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append(Metric, Buffer), {noreply, State#state{buffer = NBuffer}}. %% @private @@ -129,7 +129,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> {noreply, State}; %% 发送数据到mqtt服务器 -handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONNECTED, client_pid = ClientPid, +handle_info({next_data, Id, Metric}, State = #state{status = ?CONNECTED, client_pid = ClientPid, endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) -> ReceiverPid = self(), diff --git a/apps/iot/src/endpoint/endpoint_mqtt.erl b/apps/iot/src/endpoint/endpoint_mqtt.erl index 87eabec..f60d52f 100644 --- a/apps/iot/src/endpoint/endpoint_mqtt.erl +++ b/apps/iot/src/endpoint/endpoint_mqtt.erl @@ -83,8 +83,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> - NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), +handle_cast({forward, Metric}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append(Metric, Buffer), {noreply, State#state{buffer = NBuffer}}. %% @private @@ -132,10 +132,9 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> {keep_state, State}; %% 发送数据到mqtt服务器 -handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, - endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) -> +handle_info({next_data, Id, Metric}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, + endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic, qos = Qos}}}) -> - Topic = re:replace(Topic0, <<"\\${service_id}">>, ServiceId, [global, {return, binary}]), lager:debug("[endpoint_mqtt] will publish topic: ~p, metric: ~p, qos: ~p", [Topic, Metric, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Metric, [{qos, Qos}, {retain, true}]) of ok -> diff --git a/apps/iot/src/endpoint/endpoint_subscription.erl b/apps/iot/src/endpoint/endpoint_subscription.erl index d9d0f05..48ceffc 100644 --- a/apps/iot/src/endpoint/endpoint_subscription.erl +++ b/apps/iot/src/endpoint/endpoint_subscription.erl @@ -13,7 +13,7 @@ %% API -export([start_link/0]). --export([subscribe/2, publish/3]). +-export([subscribe/2, publish/2]). -export([match_components/2, is_valid_components/1, of_components/1]). %% gen_server callbacks @@ -45,9 +45,9 @@ subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) -> gen_server:call(?SERVER, {subscribe, Topic, SubscriberPid}). --spec publish(RouteKey :: binary(), ServiceId :: binary(), Content :: binary()) -> no_return(). -publish(RouteKey, ServiceId, Content) when is_binary(RouteKey), is_binary(Content) -> - gen_server:cast(?SERVER, {publish, RouteKey, ServiceId, Content}). +-spec publish(RouteKey :: binary(), Content :: binary()) -> no_return(). +publish(RouteKey, Content) when is_binary(RouteKey), is_binary(Content) -> + gen_server:cast(?SERVER, {publish, RouteKey, Content}). %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> @@ -98,10 +98,10 @@ handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 发布消息 -handle_cast({publish, RouteKey, ServiceId, Metric}, State = #state{subscribers = Subscribers}) -> +handle_cast({publish, RouteKey, Metric}, State = #state{subscribers = Subscribers}) -> MatchedSubscribers = match_subscribers(Subscribers, RouteKey), lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) -> - endpoint:forward(SubscriberPid, ServiceId, Metric) + endpoint:forward(SubscriberPid, Metric) end, MatchedSubscribers), lager:debug("[efka_subscription] route_key: ~p, metric: ~p, match subscribers: ~p", [RouteKey, Metric, MatchedSubscribers]), {noreply, State}. diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index e3d6c30..d8adbfa 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -292,15 +292,11 @@ handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, c {keep_state, State, [{reply, From, {error, <<"channel existed">>}}]}; %% 数据分发 -handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED, +handle_event(cast, {handle, {data, #data{route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> - lager:debug("[iot_host] metric_data host: ~p, service_id: ~p, device_uuid: ~p, route_key: ~p, metric: ~p", [UUID, ServiceId, DeviceUUID, RouteKey0, Metric]), + lager:debug("[iot_host] metric_data host: ~p, route_key: ~p, metric: ~p", [UUID, RouteKey0, Metric]), RouteKey = get_route_key(RouteKey0), - endpoint_subscription:publish(RouteKey, ServiceId, Metric), - {keep_state, State}; -%% 事件流 -handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> - lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]), + endpoint_subscription:publish(RouteKey, Metric), {keep_state, State}; %% ping的数据是通过aes加密后的,因此需要在有会话的情况下才行 diff --git a/apps/iot/src/message/message_codec.erl b/apps/iot/src/message/message_codec.erl index 643242c..3de252d 100644 --- a/apps/iot/src/message/message_codec.erl +++ b/apps/iot/src/message/message_codec.erl @@ -59,19 +59,11 @@ encode0(#jsonrpc_request{method = Method, params = Params}) -> iolist_to_binary([ marshal(?Bytes, ReqBody) ]); -encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) -> +encode0(#data{route_key = RouteKey, metric = Metric}) -> iolist_to_binary([ - marshal(?Bytes, ServiceId), - marshal(?Bytes, DeviceUUID), marshal(?Bytes, RouteKey), marshal(?Bytes, Metric) ]); -encode0(#event{service_id = ServiceId, event_type = EventType, params = Params}) -> - iolist_to_binary([ - marshal(?Bytes, ServiceId), - marshal(?I32, EventType), - marshal(?Bytes, Params) - ]); encode0(#task_event_stream{task_id = TaskId, type = Type, stream = Stream}) -> iolist_to_binary([ marshal(?I32, TaskId), @@ -107,10 +99,8 @@ decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) -> decode0(?MESSAGE_JSONRPC_REQUEST, [ReqBody]) -> #{<<"method">> := Method, <<"params">> := Params} = erlang:binary_to_term(ReqBody), {ok, #jsonrpc_request{method = Method, params = Params}}; -decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) -> - {ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}}; -decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) -> - {ok, #event{service_id = ServiceId, event_type = EventType, params = Params}}; +decode0(?MESSAGE_DATA, [RouteKey, Metric]) -> + {ok, #data{route_key = RouteKey, metric = Metric}}; decode0(?MESSAGE_EVENT_STREAM, [TaskId, Type, Stream]) -> {ok, #task_event_stream{task_id = TaskId, type = Type, stream = Stream}}; decode0(_, _) -> diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index 9ceb712..9e2c6ea 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -137,8 +137,6 @@ handle_info({tcp, Socket, <>}, State = #state{sock case CastMessage of #data{} = Data -> iot_host:handle(HostPid, {data, Data}); - #event{} = Event -> - iot_host:handle(HostPid, {event, Event}); #task_event_stream{task_id = TaskId, type = <<"close">>, stream = Reason} -> iot_event_stream_observer:stream_close(TaskId, Reason); #task_event_stream{task_id = TaskId, type = Type, stream = Stream} -> @@ -153,7 +151,7 @@ handle_info({tcp, Socket, <>}, State = #state{sock % {noreply, State}; %% 主机端的消息响应 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) when PacketId > 0 -> {ok, RpcReply} = message_codec:decode(ResponseBin), case maps:take(PacketId, Inflight) of error ->