diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index f3eae03..1a7afab 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -15,12 +15,13 @@ start(_StartType, _StartArgs) -> %% 启动数据库 mnesia:start(), Tables = mnesia:system_info(tables), + %% 加载必须等待的数据库表 lists:member(router, Tables) andalso mnesia:wait_for_tables([router], infinity), lists:member(host, Tables) andalso mnesia:wait_for_tables([host], infinity), %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), - + %% 启动http服务 start_http_server(), iot_sup:start_link(). diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 4c62d20..bf77e64 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -13,7 +13,7 @@ -behaviour(gen_server). %% API --export([start_link/2, get_name/1]). +-export([start_link/2, get_name/1, get_pid/1, publish/4]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -29,10 +29,16 @@ %%%=================================================================== %%% API %%%=================================================================== +get_pid(HostId) when is_binary(HostId) -> + Name = get_name(HostId), + global:whereis_name(Name). get_name(HostId) when is_binary(HostId) -> binary_to_atom(<<"iot_host:", HostId/binary>>). +publish(Pid, Topic, Message, Qos) when is_pid(Pid), is_binary(Topic) -> + gen_server:call(Pid, {publish, Topic, Message, Qos}). + %% @doc Spawns the server and registers the local name (unique) -spec(start_link(Name :: atom(), Host :: #host{}) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -48,7 +54,7 @@ start_link(Name, Host = #host{}) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([Host]) -> +init([Host = #host{host_id = HostId}]) -> lager:debug("[iot_host] host is: ~p", [Host]), %% 建立到emqx服务器的连接 {ok, Props} = application:get_env(iot, emqx_server), @@ -71,12 +77,14 @@ init([Host]) -> case emqtt:start_link(Opts) of {ok, ConnPid} -> - lager:debug("[iot_host] connect success"), + lager:debug("[iot_host] connect success, pid: ~p", [ConnPid]), %% 监听和host相关的全部事件 - {ok, _Props} = emqtt:connect(ConnPid), - {ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {<<"/host/123">>, qos1}), - %% ok = emqtt:publish(ConnPid, <<"hello">>, #{}, <<"Hello World!">>, [{qos, 0}]). - %% {ok, _PktId} = emqtt:publish(ConnPid, <<"hello">>, #{}, <<"Hello World!">>, [{qos, 1}]). + {ok, _} = emqtt:connect(ConnPid), + Topics = [ + {<<"/host/", HostId/binary>>, qos1} + ], + SubscribeResult = emqtt:subscribe(ConnPid, Topics), + lager:debug("subscribe result is: ~p", [SubscribeResult]), {ok, #state{host = Host, is_connected = true, emqx_pid = ConnPid}}; ignore -> @@ -97,8 +105,9 @@ init([Host]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. +handle_call({publish, Topic, Message, Qos}, _From, State = #state{emqx_pid = ConnPid}) -> + Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}]), + {reply, Result, State}. %% @private %% @doc Handling cast messages @@ -115,7 +124,8 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info(_Info, State = #state{}) -> +handle_info(Info, State = #state{host = #host{host_id = HostId}}) -> + lager:debug("host_id: ~p, get info: ~p", [HostId, Info]), {noreply, State}. %% @private