Compare commits

..

10 Commits

Author SHA1 Message Date
e1e5adef77 add logger handler 2026-01-09 14:37:33 +08:00
8cabec6c09 set logger 2026-01-09 14:26:51 +08:00
d39e1ebcf4 fix logger config 2026-01-08 17:49:54 +08:00
4afa622732 fix loggers 2026-01-08 17:21:58 +08:00
e2f54d21e0 fix 2026-01-08 16:51:56 +08:00
768c5d6206 add publisher 2026-01-08 16:02:20 +08:00
6788964624 fix config 2026-01-08 15:39:35 +08:00
336362c4e4 替换日志框架 2026-01-08 15:17:17 +08:00
f7687129b9 fix config 2026-01-08 15:04:54 +08:00
d7b6f5eb32 fix devic 2026-01-08 15:03:45 +08:00
13 changed files with 530 additions and 62 deletions

1
.gitignore vendored
View File

@ -1,4 +1,5 @@
.rebar3 .rebar3
config/sys.config
_build _build
_checkouts _checkouts
_vendor _vendor

View File

@ -10,9 +10,18 @@
-export([start/2, stop/1]). -export([start/2, stop/1]).
start(_StartType, _StartArgs) -> start(_StartType, _StartArgs) ->
set_logger_level(),
logger:warning("call me here start logger"),
aircon_sup:start_link(). aircon_sup:start_link().
stop(_State) -> stop(_State) ->
ok. ok.
%% internal functions %% internal functions
set_logger_level() ->
logger:set_application_level(kernel, notice),
logger:set_application_level(stdlib, notice),
logger:set_application_level(emqtt, debug),
logger:set_application_level(aircon, debug),
ok.

View File

