remove event

This commit is contained in:
anlicheng 2025-11-14 15:30:29 +08:00
parent 367af5139d
commit c1f487959a
9 changed files with 28 additions and 54 deletions

View File

@ -74,18 +74,10 @@
}). }).
-record(data, { -record(data, {
service_id :: binary(),
device_uuid :: binary(),
route_key :: binary(), route_key :: binary(),
metric :: binary() metric :: binary()
}). }).
-record(event, {
service_id :: binary(),
event_type :: integer(),
params :: binary()
}).
-record(task_event_stream, { -record(task_event_stream, {
task_id :: integer(), task_id :: integer(),
type :: binary(), type :: binary(),

View File

@ -12,7 +12,7 @@
%% API %% API
-export([start_link/1]). -export([start_link/1]).
-export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]). -export([get_name/1, get_pid/1, forward/2, reload/2, clean_up/1]).
-export([get_alias_pid/1, is_support/1, get_protocol/1]). -export([get_alias_pid/1, is_support/1, get_protocol/1]).
-export([endpoint_record/1, parse_config/2]). -export([endpoint_record/1, parse_config/2]).
@ -47,9 +47,9 @@ get_alias_name(Name) when is_binary(Name) ->
get_alias_pid(Name) when is_binary(Name) -> get_alias_pid(Name) when is_binary(Name) ->
gproc:whereis_name({n, l, get_alias_name(Name)}). gproc:whereis_name({n, l, get_alias_name(Name)}).
-spec forward(Pid :: pid(), ServiceId :: binary(), Metric :: binary()) -> no_return(). -spec forward(Pid :: pid(), Metric :: binary()) -> no_return().
forward(Pid, ServiceId, Metric) when is_pid(Pid), is_binary(ServiceId), is_binary(Metric) -> forward(Pid, Metric) when is_pid(Pid), is_binary(Metric) ->
gen_server:cast(Pid, {forward, ServiceId, Metric}). gen_server:cast(Pid, {forward, Metric}).
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
gen_statem:cast(Pid, {reload, NEndpoint}). gen_statem:cast(Pid, {reload, NEndpoint}).

View File

@ -69,8 +69,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> handle_cast({forward, Metric}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), NBuffer = endpoint_buffer:append(Metric, Buffer),
{noreply, State#state{buffer = NBuffer}}; {noreply, State#state{buffer = NBuffer}};
handle_cast(cleanup, State = #state{buffer = Buffer}) -> handle_cast(cleanup, State = #state{buffer = Buffer}) ->
@ -83,10 +83,9 @@ handle_cast(cleanup, State = #state{buffer = Buffer}) ->
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) -> handle_info({next_data, Id, Metric}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) ->
Headers = [ Headers = [
{<<"Content-Type">>, <<"application/octet-stream">>}, {<<"Content-Type">>, <<"application/json">>}
{<<"Service-Id">>, ServiceId}
], ],
case hackney:request(post, Url, Headers, Metric) of case hackney:request(post, Url, Headers, Metric) of
{ok, 200, _, ClientRef} -> {ok, 200, _, ClientRef} ->

View File

@ -82,8 +82,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> handle_cast({forward, Metric}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), NBuffer = endpoint_buffer:append(Metric, Buffer),
{noreply, State#state{buffer = NBuffer}}. {noreply, State#state{buffer = NBuffer}}.
%% @private %% @private
@ -129,7 +129,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS
handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
{noreply, State}; {noreply, State};
%% mqtt服务器 %% mqtt服务器
handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONNECTED, client_pid = ClientPid, handle_info({next_data, Id, Metric}, State = #state{status = ?CONNECTED, client_pid = ClientPid,
endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) -> endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) ->
ReceiverPid = self(), ReceiverPid = self(),

View File

@ -83,8 +83,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> handle_cast({forward, Metric}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), NBuffer = endpoint_buffer:append(Metric, Buffer),
{noreply, State#state{buffer = NBuffer}}. {noreply, State#state{buffer = NBuffer}}.
%% @private %% @private
@ -132,10 +132,9 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status
handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
{keep_state, State}; {keep_state, State};
%% mqtt服务器 %% mqtt服务器
handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, handle_info({next_data, Id, Metric}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight,
endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) -> endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic, qos = Qos}}}) ->
Topic = re:replace(Topic0, <<"\\${service_id}">>, ServiceId, [global, {return, binary}]),
lager:debug("[endpoint_mqtt] will publish topic: ~p, metric: ~p, qos: ~p", [Topic, Metric, Qos]), lager:debug("[endpoint_mqtt] will publish topic: ~p, metric: ~p, qos: ~p", [Topic, Metric, Qos]),
case emqtt:publish(ConnPid, Topic, #{}, Metric, [{qos, Qos}, {retain, true}]) of case emqtt:publish(ConnPid, Topic, #{}, Metric, [{qos, Qos}, {retain, true}]) of
ok -> ok ->

View File

@ -13,7 +13,7 @@
%% API %% API
-export([start_link/0]). -export([start_link/0]).
-export([subscribe/2, publish/3]). -export([subscribe/2, publish/2]).
-export([match_components/2, is_valid_components/1, of_components/1]). -export([match_components/2, is_valid_components/1, of_components/1]).
%% gen_server callbacks %% gen_server callbacks
@ -45,9 +45,9 @@
subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) -> subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) ->
gen_server:call(?SERVER, {subscribe, Topic, SubscriberPid}). gen_server:call(?SERVER, {subscribe, Topic, SubscriberPid}).
-spec publish(RouteKey :: binary(), ServiceId :: binary(), Content :: binary()) -> no_return(). -spec publish(RouteKey :: binary(), Content :: binary()) -> no_return().
publish(RouteKey, ServiceId, Content) when is_binary(RouteKey), is_binary(Content) -> publish(RouteKey, Content) when is_binary(RouteKey), is_binary(Content) ->
gen_server:cast(?SERVER, {publish, RouteKey, ServiceId, Content}). gen_server:cast(?SERVER, {publish, RouteKey, Content}).
%% @doc Spawns the server and registers the local name (unique) %% @doc Spawns the server and registers the local name (unique)
-spec(start_link() -> -spec(start_link() ->
@ -98,10 +98,10 @@ handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
%% %%
handle_cast({publish, RouteKey, ServiceId, Metric}, State = #state{subscribers = Subscribers}) -> handle_cast({publish, RouteKey, Metric}, State = #state{subscribers = Subscribers}) ->
MatchedSubscribers = match_subscribers(Subscribers, RouteKey), MatchedSubscribers = match_subscribers(Subscribers, RouteKey),
lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) -> lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) ->
endpoint:forward(SubscriberPid, ServiceId, Metric) endpoint:forward(SubscriberPid, Metric)
end, MatchedSubscribers), end, MatchedSubscribers),
lager:debug("[efka_subscription] route_key: ~p, metric: ~p, match subscribers: ~p", [RouteKey, Metric, MatchedSubscribers]), lager:debug("[efka_subscription] route_key: ~p, metric: ~p, match subscribers: ~p", [RouteKey, Metric, MatchedSubscribers]),
{noreply, State}. {noreply, State}.

