fix route key
This commit is contained in:
parent
97d1322ca8
commit
f942f8090e
@ -13,7 +13,7 @@
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([subscribe/2, publish/2]).
|
||||
-export([subscribe/2, publish/3]).
|
||||
-export([match_components/2, is_valid_components/1, of_components/1]).
|
||||
|
||||
%% gen_server callbacks
|
||||
@ -45,9 +45,9 @@
|
||||
subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) ->
|
||||
gen_server:call(?SERVER, {subscribe, Topic, SubscriberPid}).
|
||||
|
||||
-spec publish(Topic :: binary(), Content :: binary()) -> no_return().
|
||||
publish(Topic, Content) when is_binary(Topic), is_binary(Content) ->
|
||||
gen_server:cast(?SERVER, {publish, Topic, Content}).
|
||||
-spec publish(RouteKey :: binary(), ServiceId :: binary(), Content :: binary()) -> no_return().
|
||||
publish(RouteKey, ServiceId, Content) when is_binary(RouteKey), is_binary(Content) ->
|
||||
gen_server:cast(?SERVER, {publish, RouteKey, ServiceId, Content}).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link() ->
|
||||
@ -98,13 +98,12 @@ handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
%% 发布消息
|
||||
handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}) ->
|
||||
MatchedSubscribers = match_subscribers(Subscribers, Topic),
|
||||
handle_cast({publish, RouteKey, ServiceId, Metric}, State = #state{subscribers = Subscribers}) ->
|
||||
MatchedSubscribers = match_subscribers(Subscribers, RouteKey),
|
||||
lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) ->
|
||||
SubscriberPid ! {topic_broadcast, Topic, Content}
|
||||
endpoint:forward(SubscriberPid, ServiceId, Metric)
|
||||
end, MatchedSubscribers),
|
||||
|
||||
lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, MatchedSubscribers]),
|
||||
lager:debug("[efka_subscription] route_key: ~p, metric: ~p, match subscribers: ~p", [RouteKey, Metric, MatchedSubscribers]),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
|
||||
@ -365,12 +365,7 @@ handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = D
|
||||
case iot_device:is_activated(Device) of
|
||||
true ->
|
||||
RouteKey = get_route_key(RouteKey0),
|
||||
case endpoint:get_alias_pid(RouteKey) of
|
||||
undefined ->
|
||||
ok;
|
||||
EndpointPid ->
|
||||
endpoint:forward(EndpointPid, ServiceId, Metric)
|
||||
end,
|
||||
endpoint_subscription:publish(RouteKey, ServiceId, Metric),
|
||||
NDevice = iot_device:change_status(Device, ?DEVICE_ONLINE),
|
||||
{keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}};
|
||||
false ->
|
||||
@ -455,7 +450,7 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||
|
||||
-spec get_route_key(binary()) -> binary().
|
||||
get_route_key(<<"">>) ->
|
||||
<<"default">>;
|
||||
<<"/">>;
|
||||
get_route_key(RouteKey) when is_binary(RouteKey) ->
|
||||
RouteKey.
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user