@ -69,13 +69,13 @@ init([]) ->
{ok, Metrics} = efka_client:request_metric(), {ok, Metrics} = efka_client:request_metric(),
try convert_metric(Metrics) of try convert_metric(Metrics) of
{ok, MetricMap} -> {ok, MetricMap} ->
lager:debug("[aircon_args] init load metric_map: ~p", [MetricMap]), logger:debug("[aircon_args] init load metric_map: ~p", [MetricMap]),
{ok, Param} = efka_client:request_param(), {ok, Param} = efka_client:request_param(),
{ok, #state{metrics = MetricMap, param = Param}} {ok, #state{metrics = MetricMap, param = Param}}
catch catch
_:Error:Stack-> _:Error:Stack->
lager:warning("[aircon_args] request_metric get error: ~p, stack: ~p", [Error, Stack]), logger:warning("[aircon_args] request_metric get error: ~p, stack: ~p", [Error, Stack]),
{ok, #state{metrics = #{}, param = #{}}} {ok, #state{metrics = #{}, param = #{}}}
end. end.
@ -111,7 +111,7 @@ handle_cast({push_param, Param}, State = #state{}) ->
handle_cast({push_metric, Metrics}, State = #state{}) -> handle_cast({push_metric, Metrics}, State = #state{}) ->
try convert_metric(Metrics) of try convert_metric(Metrics) of
{ok, MetricMap} -> {ok, MetricMap} ->
lager:debug("[aircon_args] push metric_map: ~p", [MetricMap]), logger:debug("[aircon_args] push metric_map: ~p", [MetricMap]),
{noreply, State#state{metrics = MetricMap}} {noreply, State#state{metrics = MetricMap}}
catch _:_ -> catch _:_ ->
{noreply, State} {noreply, State}

View File

@ -0,0 +1,177 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 04. 9 2024 16:13
%%%-------------------------------------------------------------------
-module(aircon_device).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/2]).
-export([get_pid/1, get_name/1]).
-export([metric_data/2, poll_status/1]).
-define(UNIT0, 16#10).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {
device_uuid :: binary(),
heartbeat_ticker :: integer(),
%%
data_counter = 0,
%% 1: 线0: 线
status = 0
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_pid(DeviceUUID :: binary()) -> undefined | pid().
get_pid(DeviceUUID) when is_binary(DeviceUUID) ->
whereis(get_name(DeviceUUID)).
-spec get_name(DeviceUUID :: binary()) -> atom().
get_name(DeviceUUID) when is_binary(DeviceUUID) ->
binary_to_atom(<<"aircon_device:", DeviceUUID/binary>>).
-spec metric_data(Pid :: pid(), Message :: binary()) -> no_return().
metric_data(Pid, Message) when is_pid(Pid), is_binary(Message) ->
gen_server:cast(Pid, {metric_data, Message}).
%% , 1: 线0: 线
-spec poll_status(Pid :: pid()) -> {ok, Status :: integer()}.
poll_status(Pid) when is_pid(Pid) ->
gen_server:call(Pid, poll_status).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Name :: atom(), DeviceUUID :: binary()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Name, DeviceUUID) when is_atom(Name), is_binary(DeviceUUID) ->
gen_server:start_link({local, get_name(DeviceUUID)}, ?MODULE, [DeviceUUID], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([DeviceUUID]) ->
{ok, HeartbeatTicker0} = application:get_env(aircon, heartbeat_ticker),
HeartbeatTicker = HeartbeatTicker0 * 1000,
erlang:start_timer(HeartbeatTicker, self(), heartbeat_ticker),
{ok, #state{device_uuid = DeviceUUID, heartbeat_ticker = HeartbeatTicker, status = 0}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(poll_status, _From, State = #state{status = Status}) ->
{reply, {ok, Status}, State};
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({metric_data, Message}, State = #state{device_uuid = DeviceUUID, data_counter = DataCounter, status = Status}) ->
case catch jiffy:decode(Message, [return_maps]) of
#{<<"properties">> := Props0} ->
Props = lists:map(fun(Fields) -> transform(Fields#{<<"device_uuid">> => DeviceUUID}) end, Props0),
Info = iolist_to_binary(jiffy:encode(Props, [force_utf8])),
case catch efka_client:send_metric_data(Props, #{}) of
{ok, _} ->
aircon_logger:write([<<"OK">>, Info]);
_ ->
aircon_logger:write([<<"ERROR">>, Info])
end,
%% 线线
Status == 0 andalso efka_client:device_online(DeviceUUID),
{noreply, State#state{data_counter = DataCounter + 1, status = 1}};
M when is_map(M) ->
logger:notice("[power_device] invalid map: ~p", [M]);
Error ->
logger:notice("[power_device] jiffy decode error: ~p", [Error]),
{noreply, State}
end.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, heartbeat_ticker}, State = #state{device_uuid = DeviceUUID, heartbeat_ticker = HeartbeatTicker, data_counter = DataCounter, status = Status}) ->
erlang:start_timer(HeartbeatTicker, self(), heartbeat_ticker),
case DataCounter > 0 of
true ->
{noreply, State};
false ->
Status == 1 andalso efka_client:device_offline(DeviceUUID),
{noreply, State#state{status = 0}}
end.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% value是类型unit才是数值
%% [{"value":"int","unit":44081,"type":"AI","timestamp":1730799797,"name":"使用次数","label":"","key":"use_times","device_uuid":"30409239002349977617259608405456"}]
-spec transform(Prop :: map()) -> map().
transform(Prop = #{<<"key">> := <<"total_power">>, <<"unit">> := Unit}) ->
Prop#{<<"name">> => <<"总能耗"/utf8>>, <<"value">> => Unit, <<"unit">> => 16#02};
transform(Prop = #{<<"key">> := <<"total_runtime">>, <<"unit">> := Unit}) ->
Prop#{<<"name">> => <<"总运行时间"/utf8>>, <<"value">> => Unit, <<"unit">> => ?UNIT0};
transform(Prop = #{<<"key">> := <<"use_times">>, <<"unit">> := Unit}) ->
Prop#{<<"name">> => <<"使用次数"/utf8>>, <<"label">> => <<""/utf8>>, <<"value">> => Unit, <<"unit">> => ?UNIT0};
transform(Prop = #{<<"key">> := <<"light_switch">>, <<"unit">> := Unit}) ->
Prop#{<<"name">> => <<"开关"/utf8>>, <<"value">> => Unit, <<"unit">> => ?UNIT0};
transform(Prop = #{<<"key">> := <<"light_brightness">>, <<"unit">> := Unit}) when is_integer(Unit) ->
Label = iolist_to_binary([<<"亮度设置为:"/utf8>>, integer_to_binary(Unit)]),
Prop#{<<"name">> => <<"亮度"/utf8>>, <<"label">> => Label, <<"value">> => Unit, <<"unit">> => ?UNIT0};
transform(Prop = #{<<"key">> := <<"light_change_time">>, <<"unit">> := Unit}) ->
Prop#{<<"name">> => <<"变亮变暗时间"/utf8>>, <<"value">> => Unit, <<"unit">> => ?UNIT0};
transform(Prop = #{<<"key">> := <<"light_status">>, <<"unit">> := Unit}) ->
Prop#{<<"name">> => <<"是否损坏"/utf8>>, <<"value">> => Unit, <<"unit">> => ?UNIT0};
transform(Prop = #{<<"unit">> := Unit}) ->
Prop#{<<"value">> => Unit, <<"unit">> => ?UNIT0}.

View File

@ -0,0 +1,80 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2024, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 04. 9 2024 15:39
%%%-------------------------------------------------------------------
-module(aircon_device_sup).
-author("anlicheng").
-behaviour(supervisor).
%% API
-export([start_link/0]).
-export([ensure_device_started/1, delete_device/1]).
%% Supervisor callbacks
-export([init/1]).
-define(SERVER, ?MODULE).
%%%===================================================================
%%% API functions
%%%===================================================================
%% @doc Starts the supervisor
-spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
supervisor:start_link({local, ?SERVER}, ?MODULE, []).
%%%===================================================================
%%% Supervisor callbacks
%%%===================================================================
%% @private
%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3],
%% this function is called by the new process to find out about
%% restart strategy, maximum restart frequency and child
%% specifications.
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
{ok, {SupFlags, []}}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec ensure_device_started(DeviceUUID :: binary()) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
ensure_device_started(DeviceUUID) when is_binary(DeviceUUID) ->
case aircon_device:get_pid(DeviceUUID) of
DevicePid when is_pid(DevicePid) ->
{ok, DevicePid};
undefined ->
case supervisor:start_child(?MODULE, child_spec(DeviceUUID)) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
{error, {'already_started', Pid}} when is_pid(Pid) ->
{ok, Pid};
{error, Error} ->
{error, Error}
end
end.
delete_device(DeviceUUID) when is_binary(DeviceUUID) ->
Id = aircon_device:get_name(DeviceUUID),
ok = supervisor:terminate_child(?MODULE, Id),
supervisor:delete_child(?MODULE, Id).
child_spec(DeviceUUID) when is_binary(DeviceUUID) ->
Name = aircon_device:get_name(DeviceUUID),
#{
id => Name,
start => {aircon_device, start_link, [Name, DeviceUUID]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['aircon_device']
}.

View File

@ -0,0 +1,165 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%% 1.
%%% 2. host进程不能直接去监听topic线
%%% @end
%%% Created : 12. 3 2023 21:27
%%%-------------------------------------------------------------------
-module(aircon_mqtt_publisher).
-author("aresei").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([publish/3]).
-export([test/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
conn_pid :: pid()
}).
%%%===================================================================
%%% API
%%%===================================================================
test() ->
Topic = <<"/aircon/20525456021829/data">>,
Payload = <<"{\"key\":\"OnOff\",\"value\":0,\"unit\":10,\"type\":\"AI\",\"timestamp\":1767747017459,\"name\":\"开关状态\",\"label\":\"开关状态\",\"sessionid\":760143116038215}">>,
publish(Topic, Payload, 2).
-spec publish(Topic :: binary(), Payload :: binary(), Qos :: integer()) -> no_return().
publish(Topic, Payload, Qos) when is_binary(Topic), is_binary(Payload), is_integer(Qos) ->
gen_server:cast(?SERVER, {publish, Topic, Payload, Qos}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
%% emqx服务器的连接
Opts = emqx_opts(<<"aircon-data-publisher">>),
logger:debug("[mqtt_publisher] opts is: ~p", [Opts]),
case emqtt:start_link(Opts) of
{ok, ConnPid} ->
logger:debug("[mqtt_publisher] start conntecting, pid: ~p", [ConnPid]),
{ok, _} = emqtt:connect(ConnPid),
logger:debug("[mqtt_publisher] connect success"),
{ok, #state{conn_pid = ConnPid}};
ignore ->
logger:debug("[mqtt_publisher] connect emqx get ignore"),
{stop, ignore};
{error, Reason} ->
logger:debug("[mqtt_publisher] connect emqx get error: ~p", [Reason]),
{stop, Reason}
end.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Info, _From, State = #state{conn_pid = _ConnPid}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({publish, Topic, Payload, Qos}, State = #state{conn_pid = ConnPid}) ->
Res = emqtt:publish(ConnPid, Topic, Payload, Qos),
logger:debug("[mqtt_publisher] publish result is: ~p", [Res]),
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
logger:debug("[mqtt_publisher] receive puback packet: ~p", [Packet]),
{noreply, State};
handle_info(Info, State = #state{}) ->
logger:debug("[mqtt_publisher] get info: ~p", [Info]),
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
ok = emqtt:disconnect(ConnPid),
logger:debug("[iot_mqtt_publisher] terminate with reason: ~p", [Reason]),
ok;
terminate(Reason, _State) ->
logger:debug("[iot_mqtt_publisher] terminate with reason: ~p", [Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec emqx_opts(ClientSuffix :: binary()) -> list().
emqx_opts(ClientSuffix) when is_binary(ClientSuffix) ->
%% emqx服务器的连接
{ok, Props} = application:get_env(aircon, emqx_server),
EMQXHost = proplists:get_value(host, Props),
EMQXPort = proplists:get_value(port, Props, 1883),
Username = proplists:get_value(username, Props),
Password = proplists:get_value(password, Props),
RetryInterval = proplists:get_value(retry_interval, Props, 5),
Keepalive = proplists:get_value(keepalive, Props, 86400),
Node = atom_to_binary(node()),
ClientId = <<"mqtt-client-", Node/binary, "-", ClientSuffix/binary>>,
[
{clientid, ClientId},
{host, EMQXHost},
{port, EMQXPort},
{owner, self()},
{tcp_opts, []},
{username, Username},
{password, Password},
{keepalive, Keepalive},
{auto_ack, true},
{proto_ver, v3},
{retry_interval, RetryInterval}
].

View File

@ -51,23 +51,22 @@ start_link() ->
init([]) -> init([]) ->
%% emqx服务器的连接 %% emqx服务器的连接
Opts = emqx_opts(<<"aircon-data-subscriber">>), Opts = emqx_opts(<<"aircon-data-subscriber">>),
lager:debug("[opts] is: ~p", [Opts]), logger:debug("[mqtt_subscriber] opts is: ~p", [Opts]),
case emqtt:start_link(Opts) of case emqtt:start_link(Opts) of
{ok, ConnPid} -> {ok, ConnPid} ->
lager:debug("[mqtt_subscriber] start conntecting, pid: ~p", [ConnPid]), logger:debug("[mqtt_subscriber] start conntecting, pid: ~p", [ConnPid]),
{ok, _} = emqtt:connect(ConnPid), {ok, _} = emqtt:connect(ConnPid),
lager:debug("[mqtt_subscriber] connect success"), logger:debug("[mqtt_subscriber] connect success"),
%% %%
SubscribeResult = emqtt:subscribe(ConnPid, ?Topics), SubscribeResult = emqtt:subscribe(ConnPid, ?Topics),
lager:debug("[mqtt_subscriber] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]), logger:debug("[mqtt_subscriber] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]),
{ok, #state{conn_pid = ConnPid}}; {ok, #state{conn_pid = ConnPid}};
ignore -> ignore ->
lager:debug("[mqtt_subscriber] connect emqx get ignore"), logger:debug("[mqtt_subscriber] connect emqx get ignore"),
{stop, ignore}; {stop, ignore};
{error, Reason} -> {error, Reason} ->
lager:debug("[mqtt_subscriber] connect emqx get error: ~p", [Reason]), logger:debug("[mqtt_subscriber] connect emqx get error: ~p", [Reason]),
{stop, Reason} {stop, Reason}
end. end.
@ -100,11 +99,11 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_info({disconnect, ReasonCode, Properties}, State = #state{}) -> handle_info({disconnect, ReasonCode, Properties}, State = #state{}) ->
lager:debug("[mqtt_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), logger:debug("[mqtt_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
{stop, disconnected, State}; {stop, disconnected, State};
%% json反序列需要在host进程进行 %% json反序列需要在host进程进行
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State = #state{conn_pid = _ConnPid}) -> handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State = #state{conn_pid = _ConnPid}) ->
lager:debug("[mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]), logger:debug("[mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]),
%% publisher进程去处理, ; topic格式: /aircon/20525456021829/data %% publisher进程去处理, ; topic格式: /aircon/20525456021829/data
case Topic of case Topic of
<<"/aircon/", DeviceInfo/binary>> -> <<"/aircon/", DeviceInfo/binary>> ->
@ -119,11 +118,11 @@ handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos,
end, end,
{noreply, State}; {noreply, State};
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
lager:debug("[mqtt_subscriber] receive puback packet: ~p", [Packet]), logger:debug("[mqtt_subscriber] receive puback packet: ~p", [Packet]),
{noreply, State}; {noreply, State};
handle_info(Info, State = #state{}) -> handle_info(Info, State = #state{}) ->
lager:debug("[mqtt_subscriber] get info: ~p", [Info]), logger:debug("[mqtt_subscriber] get info: ~p", [Info]),
{noreply, State}. {noreply, State}.
%% @private %% @private
@ -139,10 +138,10 @@ terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames), {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames),
ok = emqtt:disconnect(ConnPid), ok = emqtt:disconnect(ConnPid),
lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]), logger:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]),
ok; ok;
terminate(Reason, _State) -> terminate(Reason, _State) ->
lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]), logger:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]),
ok. ok.
%% @private %% @private
@ -186,14 +185,16 @@ emqx_opts(ClientSuffix) when is_binary(ClientSuffix) ->
-spec dispatch(DeviceMac :: binary(), Message :: binary()) -> no_return(). -spec dispatch(DeviceMac :: binary(), Message :: binary()) -> no_return().
dispatch(DeviceMac, Message) when is_binary(DeviceMac), is_binary(Message) -> dispatch(DeviceMac, Message) when is_binary(DeviceMac), is_binary(Message) ->
case power_gateway_args:get_device_uuid(DeviceMac) of logger:notice("[mqtt_subscriber] device_mac: ~p, device_uuid not found", [DeviceMac]),
error -> ok.
lager:notice("[mqtt_subscriber] device_mac: ~p, device_uuid not found", [DeviceMac]); %case aircon_args:get_device_uuid(DeviceMac) of
{ok, DeviceUUID} -> % error ->
case power_device_sup:ensure_device_started(DeviceUUID) of % logger:notice("[mqtt_subscriber] device_mac: ~p, device_uuid not found", [DeviceMac]);
{ok, DevicePid} -> % {ok, DeviceUUID} ->
power_device:metric_data(DevicePid, Message); % case aircon_device_sup:ensure_device_started(DeviceUUID) of
{error, Reason} -> % {ok, DevicePid} ->
lager:notice("[mqtt_subscriber] start device get error: ~p", [Reason]) % aircon_device:metric_data(DevicePid, Message);
end % {error, Reason} ->
end. % logger:notice("[mqtt_subscriber] start device get error: ~p", [Reason])
% end
%end.

View File

@ -26,7 +26,7 @@ start_link() ->
%% type => worker(), % optional %% type => worker(), % optional
%% modules => modules()} % optional %% modules => modules()} % optional
init([]) -> init([]) ->
SupFlags = #{strategy => one_for_all, intensity => 0, period => 1}, SupFlags = #{strategy => one_for_all, intensity => 1000, period => 3600},
{ok, EfkaServer} = application:get_env(aircon, efka_server), {ok, EfkaServer} = application:get_env(aircon, efka_server),
Host = proplists:get_value(host, EfkaServer), Host = proplists:get_value(host, EfkaServer),
@ -36,14 +36,23 @@ init([]) ->
RegisterName = read_service_name(), RegisterName = read_service_name(),
ChildSpecs = [ ChildSpecs = [
#{ %#{
id => 'efka_client', % id => 'efka_client',
start => {'efka_client', start_link, [RegisterName, Host, Port]}, % start => {'efka_client', start_link, [RegisterName, Host, Port]},
restart => permanent, % restart => permanent,
shutdown => 2000, % shutdown => 2000,
type => worker, % type => worker,
modules => ['efka_client'] % modules => ['efka_client']
}, %},
%#{
% id => 'aircon_args',
% start => {'aircon_args', start_link, []},
% restart => permanent,
% shutdown => 2000,
% type => worker,
% modules => ['aircon_args']
%},
#{ #{
id => aircon_logger, id => aircon_logger,
@ -55,22 +64,22 @@ init([]) ->
}, },
#{ #{
id => 'aircon_args', id => 'aircon_device_sup',
start => {'aircon_args', start_link, []}, start => {'aircon_device_sup', start_link, []},
restart => permanent, restart => permanent,
shutdown => 2000, shutdown => 2000,
type => worker, type => worker,
modules => ['aircon_args'] modules => ['aircon_device_sup']
}, },
%#{ #{
% id => 'power_device_sup', id => 'aircon_mqtt_publisher',
% start => {'power_device_sup', start_link, []}, start => {'aircon_mqtt_publisher', start_link, []},
% restart => permanent, restart => permanent,
% shutdown => 2000, shutdown => 2000,
% type => worker, type => worker,
% modules => ['power_device_sup'] modules => ['aircon_mqtt_publisher']
%}, },
#{ #{
id => 'aircon_mqtt_subscriber', id => 'aircon_mqtt_subscriber',
@ -90,11 +99,11 @@ read_service_name() ->
%% %%
RootDir = code:root_dir(), RootDir = code:root_dir(),
VersionFile = RootDir ++ "/.version", VersionFile = RootDir ++ "/.version",
lager:debug("[aircon_sup] version path is: ~p", [VersionFile]), logger:info("[aircon_sup] version path is: ~p", [VersionFile]),
case file:read_file(VersionFile) of case file:read_file(VersionFile) of
{ok, RegisterName0} -> {ok, RegisterName0} ->
string:trim(RegisterName0); string:trim(RegisterName0);
{error, Reason} -> {error, Reason} ->
lager:warning("[aircon_sup] read .version file get error: ~p", [Reason]), logger:warning("[aircon_sup] read .version file get error: ~p", [Reason]),
<<"aircon">> <<"aircon">>
end. end.

View File

@ -162,7 +162,7 @@ do_register(RegisterName, Socket) ->
receive receive
{tcp, Socket, Data} -> {tcp, Socket, Data} ->
RegisterPacket = unpack(Data), RegisterPacket = unpack(Data),
lager:debug("[efka_client] get register reply packet: ~p", [RegisterPacket]), logger:debug("[efka_client] get register reply packet: ~p", [RegisterPacket]),
case RegisterPacket of case RegisterPacket of
#efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 1, <<"r">> := <<"ok">>}} -> #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 1, <<"r">> := <<"ok">>}} ->
ok; ok;
@ -389,7 +389,7 @@ unpack(<<PacketId:16, Type:8, Body/binary>>) ->
%%%=================================================================== %%%===================================================================
handle_poll_command(#{<<"device_uuid">> := DeviceUUID, <<"command">> := <<"query_status">>}) when is_binary(DeviceUUID) -> handle_poll_command(#{<<"device_uuid">> := DeviceUUID, <<"command">> := <<"query_status">>}) when is_binary(DeviceUUID) ->
case power_device:get_pid(DeviceUUID) of case aircon_device:get_pid(DeviceUUID) of
undefined -> undefined ->
#{ #{
<<"c">> => 1, <<"c">> => 1,
@ -403,7 +403,7 @@ handle_poll_command(#{<<"device_uuid">> := DeviceUUID, <<"command">> := <<"query
0 => <<"离线"/utf8>>, 0 => <<"离线"/utf8>>,
1 => <<"在线"/utf8>> 1 => <<"在线"/utf8>>
}, },
{ok, Status} = power_device:poll_status(Pid), {ok, Status} = aircon_device:poll_status(Pid),
#{ #{
<<"c">> => 1, <<"c">> => 1,
<<"r">> => #{ <<"r">> => #{
@ -415,12 +415,12 @@ handle_poll_command(#{<<"device_uuid">> := DeviceUUID, <<"command">> := <<"query
-spec handle_param(Params :: map()) -> ok | {error, Reason :: binary()}. -spec handle_param(Params :: map()) -> ok | {error, Reason :: binary()}.
handle_param(Params) when is_map(Params) -> handle_param(Params) when is_map(Params) ->
power_gateway_args:push_param(Params), aircon_args:push_param(Params),
ok. ok.
-spec handle_metric(Metric :: list()) -> ok | {error, Reason :: binary()}. -spec handle_metric(Metric :: list()) -> ok | {error, Reason :: binary()}.
handle_metric(Metric) when is_list(Metric) -> handle_metric(Metric) when is_list(Metric) ->
power_gateway_args:push_metric(Metric), aircon_args:push_metric(Metric),
ok. ok.
-spec handle_stream_call(ServiceName :: binary(), Fields :: list(), Tag :: map()) -> -spec handle_stream_call(ServiceName :: binary(), Fields :: list(), Tag :: map()) ->

View File

@ -4,18 +4,45 @@
{heartbeat_ticker, 120}, {heartbeat_ticker, 120},
{emqx_server, [ {emqx_server, [
{host, "39.98.184.67"}, {host, "118.178.229.213"},
{port, 1883}, {port, 1883},
{tcp_opts, []}, {tcp_opts, []},
{username, "test"}, {username, "aircon"},
{password, "test1234"}, {password, "A9K2rM8QxL7WZsP5D@B!3"},
{keepalive, 86400}, {keepalive, 86400},
{retry_interval, 5} {retry_interval, 5}
]}, ]},
{efka_server, [ {efka_server, [
{host, "39.98.184.67"}, {host, "118.178.229.213"},
{port, 3361} {port, 3361}
]} ]}
]},
{kernel, [
%% 设置 Logger 的 primary log level
{logger_level, debug},
{logger, [
{handler, default, logger_std_h,
#{
level => debug,
formatter => {logger_formatter, #{template => [time, " [", level, "] ", msg, "\n"]}}
}
},
{handler, disk, logger_disk_log_h,
#{
level => debug,
config => #{
file => "log/debug.log",
max_no_files => 10,
max_no_bytes => 524288000
},
formatter => {logger_formatter, #{template => [time, " [", level, "] ", msg, "\n"]}}
}
}
]} ]}
]}
]. ].

View File

@ -18,5 +18,4 @@
{port, 3361} {port, 3361}
]} ]}
]} ]}
]. ].

