diff --git a/apps/iot/src/endpoint/endpoint_subscription.erl b/apps/iot/src/endpoint/endpoint_subscription.erl index 484ce0e..d9d0f05 100644 --- a/apps/iot/src/endpoint/endpoint_subscription.erl +++ b/apps/iot/src/endpoint/endpoint_subscription.erl @@ -13,7 +13,7 @@ %% API -export([start_link/0]). --export([subscribe/2, publish/2]). +-export([subscribe/2, publish/3]). -export([match_components/2, is_valid_components/1, of_components/1]). %% gen_server callbacks @@ -45,9 +45,9 @@ subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) -> gen_server:call(?SERVER, {subscribe, Topic, SubscriberPid}). --spec publish(Topic :: binary(), Content :: binary()) -> no_return(). -publish(Topic, Content) when is_binary(Topic), is_binary(Content) -> - gen_server:cast(?SERVER, {publish, Topic, Content}). +-spec publish(RouteKey :: binary(), ServiceId :: binary(), Content :: binary()) -> no_return(). +publish(RouteKey, ServiceId, Content) when is_binary(RouteKey), is_binary(Content) -> + gen_server:cast(?SERVER, {publish, RouteKey, ServiceId, Content}). %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> @@ -98,13 +98,12 @@ handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 发布消息 -handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}) -> - MatchedSubscribers = match_subscribers(Subscribers, Topic), +handle_cast({publish, RouteKey, ServiceId, Metric}, State = #state{subscribers = Subscribers}) -> + MatchedSubscribers = match_subscribers(Subscribers, RouteKey), lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) -> - SubscriberPid ! {topic_broadcast, Topic, Content} + endpoint:forward(SubscriberPid, ServiceId, Metric) end, MatchedSubscribers), - - lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, MatchedSubscribers]), + lager:debug("[efka_subscription] route_key: ~p, metric: ~p, match subscribers: ~p", [RouteKey, Metric, MatchedSubscribers]), {noreply, State}. %% @private diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 84dc24c..bf68e7e 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -365,12 +365,7 @@ handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = D case iot_device:is_activated(Device) of true -> RouteKey = get_route_key(RouteKey0), - case endpoint:get_alias_pid(RouteKey) of - undefined -> - ok; - EndpointPid -> - endpoint:forward(EndpointPid, ServiceId, Metric) - end, + endpoint_subscription:publish(RouteKey, ServiceId, Metric), NDevice = iot_device:change_status(Device, ?DEVICE_ONLINE), {keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}}; false -> @@ -455,7 +450,7 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> -spec get_route_key(binary()) -> binary(). get_route_key(<<"">>) -> - <<"default">>; + <<"/">>; get_route_key(RouteKey) when is_binary(RouteKey) -> RouteKey.