From 359c13924f7995f8f12961d24b7cc7d884231804 Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?=E5=AE=89=E7=A4=BC=E6=88=90?= Date: Thu, 30 Mar 2023 20:21:20 +0800 Subject: [PATCH] emqtt --- apps/iot/src/iot_app.erl | 6 ++-- apps/iot/src/iot_emqtt_client.erl | 2 +- apps/iot/src/iot_host.erl | 47 ++++++++++++++++++++++++++++--- config/sys.config | 6 ++-- 4 files changed, 50 insertions(+), 11 deletions(-) diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index c6f2e4b..f3eae03 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -15,8 +15,8 @@ start(_StartType, _StartArgs) -> %% 启动数据库 mnesia:start(), Tables = mnesia:system_info(tables), - iot_util:assert_exec(lists:member(router, [Tables]), fun() -> mnesia:wait_for_tables([router], infinity) end), - iot_util:assert_exec(lists:member(host, [Tables]), fun() -> mnesia:wait_for_tables([host], infinity) end), + lists:member(router, Tables) andalso mnesia:wait_for_tables([router], infinity), + lists:member(host, Tables) andalso mnesia:wait_for_tables([host], infinity), %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), @@ -43,7 +43,7 @@ start_http_server() -> {"/host/[...]", http_protocol, [http_host_handler]}, {"/api/[...]", http_protocol, [http_api_handler]}, {"/router/[...]", http_protocol, [http_router_handler]} - ]} + ]} ]), TransOpts = [ {port, Port}, diff --git a/apps/iot/src/iot_emqtt_client.erl b/apps/iot/src/iot_emqtt_client.erl index 60a53f6..744b8fa 100644 --- a/apps/iot/src/iot_emqtt_client.erl +++ b/apps/iot/src/iot_emqtt_client.erl @@ -49,7 +49,7 @@ start_link() -> {stop, Reason :: term()} | ignore). init([]) -> %{ok, ServerOpts} = application:get_env(iot, emqx_server), - %{ok, ConnPid} = emqtt:start_link([{clientid, iot_emqtt_client}, {owner, self()}|ServerOpts]), + {ok, ConnPid} = emqtt:start_link([{clientid, iot_emqtt_client}, {owner, self()}|ServerOpts]), %{ok, _Props} = emqtt:connect(ConnPid), %SubOpts = [{qos, 1}], diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index d7904ce..ee16f25 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -21,7 +21,9 @@ -define(SERVER, ?MODULE). -record(state, { - host :: #host{} + host :: #host{}, + emqx_pid :: pid(), + is_connected = false :: boolean() }). %%%=================================================================== @@ -48,7 +50,41 @@ start_link(Name, Host = #host{}) -> {stop, Reason :: term()} | ignore). init([Host]) -> lager:debug("[iot_host] host is: ~p", [Host]), - {ok, #state{host = Host}}. + %% 建立到emqx服务器的连接 + {ok, Props} = application:get_env(iot, emqx_server), + Host = proplists:get_value(host, Props), + Port = proplists:get_value(port, Props, 18080), + Username = proplists:get_value(username, Props), + Password = proplists:get_value(password, Props), + RetryInterval = proplists:get_value(retry_interval, Props, 5), + Opts = [ + {host, Host}, + {port, Port}, + {owner, self()}, + {tcp_opts, []}, + {username, Username}, + {password, Password}, + {keepalive, 'Keepalive'}, + {retry_interval, RetryInterval} + ], + + case emqtt:start_link(Opts) of + {ok, ConnPid} -> + lager:debug("[iot_host] connect success"), + %% 监听和host相关的全部事件 + {ok, _Props} = emqtt:connect(ConnPid), + {ok, _Props, _ReasonCodes} = emqtt:subscribe(ConnPid, {<<"/host/123">>, qos1}), + %% ok = emqtt:publish(ConnPid, <<"hello">>, #{}, <<"Hello World!">>, [{qos, 0}]). + %% {ok, _PktId} = emqtt:publish(ConnPid, <<"hello">>, #{}, <<"Hello World!">>, [{qos, 1}]). + + {ok, #state{host = Host, is_connected = true, emqx_pid = ConnPid}}; + ignore -> + lager:debug("[iot_host] connect emqx get ignore"), + {stop, ignore}; + {error, Reason} -> + lager:debug("[iot_host] connect emqx get error: ~p", [Reason]), + {stop, Reason} + end. %% @private %% @doc Handling call messages @@ -88,7 +124,10 @@ handle_info(_Info, State = #state{}) -> %% with Reason. The return value is ignored. -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> +terminate(_Reason, _State = #state{emqx_pid = ConnPid}) -> + {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, <<"hello">>), + ok = emqtt:disconnect(ConnPid), + ok = emqtt:stop(ConnPid), ok. %% @private @@ -101,4 +140,4 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions -%%%=================================================================== +%%%=================================================================== \ No newline at end of file diff --git a/config/sys.config b/config/sys.config index 53513b0..8d48f49 100644 --- a/config/sys.config +++ b/config/sys.config @@ -9,11 +9,11 @@ %% 目标服务器地址 {emqx_server, [ - {host, ""}, + {host, "103.113.157.7"}, {port, 18080}, {tcp_opts, []}, - {username, ""}, - {password, ""}, + {username, "test"}, + {password, "test1234"}, {keepalive, 'Keepalive'}, {retry_interval, 5} ]}