diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index b709e37..2ac21bb 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -14,22 +14,24 @@ %% API -export([test/1]). --export([start_link/2, get_name/1, get_pid/1, publish/4]). +-export([start_link/2, get_name/1, get_pid/1, publish/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -define(SERVER, ?MODULE). +%% 消息的qos等级 +-define(QOS, 1). + -record(state, { host :: #host{}, - emqx_pid :: pid(), - is_connected = false :: boolean() + emqx_pid :: pid() }). test(Id) when is_binary(Id) -> Pid = get_pid(<<"1">>), - publish(Pid, <<"/host/", Id/binary>>, <<"hello world">>, 1). + publish(Pid, <<"hello world">>). %%%=================================================================== %%% API @@ -41,8 +43,9 @@ get_pid(HostId) when is_binary(HostId) -> get_name(HostId) when is_binary(HostId) -> binary_to_atom(<<"iot_host:", HostId/binary>>). -publish(Pid, Topic, Message, Qos) when is_pid(Pid), is_binary(Topic) -> - gen_server:call(Pid, {publish, Topic, Message, Qos}). +-spec publish(pid(), binary()) -> {ok, PacketId :: integer()} | {error, term()}. +publish(Pid, Message) when is_pid(Pid), is_binary(Message) -> + gen_server:call(Pid, {publish, Message}). %% @doc Spawns the server and registers the local name (unique) -spec(start_link(Name :: atom(), Host :: #host{}) -> @@ -60,7 +63,7 @@ start_link(Name, Host = #host{}) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([Host = #host{host_id = HostId}]) -> - lager:debug("[iot_host] host is: ~p", [Host]), + lager:debug("[iot_host] host_id: ~p, host is: ~p", [HostId, Host]), %% 建立到emqx服务器的连接 {ok, Props} = application:get_env(iot, emqx_server), EMQXHost = proplists:get_value(host, Props), @@ -83,15 +86,10 @@ init([Host = #host{host_id = HostId}]) -> case emqtt:start_link(Opts) of {ok, ConnPid} -> lager:debug("[iot_host] connect success, pid: ~p", [ConnPid]), - %% 监听和host相关的全部事件 - {ok, _} = emqtt:connect(ConnPid), - Topics = [ - {<<"/host/", HostId/binary>>, qos1} - ], - SubscribeResult = emqtt:subscribe(ConnPid, Topics), - lager:debug("subscribe result is: ~p", [SubscribeResult]), + %% 快速启动避免阻塞iot_host_sup的启动 + erlang:start_timer(0, self(), subscribe_ticker), - {ok, #state{host = Host, is_connected = true, emqx_pid = ConnPid}}; + {ok, #state{host = Host, emqx_pid = ConnPid}}; ignore -> lager:debug("[iot_host] connect emqx get ignore"), {stop, ignore}; @@ -110,8 +108,9 @@ init([Host = #host{host_id = HostId}]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call({publish, Topic, Message, Qos}, _From, State = #state{emqx_pid = ConnPid}) -> - Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}]), +handle_call({publish, Message}, _From, State = #state{emqx_pid = ConnPid, host = #host{host_id = HostId}}) -> + Topic = <<"/host/", HostId/binary, "/upstream">>, + Result = emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, ?QOS}]), {reply, Result, State}. %% @private @@ -129,7 +128,20 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). +%% 处理topic的订阅事件 +handle_info({timeout, _, subscribe_ticker}, State = #state{host = #host{host_id = HostId}, emqx_pid = ConnPid}) -> + %% 监听和host相关的全部事件 + {ok, _} = emqtt:connect(ConnPid), + Topics = [ + {<<"/host/", HostId/binary, "/upstream">>, ?QOS} + ], + SubscribeResult = emqtt:subscribe(ConnPid, Topics), + lager:debug("[iot_host] host_id: ~p, subscribe result is: ~p", [HostId, SubscribeResult]), + + {noreply, State#state{emqx_pid = ConnPid}}; + handle_info({disconnect, ReasonCode, Properties}, State = #state{host = #host{host_id = HostId}}) -> + lager:debug("[iot_host] host: ~p, Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [HostId, ReasonCode, Properties]), {stop, disconnected, State}; handle_info({publish, Message = #{packet_id := PacketId, payload := Payload}}, State = #state{emqx_pid = ConnPid, host = #host{host_id = HostId}}) -> @@ -151,10 +163,13 @@ handle_info(Info, State = #state{host = #host{host_id = HostId}}) -> %% with Reason. The return value is ignored. -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). -terminate(_Reason, _State = #state{emqx_pid = ConnPid}) -> +terminate(_Reason, _State = #state{emqx_pid = ConnPid}) when is_pid(ConnPid) -> {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, <<"hello">>), ok = emqtt:disconnect(ConnPid), ok = emqtt:stop(ConnPid), + ok; +terminate(Reason, _State) -> + lager:debug("[iot_host] terminate with reason: ~p", [Reason]), ok. %% @private