View File

@ -292,15 +292,11 @@ handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, c
{keep_state, State, [{reply, From, {error, <<"channel existed">>}}]}; {keep_state, State, [{reply, From, {error, <<"channel existed">>}}]};
%% %%
handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED, handle_event(cast, {handle, {data, #data{route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED,
State = #state{uuid = UUID, has_session = true}) -> State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] metric_data host: ~p, service_id: ~p, device_uuid: ~p, route_key: ~p, metric: ~p", [UUID, ServiceId, DeviceUUID, RouteKey0, Metric]), lager:debug("[iot_host] metric_data host: ~p, route_key: ~p, metric: ~p", [UUID, RouteKey0, Metric]),
RouteKey = get_route_key(RouteKey0), RouteKey = get_route_key(RouteKey0),
endpoint_subscription:publish(RouteKey, ServiceId, Metric), endpoint_subscription:publish(RouteKey, Metric),
{keep_state, State};
%%
handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]),
{keep_state, State}; {keep_state, State};
%% ping的数据是通过aes加密后的 %% ping的数据是通过aes加密后的

View File

@ -59,19 +59,11 @@ encode0(#jsonrpc_request{method = Method, params = Params}) ->
iolist_to_binary([ iolist_to_binary([
marshal(?Bytes, ReqBody) marshal(?Bytes, ReqBody)
]); ]);
encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) -> encode0(#data{route_key = RouteKey, metric = Metric}) ->
iolist_to_binary([ iolist_to_binary([
marshal(?Bytes, ServiceId),
marshal(?Bytes, DeviceUUID),
marshal(?Bytes, RouteKey), marshal(?Bytes, RouteKey),
marshal(?Bytes, Metric) marshal(?Bytes, Metric)
]); ]);
encode0(#event{service_id = ServiceId, event_type = EventType, params = Params}) ->
iolist_to_binary([
marshal(?Bytes, ServiceId),
marshal(?I32, EventType),
marshal(?Bytes, Params)
]);
encode0(#task_event_stream{task_id = TaskId, type = Type, stream = Stream}) -> encode0(#task_event_stream{task_id = TaskId, type = Type, stream = Stream}) ->
iolist_to_binary([ iolist_to_binary([
marshal(?I32, TaskId), marshal(?I32, TaskId),
@ -107,10 +99,8 @@ decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) ->
decode0(?MESSAGE_JSONRPC_REQUEST, [ReqBody]) -> decode0(?MESSAGE_JSONRPC_REQUEST, [ReqBody]) ->
#{<<"method">> := Method, <<"params">> := Params} = erlang:binary_to_term(ReqBody), #{<<"method">> := Method, <<"params">> := Params} = erlang:binary_to_term(ReqBody),
{ok, #jsonrpc_request{method = Method, params = Params}}; {ok, #jsonrpc_request{method = Method, params = Params}};
decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) -> decode0(?MESSAGE_DATA, [RouteKey, Metric]) ->
{ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}}; {ok, #data{route_key = RouteKey, metric = Metric}};
decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->
{ok, #event{service_id = ServiceId, event_type = EventType, params = Params}};
decode0(?MESSAGE_EVENT_STREAM, [TaskId, Type, Stream]) -> decode0(?MESSAGE_EVENT_STREAM, [TaskId, Type, Stream]) ->
{ok, #task_event_stream{task_id = TaskId, type = Type, stream = Stream}}; {ok, #task_event_stream{task_id = TaskId, type = Type, stream = Stream}};
decode0(_, _) -> decode0(_, _) ->

View File

@ -137,8 +137,6 @@ handle_info({tcp, Socket, <<?PACKET_CAST, CastBin/binary>>}, State = #state{sock
case CastMessage of case CastMessage of
#data{} = Data -> #data{} = Data ->
iot_host:handle(HostPid, {data, Data}); iot_host:handle(HostPid, {data, Data});
#event{} = Event ->
iot_host:handle(HostPid, {event, Event});
#task_event_stream{task_id = TaskId, type = <<"close">>, stream = Reason} -> #task_event_stream{task_id = TaskId, type = <<"close">>, stream = Reason} ->
iot_event_stream_observer:stream_close(TaskId, Reason); iot_event_stream_observer:stream_close(TaskId, Reason);
#task_event_stream{task_id = TaskId, type = Type, stream = Stream} -> #task_event_stream{task_id = TaskId, type = Type, stream = Stream} ->
@ -153,7 +151,7 @@ handle_info({tcp, Socket, <<?PACKET_CAST, CastBin/binary>>}, State = #state{sock
% {noreply, State}; % {noreply, State};
%% %%
handle_info({tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, ResponseBin/binary>>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 -> handle_info({tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, ResponseBin/binary>>}, State = #state{socket = Socket, inflight = Inflight}) when PacketId > 0 ->
{ok, RpcReply} = message_codec:decode(ResponseBin), {ok, RpcReply} = message_codec:decode(ResponseBin),
case maps:take(PacketId, Inflight) of case maps:take(PacketId, Inflight) of
error -> error ->