emqtt
This commit is contained in:
parent
dfc3fa5132
commit
359c13924f
@ -15,8 +15,8 @@ start(_StartType, _StartArgs) ->
|
|||||||
%% 启动数据库
|
%% 启动数据库
|
||||||
mnesia:start(),
|
mnesia:start(),
|
||||||
Tables = mnesia:system_info(tables),
|
Tables = mnesia:system_info(tables),
|
||||||
iot_util:assert_exec(lists:member(router, [Tables]), fun() -> mnesia:wait_for_tables([router], infinity) end),
|
lists:member(router, Tables) andalso mnesia:wait_for_tables([router], infinity),
|
||||||
iot_util:assert_exec(lists:member(host, [Tables]), fun() -> mnesia:wait_for_tables([host], infinity) end),
|
lists:member(host, Tables) andalso mnesia:wait_for_tables([host], infinity),
|
||||||
|
|
||||||
%% 加速内存的回收
|
%% 加速内存的回收
|
||||||
erlang:system_flag(fullsweep_after, 16),
|
erlang:system_flag(fullsweep_after, 16),
|
||||||
|
|||||||
@ -49,7 +49,7 @@ start_link() ->
|
|||||||
{stop, Reason :: term()} | ignore).
|
{stop, Reason :: term()} | ignore).
|
||||||
init([]) ->
|
init([]) ->
|
||||||
%{ok, ServerOpts} = application:get_env(iot, emqx_server),
|
%{ok, ServerOpts} = application:get_env(iot, emqx_server),
|
||||||
%{ok, ConnPid} = emqtt:start_link([{clientid, iot_emqtt_client}, {owner, self()}|ServerOpts]),
|
{ok, ConnPid} = emqtt:start_link([{clientid, iot_emqtt_client}, {owner, self()}|ServerOpts]),
|
||||||
%{ok, _Props} = emqtt:connect(ConnPid),
|
%{ok, _Props} = emqtt:connect(ConnPid),
|
||||||
|
|
||||||
%SubOpts = [{qos, 1}],
|
%SubOpts = [{qos, 1}],
|
||||||
|
|||||||
@ -21,7 +21,9 @@
|
|||||||
-define(SERVER, ?MODULE).
|
-define(SERVER, ?MODULE).
|
||||||
|
|
||||||
-record(state, {
|
-record(state, {
|
||||||
host :: #host{}
|
host :: #host{},
|
||||||
|
emqx_pid :: pid(),
|
||||||
|
is_connected = false :: boolean()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
@ -48,7 +50,41 @@ start_link(Name, Host = #host{}) ->
|
|||||||
{stop, Reason :: term()} | ignore).
|
{stop, Reason :: term()} | ignore).
|
||||||
init([Host]) ->
|
init([Host]) ->
|
||||||
lager:debug("[iot_host] host is: ~p", [Host]),
|
lager:debug("[iot_host] host is: ~p", [Host]),
|
||||||
{ok, #state{host = Host}}.
|
%% 建立到emqx服务器的连接
|
||||||
|
{ok, Props} = application:get_env(iot, emqx_server),
|
||||||
|
Host = proplists:get_value(host, Props),
|
||||||
|
Port = proplists:get_value(port, Props, 18080),
|
||||||
|
Username = proplists:get_value(username, Props),
|
||||||
|
Password = proplists:get_value(password, Props),
|
||||||
|
RetryInterval = proplists:get_value(retry_interval, Props, 5),
|
||||||
|
Opts = [
|
||||||
|
{host, Host},
|
||||||
|
{port, Port},
|
||||||
|
{owner, self()},
|
||||||
|
{tcp_opts, []},
|
||||||
|
{username, Username},
|
||||||
|
{password, Password},
|
||||||
|
{keepalive, 'Keepalive'},
|
||||||
|
{retry_interval, RetryInterval}
|
||||||
|
],
|
||||||
|
|
||||||
|
case emqtt:start_link(Opts) of
|
||||||
|
{ok, ConnPid} ->
|
||||||
|
lager:debug("[iot_host] connect success"),
|
||||||
|
%% 监听和host相关的全部事件
|
||||||
|
{ok, _Props} = emqtt:connect(ConnPid),
|
||||||
|
{ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {<<"/host/123">>, qos1}),
|
||||||
|
%% ok = emqtt:publish(ConnPid, <<"hello">>, #{}, <<"Hello World!">>, [{qos, 0}]).
|
||||||
|
%% {ok, _PktId} = emqtt:publish(ConnPid, <<"hello">>, #{}, <<"Hello World!">>, [{qos, 1}]).
|
||||||
|
|
||||||
|
{ok, #state{host = Host, is_connected = true, emqx_pid = ConnPid}};
|
||||||
|
ignore ->
|
||||||
|
lager:debug("[iot_host] connect emqx get ignore"),
|
||||||
|
{stop, ignore};
|
||||||
|
{error, Reason} ->
|
||||||
|
lager:debug("[iot_host] connect emqx get error: ~p", [Reason]),
|
||||||
|
{stop, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
%% @doc Handling call messages
|
%% @doc Handling call messages
|
||||||
@ -88,7 +124,10 @@ handle_info(_Info, State = #state{}) ->
|
|||||||
%% with Reason. The return value is ignored.
|
%% with Reason. The return value is ignored.
|
||||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||||
State :: #state{}) -> term()).
|
State :: #state{}) -> term()).
|
||||||
terminate(_Reason, _State = #state{}) ->
|
terminate(_Reason, _State = #state{emqx_pid = ConnPid}) ->
|
||||||
|
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, <<"hello">>),
|
||||||
|
ok = emqtt:disconnect(ConnPid),
|
||||||
|
ok = emqtt:stop(ConnPid),
|
||||||
ok.
|
ok.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
|
|||||||
@ -9,11 +9,11 @@
|
|||||||
|
|
||||||
%% 目标服务器地址
|
%% 目标服务器地址
|
||||||
{emqx_server, [
|
{emqx_server, [
|
||||||
{host, ""},
|
{host, "103.113.157.7"},
|
||||||
{port, 18080},
|
{port, 18080},
|
||||||
{tcp_opts, []},
|
{tcp_opts, []},
|
||||||
{username, ""},
|
{username, "test"},
|
||||||
{password, ""},
|
{password, "test1234"},
|
||||||
{keepalive, 'Keepalive'},
|
{keepalive, 'Keepalive'},
|
||||||
{retry_interval, 5}
|
{retry_interval, 5}
|
||||||
]}
|
]}
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user