From 6c80d8d16c7c8e14718c6fb685aa1c62bdbeca43 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 8 Jan 2026 14:51:09 +0800 Subject: [PATCH] fix --- apps/aircon/src/aircon_args.erl | 159 ++++++++ apps/aircon/src/aircon_logger.erl | 169 ++++++++ apps/aircon/src/aircon_mqtt_subscriber.erl | 199 ++++++++++ apps/aircon/src/aircon_sup.erl | 73 +++- apps/aircon/src/efka_client.erl | 431 +++++++++++++++++++++ config/sys-dev.config | 21 + config/sys-prod.config | 22 ++ config/sys.config | 3 - mqtt数据示例最新.txt | 30 ++ 9 files changed, 1100 insertions(+), 7 deletions(-) create mode 100644 apps/aircon/src/aircon_args.erl create mode 100644 apps/aircon/src/aircon_logger.erl create mode 100644 apps/aircon/src/aircon_mqtt_subscriber.erl create mode 100644 apps/aircon/src/efka_client.erl create mode 100644 config/sys-dev.config create mode 100644 config/sys-prod.config delete mode 100644 config/sys.config create mode 100644 mqtt数据示例最新.txt diff --git a/apps/aircon/src/aircon_args.erl b/apps/aircon/src/aircon_args.erl new file mode 100644 index 0000000..8dc4c8b --- /dev/null +++ b/apps/aircon/src/aircon_args.erl @@ -0,0 +1,159 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 9月 2023 16:37 +%%%------------------------------------------------------------------- +-module(aircon_args). +-author("aresei"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([get_metric/0, get_param/0, push_metric/1, push_param/1, get_device_uuid/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + %% #{device_id => device_uuid}, 下发的时候是:serial + metrics = #{}, + param = #{} +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec get_param() -> {ok, Param :: #{}}. +get_param() -> + gen_server:call(?MODULE, get_param). + +-spec get_metric() -> {ok, Metric :: map()}. +get_metric() -> + gen_server:call(?MODULE, get_metric). + +-spec get_device_uuid(DeviceId :: binary()) -> error | {ok, DeviceUUID :: binary()}. +get_device_uuid(DeviceId) when is_binary(DeviceId) -> + gen_server:call(?MODULE, {get_device_uuid, DeviceId}). + +-spec push_param(Param :: map()) -> no_return(). +push_param(Param) when is_map(Param) -> + gen_server:cast(?MODULE, {push_param, Param}). + +-spec push_metric(Metrics :: list()) -> no_return(). +push_metric(Metrics) when is_list(Metrics) -> + gen_server:cast(?MODULE, {push_metric, Metrics}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, Metrics} = efka_client:request_metric(), + try convert_metric(Metrics) of + {ok, MetricMap} -> + lager:debug("[aircon_args] init load metric_map: ~p", [MetricMap]), + {ok, Param} = efka_client:request_param(), + + {ok, #state{metrics = MetricMap, param = Param}} + catch + _:Error:Stack-> + lager:warning("[aircon_args] request_metric get error: ~p, stack: ~p", [Error, Stack]), + {ok, #state{metrics = #{}, param = #{}}} + end. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(get_param, _From, State = #state{param = Param}) -> + {reply, {ok, Param}, State}; +handle_call({get_device_uuid, DeviceId}, _From, State = #state{metrics = Metrics}) when is_map(Metrics) -> + Result = maps:find(DeviceId, Metrics), + {reply, Result, State}; +handle_call({get_device_uuid, _DeviceId}, _From, State) -> + {reply, error, State}; + +handle_call(get_metric, _From, State = #state{metrics = Metrics}) -> + {reply, {ok, Metrics}, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({push_param, Param}, State = #state{}) -> + {noreply, State#state{param = Param}}; +handle_cast({push_metric, Metrics}, State = #state{}) -> + try convert_metric(Metrics) of + {ok, MetricMap} -> + lager:debug("[aircon_args] push metric_map: ~p", [MetricMap]), + {noreply, State#state{metrics = MetricMap}} + catch _:_ -> + {noreply, State} + end. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec convert_metric(Metrics :: list()) -> {ok, map()}. +convert_metric(Metrics) when is_list(Metrics) -> + MetricTuples = lists:flatmap(fun + (#{<<"serial">> := DeviceId, <<"device_uuid">> := DeviceUUID}) -> + [{DeviceId, DeviceUUID}]; + (_) -> + [] + end, Metrics), + {ok, maps:from_list(MetricTuples)}. \ No newline at end of file diff --git a/apps/aircon/src/aircon_logger.erl b/apps/aircon/src/aircon_logger.erl new file mode 100644 index 0000000..ec0c799 --- /dev/null +++ b/apps/aircon/src/aircon_logger.erl @@ -0,0 +1,169 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 07. 9月 2023 17:07 +%%%------------------------------------------------------------------- +-module(aircon_logger). +-author("aresei"). + +-behaviour(gen_server). + +%% API +-export([start_link/1, write/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + file_name :: string(), + date :: calendar:date(), + file +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +write(Data) -> + gen_server:cast(?SERVER, {write, Data}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(FileName :: string()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(FileName) when is_list(FileName) -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [FileName], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([FileName]) -> + ensure_dir(), + FilePath = make_file(FileName, erlang:date()), + {ok, File} = file:open(FilePath, [append, binary]), + + {ok, #state{file = File, file_name = FileName, date = get_date()}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({write, Data}, State = #state{file = OldFile, file_name = FileName, date = Date}) -> + Line = iolist_to_binary([time_prefix(), <<" ">>, format(Data), <<$\n>>]), + NDate = erlang:date(), + case maybe_new_file(Date, NDate) of + true -> + file:close(OldFile), + + FilePath = make_file(FileName, NDate), + {ok, File} = file:open(FilePath, [append, binary]), + ok = file:write(File, Line), + %% 清理历史的文件, 单位: 天 + delete_old_files(FileName, 10), + + {noreply, State#state{file = File, date = get_date()}}; + false -> + ok = file:write(OldFile, Line), + {noreply, State} + end. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +format(Data) when is_binary(Data) -> + iolist_to_binary(Data); +format(Items) when is_list(Items) -> + iolist_to_binary(lists:join(<<"\t">>, Items)). + +time_prefix() -> + {{Y, M, D}, {H, I, S}} = calendar:local_time(), + iolist_to_binary(io_lib:format("[~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b]", [Y, M, D, H, I, S])). + +-spec make_file(LogFile :: string(), {Year :: integer(), Month :: integer(), Day :: integer()}) -> string(). +make_file(LogFile, {Year, Month, Day}) when is_list(LogFile) -> + Suffix = io_lib:format("~b~2..0b~2..0b", [Year, Month, Day]), + RootDir = code:root_dir() ++ "/log/", + lists:flatten(RootDir ++ LogFile ++ "." ++ Suffix). + +ensure_dir() -> + RootDir = code:root_dir() ++ "/log/", + case filelib:is_dir(RootDir) of + true -> + ok; + false -> + ok = file:make_dir(RootDir) + end. + +%% 获取日期信息 +-spec get_date() -> Date :: calendar:date(). +get_date() -> + {Date, _} = calendar:local_time(), + Date. + +%% 通过日志判断是否需要生成新的日志文件 +-spec maybe_new_file(Date :: calendar:date(), Date0 :: calendar:date()) -> boolean(). +maybe_new_file({Y, M, D}, {Y0, M0, D0}) -> + not (Y =:= Y0 andalso M =:= M0 andalso D =:= D0). + +-spec delete_old_files(FileName :: string(), Days :: integer()) -> no_return(). +delete_old_files(FileName, Days) when is_list(FileName), is_integer(Days) -> + Seconds0 = calendar:datetime_to_gregorian_seconds(calendar:local_time()), + Seconds = Seconds0 - Days * 86400, + lists:foreach(fun(Day) -> + {Date, _} = calendar:gregorian_seconds_to_datetime(Seconds - Day * 86400), + FilePath = make_file(FileName, Date), + filelib:is_file(FilePath) andalso file:delete(FilePath) + end, lists:seq(1, 10)). \ No newline at end of file diff --git a/apps/aircon/src/aircon_mqtt_subscriber.erl b/apps/aircon/src/aircon_mqtt_subscriber.erl new file mode 100644 index 0000000..77c2ca2 --- /dev/null +++ b/apps/aircon/src/aircon_mqtt_subscriber.erl @@ -0,0 +1,199 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% 1. 需要考虑集群部署的相关问题,上行的数据可能在集群中共享 +%%% 2. host进程不能直接去监听topic,这样涉及到新增和下线的很多问题 +%%% @end +%%% Created : 12. 3月 2023 21:27 +%%%------------------------------------------------------------------- +-module(aircon_mqtt_subscriber). +-author("aresei"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +%% 需要订阅的主题信息 +-define(Topics,[ + {<<"/aircon/+/data">>, 2} +]). + +-record(state, { + conn_pid :: pid() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + %% 建立到emqx服务器的连接 + Opts = emqx_opts(<<"aircon-data-subscriber">>), + lager:debug("[opts] is: ~p", [Opts]), + case emqtt:start_link(Opts) of + {ok, ConnPid} -> + lager:debug("[mqtt_subscriber] start conntecting, pid: ~p", [ConnPid]), + {ok, _} = emqtt:connect(ConnPid), + lager:debug("[mqtt_subscriber] connect success"), + + %% 监听和设备的全部事件 + SubscribeResult = emqtt:subscribe(ConnPid, ?Topics), + lager:debug("[mqtt_subscriber] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]), + + {ok, #state{conn_pid = ConnPid}}; + ignore -> + lager:debug("[mqtt_subscriber] connect emqx get ignore"), + {stop, ignore}; + {error, Reason} -> + lager:debug("[mqtt_subscriber] connect emqx get error: ~p", [Reason]), + {stop, Reason} + end. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Info, _From, State = #state{conn_pid = _ConnPid}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({disconnect, ReasonCode, Properties}, State = #state{}) -> + lager:debug("[mqtt_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), + {stop, disconnected, State}; +%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 +handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State = #state{conn_pid = _ConnPid}) -> + lager:debug("[mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]), + %% 将消息分发到对应的publisher进程去处理, 提高消息的处理速度; topic格式: /aircon/20525456021829/data + case Topic of + <<"/aircon/", DeviceInfo/binary>> -> + case binary:split(DeviceInfo, <<"/">>) of + [DeviceMac, <<"data">>] -> + dispatch(DeviceMac, Payload); + _ -> + ok + end; + _ -> + ok + end, + {noreply, State}; +handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> + lager:debug("[mqtt_subscriber] receive puback packet: ~p", [Packet]), + {noreply, State}; + +handle_info(Info, State = #state{}) -> + lager:debug("[mqtt_subscriber] get info: ~p", [Info]), + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> + %% 取消topic的订阅 + TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics), + {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames), + + ok = emqtt:disconnect(ConnPid), + lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]), + ok; +terminate(Reason, _State) -> + lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec emqx_opts(ClientSuffix :: binary()) -> list(). +emqx_opts(ClientSuffix) when is_binary(ClientSuffix) -> + %% 建立到emqx服务器的连接 + {ok, Props} = application:get_env(aircon, emqx_server), + EMQXHost = proplists:get_value(host, Props), + EMQXPort = proplists:get_value(port, Props, 1883), + Username = proplists:get_value(username, Props), + Password = proplists:get_value(password, Props), + RetryInterval = proplists:get_value(retry_interval, Props, 5), + Keepalive = proplists:get_value(keepalive, Props, 86400), + + Node = atom_to_binary(node()), + ClientId = <<"mqtt-client-", Node/binary, "-", ClientSuffix/binary>>, + [ + {clientid, ClientId}, + {host, EMQXHost}, + {port, EMQXPort}, + {owner, self()}, + {tcp_opts, []}, + {username, Username}, + {password, Password}, + {keepalive, Keepalive}, + {auto_ack, true}, + {proto_ver, v3}, + {retry_interval, RetryInterval} + ]. + +-spec dispatch(DeviceMac :: binary(), Message :: binary()) -> no_return(). +dispatch(DeviceMac, Message) when is_binary(DeviceMac), is_binary(Message) -> + case power_gateway_args:get_device_uuid(DeviceMac) of + error -> + lager:notice("[mqtt_subscriber] device_mac: ~p, device_uuid not found", [DeviceMac]); + {ok, DeviceUUID} -> + case power_device_sup:ensure_device_started(DeviceUUID) of + {ok, DevicePid} -> + power_device:metric_data(DevicePid, Message); + {error, Reason} -> + lager:notice("[mqtt_subscriber] start device get error: ~p", [Reason]) + end + end. \ No newline at end of file diff --git a/apps/aircon/src/aircon_sup.erl b/apps/aircon/src/aircon_sup.erl index 91ad135..b75a62e 100644 --- a/apps/aircon/src/aircon_sup.erl +++ b/apps/aircon/src/aircon_sup.erl @@ -26,10 +26,75 @@ start_link() -> %% type => worker(), % optional %% modules => modules()} % optional init([]) -> - SupFlags = #{strategy => one_for_all, - intensity => 0, - period => 1}, - ChildSpecs = [], + SupFlags = #{strategy => one_for_all, intensity => 0, period => 1}, + + {ok, EfkaServer} = application:get_env(aircon, efka_server), + Host = proplists:get_value(host, EfkaServer), + Port = proplists:get_value(port, EfkaServer), + + %% 获取配置的名称 + RegisterName = read_service_name(), + + ChildSpecs = [ + #{ + id => 'efka_client', + start => {'efka_client', start_link, [RegisterName, Host, Port]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['efka_client'] + }, + + #{ + id => aircon_logger, + start => {'aircon_logger', start_link, ["aircon_data"]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['aircon_logger'] + }, + + #{ + id => 'aircon_args', + start => {'aircon_args', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['aircon_args'] + }, + + %#{ + % id => 'power_device_sup', + % start => {'power_device_sup', start_link, []}, + % restart => permanent, + % shutdown => 2000, + % type => worker, + % modules => ['power_device_sup'] + %}, + + #{ + id => 'aircon_mqtt_subscriber', + start => {'aircon_mqtt_subscriber', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['aircon_mqtt_subscriber'] + } + ], + {ok, {SupFlags, ChildSpecs}}. %% internal functions +%% 获取根目录下的配置文件 +read_service_name() -> + %% 获取配置的名称 + RootDir = code:root_dir(), + VersionFile = RootDir ++ "/.version", + lager:debug("[aircon_sup] version path is: ~p", [VersionFile]), + case file:read_file(VersionFile) of + {ok, RegisterName0} -> + string:trim(RegisterName0); + {error, Reason} -> + lager:warning("[aircon_sup] read .version file get error: ~p", [Reason]), + <<"aircon">> + end. \ No newline at end of file diff --git a/apps/aircon/src/efka_client.erl b/apps/aircon/src/efka_client.erl new file mode 100644 index 0000000..47dd03b --- /dev/null +++ b/apps/aircon/src/efka_client.erl @@ -0,0 +1,431 @@ +%%%------------------------------------------------------------------- +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 28. 8月 2023 15:39 +%%%------------------------------------------------------------------- +-module(efka_client). +-author("aresei"). + +-behaviour(gen_server). + +%% 消息包 +-record(efka_packet, { + packet_id :: integer(), + type :: integer(), + message :: any() +}). + +%% 请求超时时间 +-define(EFKA_REQUEST_TIMEOUT, 5000). + +%% 消息类型 + +%% 服务注册 +-define(PACKET_TYPE_REGISTER, 16). +%% 上传数据 +-define(PACKET_TYPE_METRIC_DATA, 3). +%% 调用其他微服务 +-define(PACKET_TYPE_INVOKE, 4). +%% 消息响应 +-define(PACKET_TYPE_RESPONSE, 7). +%% efka下发给微服务参数 +-define(PACKET_TYPE_PUSH_PARAM, 5). +%% efka下发给微服务采集项 +-define(PACKET_TYPE_PUSH_METRIC, 6). +%% 设备状态轮询: 增加日期: 2025-4-16 +-define(PACKET_TYPE_POLL, 20). +%% 微服务给efka发送log消息 +-define(PACKET_TYPE_LOG, 9). +%% 微服务从efka获取自身的采集项 +-define(PACKET_TYPE_REQUEST_METRIC, 10). +%% 微服务从efka获取自身的参数 +-define(PACKET_TYPE_REQUEST_PARAM, 12). +%% efka向微服务发送stream-call消息 +-define(PACKET_TYPE_PUSH_STREAM_CALL, 11). +%% 微服务事件上报 +-define(PACKET_TYPE_EVENT, 15). + +%% API +-export([start_link/3]). +-export([device_offline/1, device_online/1]). +-export([send_metric_data/2, invoke_service/3, send_log/1, request_metric/0, request_param/0, send_event/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-record(state, { + packet_id = 0 :: integer(), + host :: string(), + port :: integer(), + %% 请求后未完成的请求 + inflight = #{} :: map(), + socket :: gen_tcp:socket() +}). + +-spec send_metric_data(Fields :: list(), Tags :: #{}) -> {ok, Result :: any()} | {error, Reason :: any()}. +send_metric_data(Fields, Tags) when is_list(Fields), is_map(Tags) -> + {ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}), + await_reply(Ref, ?EFKA_REQUEST_TIMEOUT). + +-spec invoke_service(ToService :: binary(), Message :: map(), Timeout :: integer()) -> + {ok, Result :: any()} | {error, Reason :: any()}. +invoke_service(ToService, Message, Timeout) when is_binary(ToService), is_map(Message), is_integer(Timeout) -> + {ok, Ref} = gen_server:call(?MODULE, {invoke_service, self(), ToService, Message, Timeout}), + await_reply(Ref, ?EFKA_REQUEST_TIMEOUT). + +-spec send_log(Message :: binary() | map()) -> no_return(). +send_log(Message) when is_binary(Message); is_map(Message) -> + gen_server:cast(?MODULE, {send_log, Message}). + +%% efka_server为了统一,r对象为字符串;需要2次json_decode +-spec request_metric() -> {ok, Result :: list()} | {error, Reason :: any()}. +request_metric() -> + {ok, Ref} = gen_server:call(?MODULE, {request_metric, self()}), + case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of + {ok, Reply} -> + {ok, jiffy:decode(Reply, [return_maps])}; + Error -> + Error + end. + +-spec request_param() -> {ok, Result :: map()} | {error, Reason :: any()}. +request_param() -> + {ok, Ref} = gen_server:call(?MODULE, {request_param, self()}), + case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of + {ok, Reply} -> + {ok, jiffy:decode(Reply, [return_maps])}; + Error -> + Error + end. + +-spec device_offline(DeviceUUID :: binary()) -> no_return(). +device_offline(DeviceUUID) when is_binary(DeviceUUID) -> + send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 0}). + +-spec device_online(DeviceUUID :: binary()) -> no_return(). +device_online(DeviceUUID) when is_binary(DeviceUUID) -> + send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 1}). + +-spec send_event(EventType :: integer(), Params :: binary() | map()) -> no_return(). +send_event(EventType, Params) when is_integer(EventType), is_binary(Params); is_map(Params) -> + gen_server:cast(?MODULE, {send_event, EventType, Params}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: any()} | {error, Reason :: any()}. +await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> + receive + {response, Ref, {ok, Reply}} -> + {ok, Reply}; + {response, Ref, {error, Reason}} -> + {error, Reason} + after + Timeout -> + {error, timeout} + end. + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(RegisterName :: binary(), Host :: string(), Port :: integer()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(RegisterName, Host, Port) when is_binary(RegisterName), is_list(Host), is_integer(Port) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [RegisterName, Host, Port], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([RegisterName, Host, Port]) -> + {ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]), + ok = gen_tcp:controlling_process(Socket, self()), + case do_register(RegisterName, Socket) of + ok -> + {ok, #state{packet_id = 1, host = Host, port = Port, socket = Socket}}; + {error, Reason} -> + {stop, Reason} + end. + +%% 执行到efka服务器的注册 +do_register(RegisterName, Socket) -> + PacketId = 0, + Body = #{<<"name">> => RegisterName}, + Packet = pack(PacketId, ?PACKET_TYPE_REGISTER, Body), + ok = gen_tcp:send(Socket, Packet), + receive + {tcp, Socket, Data} -> + RegisterPacket = unpack(Data), + lager:debug("[efka_client] get register reply packet: ~p", [RegisterPacket]), + case RegisterPacket of + #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 1, <<"r">> := <<"ok">>}} -> + ok; + #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 0, <<"e">> := Error}} -> + {error, Error}; + _ -> + {error, invalid_register_packet} + end + after + ?EFKA_REQUEST_TIMEOUT -> + {error, timeout} + end. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({send_metric_data, ReceiverPid, Fields, Tags}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + Body = #{<<"data">> => Fields, <<"tag">> => Tags}, + Packet = pack(PacketId, ?PACKET_TYPE_METRIC_DATA, Body), + ok = gen_tcp:send(Socket, Packet), + Ref = make_ref(), + {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; + +handle_call({invoke_service, ReceiverPid, ToService, Message, Timeout}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + Body = #{ + <<"to">> => ToService, + <<"t">> => Timeout, + <<"m">> => Message + }, + Packet = pack(PacketId, ?PACKET_TYPE_INVOKE, Body), + ok = gen_tcp:send(Socket, Packet), + Ref = make_ref(), + {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; + +handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_METRIC), + ok = gen_tcp:send(Socket, Packet), + Ref = make_ref(), + {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}; + +handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> + Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_PARAM), + ok = gen_tcp:send(Socket, Packet), + Ref = make_ref(), + {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({send_log, Message}, State = #state{socket = Socket, packet_id = PacketId}) -> + Body = #{<<"l">> => Message}, + Packet = pack(PacketId, ?PACKET_TYPE_LOG, Body), + ok = gen_tcp:send(Socket, Packet), + + {noreply, State#state{packet_id = next_packet_id(PacketId)}}; + +handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) -> + Body = #{ + <<"event_type">> => EventType, + <<"params">> => Params + }, + Packet = pack(0, ?PACKET_TYPE_EVENT, Body), + ok = gen_tcp:send(Socket, Packet), + {noreply, State}; + +handle_cast(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({tcp, Socket, Data}, State = #state{socket = Socket}) -> + Packet = unpack(Data), + self() ! {handle_packet, Packet}, + {noreply, State}; +handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) -> + {stop, tcp_closed, State}; + +%% 收到请求的响应 +handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = Message}}, State = #state{inflight = Inflight}) -> + case maps:take(PacketId, Inflight) of + error -> + {noreply, State}; + {{Ref, ReceiverPid}, NInflight} -> + case Message of + #{<<"c">> := 1, <<"r">> := Result} -> + ReceiverPid ! {response, Ref, {ok, Result}}; + #{<<"c">> := 0, <<"e">> := Error} -> + ReceiverPid ! {response, Ref, {error, Error}} + end, + {noreply, State#state{inflight = NInflight}} + end; + +%% 收到efka推送的参数设置 +handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_PARAM, message = Params}}, + State = #state{socket = Socket}) when is_map(Params) -> + + Message = case handle_param(Params) of + ok -> + #{<<"c">> => 1, <<"r">> => <<"ok">>}; + {error, Reason} -> + #{<<"c">> => 0, <<"e">> => Reason} + end, + Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), + ok = gen_tcp:send(Socket, Packet), + + {noreply, State}; + +%% 收到efka推送的采集项消息 +handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_METRIC, message = Metrics}}, + State = #state{socket = Socket}) when is_list(Metrics) -> + + Message = case handle_metric(Metrics) of + ok -> + #{<<"c">> => 1, <<"r">> => <<"ok">>}; + {error, Reason} when is_binary(Reason) -> + #{<<"c">> => 0, <<"e">> => Reason} + end, + Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), + ok = gen_tcp:send(Socket, Packet), + + {noreply, State}; + +%% 收到设备状态的轮询请求 +handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_POLL, message = Command}}, State = #state{socket = Socket}) -> + Message = handle_poll_command(Command), + Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), + ok = gen_tcp:send(Socket, Packet), + + {noreply, State}; + +%% 收到efka的stream-call消息 +handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_STREAM_CALL, + message = Msg = #{<<"service_name">> := ServiceName, <<"data">> := Data, <<"tag">> := Tag}}}, State = #state{socket = Socket}) -> + + Message = case handle_stream_call(ServiceName, Data, Tag) of + {continue, NewServiceName, NewData, NewTag} -> + #{ + <<"c">> => 1, + <<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag}, + <<"k">> => true + }; + %% 处理到当前节点为止,不继续往下传递 + {break, NewServiceName, NewData, NewTag} -> + #{ + <<"c">> => 1, + <<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag}, + <<"k">> => false + }; + error -> + #{ + <<"c">> => 0, + <<"r">> => Msg, + <<"k">> => true + } + end, + Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message), + ok = gen_tcp:send(Socket, Packet), + + {noreply, State}; + +%% 其他消息为非法消息 +handle_info({handle_packet, _Packet}, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{socket = Socket}) -> + gen_tcp:close(Socket), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). +next_packet_id(PacketId) when PacketId >= 65535 -> + 0; +next_packet_id(PacketId) -> + PacketId + 1. + +-spec pack(PacketId :: integer(), Type :: integer()) -> binary(). +pack(PacketId, Type) when is_integer(PacketId), is_integer(Type) -> + <>. + +-spec pack(PacketId :: integer(), Type :: integer(), Body :: map()) -> binary(). +pack(PacketId, Type, Body) when is_integer(PacketId), is_integer(Type), is_map(Body) -> + Message = iolist_to_binary(jiffy:encode(Body, [force_utf8])), + <>. + +-spec unpack(binary()) -> #efka_packet{}. +unpack(<>) -> + Message = catch jiffy:decode(Body, [return_maps]), + #efka_packet{packet_id = PacketId, type = Type, message = Message}. + +%%%=================================================================== +%%% simple callbacks +%%%=================================================================== + +handle_poll_command(#{<<"device_uuid">> := DeviceUUID, <<"command">> := <<"query_status">>}) when is_binary(DeviceUUID) -> + case power_device:get_pid(DeviceUUID) of + undefined -> + #{ + <<"c">> => 1, + <<"r">> => #{ + <<"edge_status">> => -1, + <<"message">> => <<"设备信息不存在"/utf8>> + } + }; + Pid -> + StatusMap = #{ + 0 => <<"离线"/utf8>>, + 1 => <<"在线"/utf8>> + }, + {ok, Status} = power_device:poll_status(Pid), + #{ + <<"c">> => 1, + <<"r">> => #{ + <<"edge_status">> => Status, + <<"message">> => maps:get(Status, StatusMap) + } + } + end. + +-spec handle_param(Params :: map()) -> ok | {error, Reason :: binary()}. +handle_param(Params) when is_map(Params) -> + power_gateway_args:push_param(Params), + ok. + +-spec handle_metric(Metric :: list()) -> ok | {error, Reason :: binary()}. +handle_metric(Metric) when is_list(Metric) -> + power_gateway_args:push_metric(Metric), + ok. + +-spec handle_stream_call(ServiceName :: binary(), Fields :: list(), Tag :: map()) -> + {continue, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()} + | {break, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()} + | error. +handle_stream_call(ServiceName, Fields, Tag) when is_binary(ServiceName), is_list(Fields), is_map(Tag) -> + {continue, ServiceName, Fields, Tag}. \ No newline at end of file diff --git a/config/sys-dev.config b/config/sys-dev.config new file mode 100644 index 0000000..52032cf --- /dev/null +++ b/config/sys-dev.config @@ -0,0 +1,21 @@ +[ + {aircon, [ + %% 离线判断时间间隔,单位:(秒) + {heartbeat_ticker, 120}, + + {emqx_server, [ + {host, "39.98.184.67"}, + {port, 1883}, + {tcp_opts, []}, + {username, "test"}, + {password, "test1234"}, + {keepalive, 86400}, + {retry_interval, 5} + ]}, + + {efka_server, [ + {host, "39.98.184.67"}, + {port, 3361} + ]} + ]} +]. diff --git a/config/sys-prod.config b/config/sys-prod.config new file mode 100644 index 0000000..8d09770 --- /dev/null +++ b/config/sys-prod.config @@ -0,0 +1,22 @@ +[ + {aircon, [ + %% 离线判断时间间隔,单位:(秒) + {heartbeat_ticker, 120}, + + {emqx_server, [ + {host, "172.30.37.212"}, + {port, 1883}, + {tcp_opts, []}, + {username, "aircon"}, + {password, "A9K2rM8QxL7WZsP5D@B!3"}, + {keepalive, 86400}, + {retry_interval, 5} + ]}, + + {efka_server, [ + {host, "39.98.184.67"}, + {port, 3361} + ]} + ]} + +]. diff --git a/config/sys.config b/config/sys.config deleted file mode 100644 index 5f8f907..0000000 --- a/config/sys.config +++ /dev/null @@ -1,3 +0,0 @@ -[ - {aircon, []} -]. diff --git a/mqtt数据示例最新.txt b/mqtt数据示例最新.txt new file mode 100644 index 0000000..0c499d3 --- /dev/null +++ b/mqtt数据示例最新.txt @@ -0,0 +1,30 @@ +采集数据推送 +topic: +/aircon/20525456021829/data +massage: +[ +{"key":"OnOff","value":0,"unit":10,"type":"AI","timestamp":1767747017459,"name":"开关状态","label":"开关状态","sessionid":760143116038215}, +{"key":"WorkModel","value":"0","unit":10,"type":"AI","timestamp":1767747017567,"name":"运行模式","label":"运行模式","sessionid":760143116480581}, +{"key":"WindSpeed","value":0,"unit":10,"type":"AI","timestamp":1767747017567,"name":"设定风速","label":"设定风速","sessionid":760143116480582}, +{"key":"Temperature","value":17.0,"unit":10,"type":"AI","timestamp":1767747017567,"name":"设定温度","label":"设定温度","sessionid":760143116480583}, +{"key":"ValueTime","value":"2025-04-10 16:28:57","unit":10,"type":"AI","timestamp":1767747017567,"name":"采集时间","label":"采集时间","sessionid":760143116480584}, +{"key":"Zvalue","value":0.000198,"unit":5,"type":"AI","timestamp":1767747017567,"name":"累计运行能耗","label":"累计运行能耗","sessionid":760143116480585}, +{"key":"ShareValue","value":0.0,"unit":5,"type":"AI","timestamp":1767747017567,"name":"累计分摊能耗","label":"累计分摊能耗","sessionid":760143116480586} +] + +控制指令下发 +topic: +/aircon/20525456021829/command +massage: +{"key":"00000005","value":"25","timestamp":1767747017459,"name":"调温","sessionid":760143116038215} + +控制指令应答 +topic: +/aircon/20525456021829/command_reply + +massage: +{"sessionid":760143116038215,"code":"0","msg":"指令流转失败","timestamp":1767747017459} +{"sessionid":760143116038215,"code":"1","msg":"指令下发成功","timestamp":1767747017459} +{"sessionid":760143116038215,"code":"2","msg":"指令响应超时","timestamp":1767747017459} +{"sessionid":760143116038215,"code":"3","msg":"指令下发失败","timestamp":1767747017459} +