diff --git a/apps/iot/src/http_handler/host_handler.erl b/apps/iot/src/http_handler/host_handler.erl index 39bbf4a..4fba434 100644 --- a/apps/iot/src/http_handler/host_handler.erl +++ b/apps/iot/src/http_handler/host_handler.erl @@ -137,6 +137,27 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := {ok, 200, iot_util:json_data(<<"success">>)} end; +%% 空调控制参数下发 +handle_request("POST", "/host/publish_aircon_command", _, + PostParams = #{<<"uuid">> := UUID, <<"timeout">> := Timeout0, <<"device_id">> := DeviceId, <<"command">> := Command}) + when is_binary(UUID), is_integer(Timeout0) -> + + Timeout = Timeout0 * 1000, + lager:debug("[http_host_handler] publish_aircon_command body is: ~p", [PostParams]), + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + Ref = iot_mqtt_aircon_gateway:send_command(DeviceId, Command), + receive + {send_command_reply, Ref, Reply} -> + {ok, 200, iot_util:json_data(Reply)} + after + Timeout -> + {ok, 200, iot_util:json_error(401, <<"timeout">>)} + end + end; + handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. diff --git a/apps/iot/src/iot_mqtt_aircon_gateway.erl b/apps/iot/src/iot_mqtt_aircon_gateway.erl new file mode 100644 index 0000000..e4f9ebc --- /dev/null +++ b/apps/iot/src/iot_mqtt_aircon_gateway.erl @@ -0,0 +1,177 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2026, +%%% @doc +%%% +%%% @end +%%% Created : 12. 1月 2026 14:30 +%%%------------------------------------------------------------------- +-module(iot_mqtt_aircon_gateway). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([send_command/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + conn_pid :: pid(), + %% 保存请求和响应的对应关系: #{{DeviceId, sessionId} => {Ref, ReceiverPid}} + inflight = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec send_command(DeviceId :: binary(), Command :: map()) -> Ref :: reference(). +send_command(DeviceId, Command) when is_binary(DeviceId), is_map(Command) -> + Ref = make_ref(), + ReceiverPid = self(), + gen_server:cast(?SERVER, {send_command, Ref, ReceiverPid, DeviceId, Command}), + Ref. + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + %% 建立到emqx服务器的连接 + {ok, Props} = application:get_env(iot, aircon_emqx_server), + EMQXHost = proplists:get_value(host, Props), + EMQXPort = proplists:get_value(port, Props, 1883), + Username = proplists:get_value(username, Props), + Password = proplists:get_value(password, Props), + RetryInterval = proplists:get_value(retry_interval, Props, 5), + Keepalive = proplists:get_value(keepalive, Props, 86400), + + ClientId = <<"mqtt-client-iot-aircon-gateway">>, + Opts = [ + {clientid, ClientId}, + {host, EMQXHost}, + {port, EMQXPort}, + {username, Username}, + {password, Password}, + {keepalive, Keepalive}, + {retry_interval, RetryInterval} + ], + + %% 需要订阅的消息 + Topics = [{<<"/aircon/+/command_reply">>, 2}], + {ok, ConnPid} = iot_mqtt_connection:start_link(self(), Opts, Topics), + + {ok, #state{conn_pid = ConnPid}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({send_command, Ref, ReceiverPid, DeviceId, Command}, State = #state{conn_pid = ConnPid, inflight = Inflight}) -> + Topic = <<"/aircon/", DeviceId/binary, "/command">>, + SessionId = new_session_id(), + + NCommand = jiffy:encode(Command#{<<"sessionid">> => SessionId}, [force_utf8]), + iot_mqtt_connection:publish_msg(ConnPid, Topic, NCommand, 2), + + NInflight = maps:put({DeviceId, SessionId}, {Ref, ReceiverPid}, Inflight), + {noreply, State#state{inflight = NInflight}}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({mqtt_message, Topic, Payload, _Qos}, State = #state{inflight = Inflight}) -> + case binary:split(Topic, <<"/">>) of + [<<>>, <<"aircon">>, DeviceId, <<"command_reply">>] -> + Reply = catch jiffy:decode(Payload, [return_maps]), + case Reply of + #{<<"sessionid">> := SessionId} -> + case maps:take({DeviceId, SessionId}, Inflight) of + error -> + {noreply, State}; + {{Ref, ReceiverPid}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {send_command_reply, Ref, Reply}; + false -> + ok + end, + {noreply, State#state{inflight = NInflight}} + end; + _ -> + {noreply, State} + end; + _ -> + lager:notice("[~p] get a invalid topic: ~p, message: ~p", [?MODULE, Topic, Payload]), + {noreply, State} + end; + +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% 生成固定长度 ID +-spec new_session_id() -> integer(). +new_session_id() -> + %% 10位时间戳(毫秒取后10位) + {Mega, Seconds, _} = os:timestamp(), + Timestamp = Mega * 1000000 + Seconds, + %% 5位单调序号 + Counter = erlang:unique_integer([monotonic, positive]) rem 100000, + %% 拼接成整数 + Timestamp * 100000 + Counter. \ No newline at end of file diff --git a/apps/iot/src/iot_mqtt_connection.erl b/apps/iot/src/iot_mqtt_connection.erl new file mode 100644 index 0000000..267fb88 --- /dev/null +++ b/apps/iot/src/iot_mqtt_connection.erl @@ -0,0 +1,202 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @doc +%%% MQTT Client based on gen_statem (state_functions) +%%% 自动重连 + 掉线期间 publish 入队 + flush 队列 +%%%------------------------------------------------------------------- +-module(iot_mqtt_connection). +-author("aresei"). +-behaviour(gen_statem). + +%% API +-export([start_link/3, publish_status/1, publish_msg/4]). +%% gen_statem callbacks +-export([init/1, callback_mode/0, disconnected/3, connected/3, terminate/3, code_change/4]). + +-define(CONNECT_ACTION(T), {state_timeout, T, start_connect}). + +-record(state, { + parent_pid :: pid(), + opts = [], + conn_pid :: pid() | undefined, + retry :: non_neg_integer(), + topics :: list(), + queue :: queue:queue() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== +-spec start_link(pid(), list(), list()) -> + {ok, pid()} | ignore | {error, term()}. +start_link(ParentPid, Opts, Topics) when is_pid(ParentPid), is_list(Topics) -> + gen_statem:start_link(?MODULE, [ParentPid, Opts, Topics], []). + +-spec publish_status(pid()) -> map(). +publish_status(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, status). + +-spec publish_msg(Pid :: pid(), Topic :: binary(), Payload :: binary(), Qos :: integer()) -> ok. +publish_msg(Pid, Topic, Payload, Qos) when is_pid(Pid), is_binary(Topic), is_binary(Payload), is_integer(Qos) -> + gen_statem:cast(Pid, {publish, Topic, Payload, Qos}). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +callback_mode() -> + state_functions. + +init([ParentPid, Opts0, Topics]) -> + erlang:process_flag(trap_exit, true), + + DefaultOpts = [ + {owner, self()}, + {tcp_opts, []}, + {auto_ack, true}, + {proto_ver, v3} + ], + Opts = Opts0 ++ DefaultOpts, + + {ok, disconnected, #state{parent_pid = ParentPid, opts = Opts, topics = Topics, retry = 0, conn_pid = undefined, queue = queue:new()}, ?CONNECT_ACTION(0)}. + +%%%=================================================================== +%%% disconnected state +%%%=================================================================== + +disconnected(state_timeout, start_connect, State = #state{retry = Retry, opts = Opts, topics = Topics}) -> + case emqtt:start_link(Opts) of + {ok, ConnPid} -> + case emqtt:connect(ConnPid) of + {ok, _} -> + maybe_subscribe(ConnPid, Topics), + NState = flush_queue(ConnPid, State#state{conn_pid = ConnPid, retry = 0}), + {next_state, connected, NState}; + {error, _Reason} -> + %% 连接错误发生的时候,emqtt进程一定会挂掉,在进程错误的时候处理; 否则连接错误和服务端关闭需要2个分支来处理 + %% 这里保存进程id是为了,错误时候的模式匹配 + {keep_state, State#state{conn_pid = ConnPid}} + end; + _ -> + {keep_state, State#state{retry = Retry + 1}, ?CONNECT_ACTION(5000)} + end; + +%% 掉线期间 publish → 入队 +disconnected(cast, {publish, Topic, Payload, Qos}, State = #state{queue = Q}) -> + {keep_state, State#state{queue = enqueue(Q, {Topic, Payload, Qos})}}; + +%% status +disconnected({call, From}, status, State) -> + reply_status(disconnected, From, State); + +%% 处理mqtt socket断开的问题 +disconnected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> + {keep_state, State, ?CONNECT_ACTION(5000)}; + +%% 兜底 +disconnected(_Type, _Event, State) -> + {keep_state, State}. + +%%%=================================================================== +%%% connected state +%%%=================================================================== + +%% 正常 publish +connected(cast, {publish, Topic, Payload, Qos}, State = #state{conn_pid = ConnPid}) -> + do_publish(ConnPid, Topic, Payload, Qos), + {keep_state, State}; + +%% MQTT 断线 +connected(info, {disconnect, _ReasonCode, _Props}, State) -> + {next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)}; + +%% 处理mqtt socket断开的问题 +connected(info, {'EXIT', ConnPid, _Reason}, State = #state{conn_pid = ConnPid}) -> + {next_state, disconnected, State#state{conn_pid = undefined}, ?CONNECT_ACTION(5000)}; + +%% 收到订阅消息 +connected(info, {publish, #{topic := Topic, payload := Payload, qos := Qos}}, State = #state{parent_pid = ParentPid}) -> + ParentPid ! {mqtt_message, Topic, Payload, Qos}, + {keep_state, State}; + +%% puback +connected(info, {puback, Ack}, #state{parent_pid = ParentPid}) -> + ParentPid ! {mqtt_puback, Ack}, + keep_state_and_data; + +%% status +connected({call, From}, status, State) -> + reply_status(connected, From, State); + +%% 兜底 +connected(_Type, _Event, State) -> + {keep_state, State}. + +%%%=================================================================== +%%% Internal helpers +%%%=================================================================== + +-spec enqueue(queue:queue(), any()) -> queue:queue(). +enqueue(Q, Msg) -> + case queue:len(Q) < 1000 of + true -> + queue:in(Msg, Q); + false -> + Q + end. + +-spec flush_queue(pid(), #state{}) -> #state{}. +flush_queue(ConnPid, State = #state{queue = Q}) -> + lists:foreach( + fun({Topic, Payload, Qos}) -> do_publish(ConnPid, Topic, Payload, Qos) end, + queue:to_list(Q) + ), + State#state{queue = queue:new()}. + +do_publish(ConnPid, Topic, Payload, Qos) -> + try + emqtt:publish(ConnPid, Topic, Payload, Qos), + ok + catch + _ -> + gen_statem:cast(self(), {publish, Topic, Payload, Qos}), + ok + end. + +maybe_subscribe(_ConnPid, []) -> + ok; +maybe_subscribe(ConnPid, Topics) -> + SubResult = emqtt:subscribe(ConnPid, Topics), + lager:debug("[iot_mqtt_connection] subscribe topics: ~p, result: ~p", [Topics, SubResult]). + +reply_status(StateName, From, + #state{retry = Retry, queue = Q} = State) -> + Reply = #{ + state => StateName, + retry => Retry, + queue_len => queue:len(Q) + }, + {keep_state, State, [{reply, From, Reply}]}. + +%%%=================================================================== +%%% terminate / code_change +%%%=================================================================== + +terminate(_Reason, _StateName, + #state{conn_pid = undefined}) -> + ok; +terminate(_Reason, _StateName, + #state{conn_pid = ConnPid, topics = Topics}) -> + maybe_unsubscribe(ConnPid, Topics), + emqtt:disconnect(ConnPid), + ok. + +maybe_unsubscribe(_ConnPid, []) -> + ok; +maybe_unsubscribe(ConnPid, Topics) -> + TopicNames = [T || {T, _} <- Topics], + emqtt:unsubscribe(ConnPid, #{}, TopicNames), + ok. + +code_change(_OldVsn, _StateName, State, _Extra) -> + {ok, State}. \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 03834d8..79333eb 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -143,6 +143,15 @@ init([]) -> shutdown => 2000, type => worker, modules => ['iot_jinzhi_endpoint'] + }, + + #{ + id => 'iot_mqtt_aircon_gateway', + start => {'iot_mqtt_aircon_gateway', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_mqtt_aircon_gateway'] } ], diff --git a/config/sys-dev.config b/config/sys-dev.config index ee29974..b00e5e0 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -37,12 +37,12 @@ ]}, %% 目标服务器地址 - {emqx_server, [ - {host, {39, 98, 184, 67}}, + {aircon_emqx_server, [ + {host, "118.178.229.213"}, {port, 1883}, {tcp_opts, []}, - {username, "test"}, - {password, "test1234"}, + {username, "aircon"}, + {password, "A9K2rM8QxL7WZsP5D@B!3"}, {keepalive, 86400}, {retry_interval, 5} ]}, diff --git a/config/sys-prod.config b/config/sys-prod.config index 7410074..d0b4a6e 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -25,6 +25,16 @@ 15 => 300 }}, + {aircon_emqx_server, [ + {host, "172.30.37.212"}, + {port, 1883}, + {tcp_opts, []}, + {username, "aircon"}, + {password, "A9K2rM8QxL7WZsP5D@B!3"}, + {keepalive, 86400}, + {retry_interval, 5} + ]}, + {watchdog, [ {pri_key, "jinzhi_watchdog_pri.key"}, {url, "http://172.30.37.242:8080/hqtaskcenterapp/sys/taskCenter/taskReceive/sendNotice.do"},