View File

@ -2,7 +2,7 @@
{deps, [ {deps, [
{sync, ".*", {git, "https://github.com/rustyio/sync.git", {branch, "master"}}}, {sync, ".*", {git, "https://github.com/rustyio/sync.git", {branch, "master"}}},
{jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}},
{emqtt, ".*", {git, "https://gitea.s5s8.com/anlicheng/emqtt.git", {tag, "v1.0"}}} {emqtt, ".*", {git, "https://gitea.s5s8.com/anlicheng/emqtt.git", {tag, "v1.1"}}}
]}. ]}.
{relx, [{release, {aircon, "0.1.0"}, {relx, [{release, {aircon, "0.1.0"},

View File

@ -1,7 +1,7 @@
{"1.2.0", {"1.2.0",
[{<<"emqtt">>, [{<<"emqtt">>,
{git,"https://gitea.s5s8.com/anlicheng/emqtt.git", {git,"https://gitea.s5s8.com/anlicheng/emqtt.git",
{ref,"5111914a9b1b92b0b497f825c77bdd365e3989b0"}}, {ref,"c590588b16f887bd5e1151626090f28aae55099b"}},
0}, 0},
{<<"fs">>,{pkg,<<"fs">>,<<"6.1.1">>},1}, {<<"fs">>,{pkg,<<"fs">>,<<"6.1.1">>},1},
{<<"jiffy">>, {<<"jiffy">>,