From 504f82109a3d5aff3b3884c7207c671d48ffd0e3 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 12 Jan 2026 15:36:02 +0800 Subject: [PATCH] add aircon command --- apps/iot/src/http_handler/host_handler.erl | 21 +++ apps/iot/src/iot_mqtt_aircon_gateway.erl | 177 +++++++++++++++++++++ apps/iot/src/iot_mqtt_client_sup.erl | 41 ----- apps/iot/src/iot_sup.erl | 9 ++ config/sys-dev.config | 8 +- config/sys-prod.config | 10 ++ 6 files changed, 221 insertions(+), 45 deletions(-) create mode 100644 apps/iot/src/iot_mqtt_aircon_gateway.erl delete mode 100644 apps/iot/src/iot_mqtt_client_sup.erl diff --git a/apps/iot/src/http_handler/host_handler.erl b/apps/iot/src/http_handler/host_handler.erl index 39bbf4a..4fba434 100644 --- a/apps/iot/src/http_handler/host_handler.erl +++ b/apps/iot/src/http_handler/host_handler.erl @@ -137,6 +137,27 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := {ok, 200, iot_util:json_data(<<"success">>)} end; +%% 空调控制参数下发 +handle_request("POST", "/host/publish_aircon_command", _, + PostParams = #{<<"uuid">> := UUID, <<"timeout">> := Timeout0, <<"device_id">> := DeviceId, <<"command">> := Command}) + when is_binary(UUID), is_integer(Timeout0) -> + + Timeout = Timeout0 * 1000, + lager:debug("[http_host_handler] publish_aircon_command body is: ~p", [PostParams]), + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + Ref = iot_mqtt_aircon_gateway:send_command(DeviceId, Command), + receive + {send_command_reply, Ref, Reply} -> + {ok, 200, iot_util:json_data(Reply)} + after + Timeout -> + {ok, 200, iot_util:json_error(401, <<"timeout">>)} + end + end; + handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. diff --git a/apps/iot/src/iot_mqtt_aircon_gateway.erl b/apps/iot/src/iot_mqtt_aircon_gateway.erl new file mode 100644 index 0000000..e4f9ebc --- /dev/null +++ b/apps/iot/src/iot_mqtt_aircon_gateway.erl @@ -0,0 +1,177 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2026, +%%% @doc +%%% +%%% @end +%%% Created : 12. 1月 2026 14:30 +%%%------------------------------------------------------------------- +-module(iot_mqtt_aircon_gateway). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([send_command/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + conn_pid :: pid(), + %% 保存请求和响应的对应关系: #{{DeviceId, sessionId} => {Ref, ReceiverPid}} + inflight = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec send_command(DeviceId :: binary(), Command :: map()) -> Ref :: reference(). +send_command(DeviceId, Command) when is_binary(DeviceId), is_map(Command) -> + Ref = make_ref(), + ReceiverPid = self(), + gen_server:cast(?SERVER, {send_command, Ref, ReceiverPid, DeviceId, Command}), + Ref. + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + %% 建立到emqx服务器的连接 + {ok, Props} = application:get_env(iot, aircon_emqx_server), + EMQXHost = proplists:get_value(host, Props), + EMQXPort = proplists:get_value(port, Props, 1883), + Username = proplists:get_value(username, Props), + Password = proplists:get_value(password, Props), + RetryInterval = proplists:get_value(retry_interval, Props, 5), + Keepalive = proplists:get_value(keepalive, Props, 86400), + + ClientId = <<"mqtt-client-iot-aircon-gateway">>, + Opts = [ + {clientid, ClientId}, + {host, EMQXHost}, + {port, EMQXPort}, + {username, Username}, + {password, Password}, + {keepalive, Keepalive}, + {retry_interval, RetryInterval} + ], + + %% 需要订阅的消息 + Topics = [{<<"/aircon/+/command_reply">>, 2}], + {ok, ConnPid} = iot_mqtt_connection:start_link(self(), Opts, Topics), + + {ok, #state{conn_pid = ConnPid}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({send_command, Ref, ReceiverPid, DeviceId, Command}, State = #state{conn_pid = ConnPid, inflight = Inflight}) -> + Topic = <<"/aircon/", DeviceId/binary, "/command">>, + SessionId = new_session_id(), + + NCommand = jiffy:encode(Command#{<<"sessionid">> => SessionId}, [force_utf8]), + iot_mqtt_connection:publish_msg(ConnPid, Topic, NCommand, 2), + + NInflight = maps:put({DeviceId, SessionId}, {Ref, ReceiverPid}, Inflight), + {noreply, State#state{inflight = NInflight}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({mqtt_message, Topic, Payload, _Qos}, State = #state{inflight = Inflight}) -> + case binary:split(Topic, <<"/">>) of + [<<>>, <<"aircon">>, DeviceId, <<"command_reply">>] -> + Reply = catch jiffy:decode(Payload, [return_maps]), + case Reply of + #{<<"sessionid">> := SessionId} -> + case maps:take({DeviceId, SessionId}, Inflight) of + error -> + {noreply, State}; + {{Ref, ReceiverPid}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {send_command_reply, Ref, Reply}; + false -> + ok + end, + {noreply, State#state{inflight = NInflight}} + end; + _ -> + {noreply, State} + end; + _ -> + lager:notice("[~p] get a invalid topic: ~p, message: ~p", [?MODULE, Topic, Payload]), + {noreply, State} + end; + +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% 生成固定长度 ID +-spec new_session_id() -> integer(). +new_session_id() -> + %% 10位时间戳(毫秒取后10位) + {Mega, Seconds, _} = os:timestamp(), + Timestamp = Mega * 1000000 + Seconds, + %% 5位单调序号 + Counter = erlang:unique_integer([monotonic, positive]) rem 100000, + %% 拼接成整数 + Timestamp * 100000 + Counter. \ No newline at end of file diff --git a/apps/iot/src/iot_mqtt_client_sup.erl b/apps/iot/src/iot_mqtt_client_sup.erl deleted file mode 100644 index ed73b6a..0000000 --- a/apps/iot/src/iot_mqtt_client_sup.erl +++ /dev/null @@ -1,41 +0,0 @@ -%%%------------------------------------------------------------------- -%% @doc iot top level supervisor. -%% @end -%%%------------------------------------------------------------------- - --module(iot_mqtt_client_sup). - --behaviour(supervisor). - --export([start_link/0]). - --export([init/1]). - --define(SERVER, ?MODULE). - -start_link() -> - supervisor:start_link({local, ?SERVER}, ?MODULE, []). - -%% sup_flags() = #{strategy => strategy(), % optional -%% intensity => non_neg_integer(), % optional -%% period => pos_integer()} % optional -%% child_spec() = #{id => child_id(), % mandatory -%% start => mfargs(), % mandatory -%% restart => restart(), % optional -%% shutdown => shutdown(), % optional -%% type => worker(), % optional -%% modules => modules()} % optional -init([]) -> - SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - Specs = [ - #{ - id => iot_watchdog, - start => {'iot_watchdog', start_link, []}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['iot_watchdog'] - } - ], - - {ok, {SupFlags, Specs}}. \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 03834d8..79333eb 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -143,6 +143,15 @@ init([]) -> shutdown => 2000, type => worker, modules => ['iot_jinzhi_endpoint'] + }, + + #{ + id => 'iot_mqtt_aircon_gateway', + start => {'iot_mqtt_aircon_gateway', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_mqtt_aircon_gateway'] } ], diff --git a/config/sys-dev.config b/config/sys-dev.config index ee29974..b00e5e0 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -37,12 +37,12 @@ ]}, %% 目标服务器地址 - {emqx_server, [ - {host, {39, 98, 184, 67}}, + {aircon_emqx_server, [ + {host, "118.178.229.213"}, {port, 1883}, {tcp_opts, []}, - {username, "test"}, - {password, "test1234"}, + {username, "aircon"}, + {password, "A9K2rM8QxL7WZsP5D@B!3"}, {keepalive, 86400}, {retry_interval, 5} ]}, diff --git a/config/sys-prod.config b/config/sys-prod.config index 7410074..d0b4a6e 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -25,6 +25,16 @@ 15 => 300 }}, + {aircon_emqx_server, [ + {host, "172.30.37.212"}, + {port, 1883}, + {tcp_opts, []}, + {username, "aircon"}, + {password, "A9K2rM8QxL7WZsP5D@B!3"}, + {keepalive, 86400}, + {retry_interval, 5} + ]}, + {watchdog, [ {pri_key, "jinzhi_watchdog_pri.key"}, {url, "http://172.30.37.242:8080/hqtaskcenterapp/sys/taskCenter/taskReceive/sendNotice.do"},