This commit is contained in:
anlicheng 2026-01-08 14:51:09 +08:00
parent 734545f873
commit 6c80d8d16c
9 changed files with 1100 additions and 7 deletions

View File

@ -0,0 +1,159 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 9 2023 16:37
%%%-------------------------------------------------------------------
-module(aircon_args).
-author("aresei").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([get_metric/0, get_param/0, push_metric/1, push_param/1, get_device_uuid/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
%% #{device_id => device_uuid}, serial
metrics = #{},
param = #{}
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_param() -> {ok, Param :: #{}}.
get_param() ->
gen_server:call(?MODULE, get_param).
-spec get_metric() -> {ok, Metric :: map()}.
get_metric() ->
gen_server:call(?MODULE, get_metric).
-spec get_device_uuid(DeviceId :: binary()) -> error | {ok, DeviceUUID :: binary()}.
get_device_uuid(DeviceId) when is_binary(DeviceId) ->
gen_server:call(?MODULE, {get_device_uuid, DeviceId}).
-spec push_param(Param :: map()) -> no_return().
push_param(Param) when is_map(Param) ->
gen_server:cast(?MODULE, {push_param, Param}).
-spec push_metric(Metrics :: list()) -> no_return().
push_metric(Metrics) when is_list(Metrics) ->
gen_server:cast(?MODULE, {push_metric, Metrics}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, Metrics} = efka_client:request_metric(),
try convert_metric(Metrics) of
{ok, MetricMap} ->
lager:debug("[aircon_args] init load metric_map: ~p", [MetricMap]),
{ok, Param} = efka_client:request_param(),
{ok, #state{metrics = MetricMap, param = Param}}
catch
_:Error:Stack->
lager:warning("[aircon_args] request_metric get error: ~p, stack: ~p", [Error, Stack]),
{ok, #state{metrics = #{}, param = #{}}}
end.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(get_param, _From, State = #state{param = Param}) ->
{reply, {ok, Param}, State};
handle_call({get_device_uuid, DeviceId}, _From, State = #state{metrics = Metrics}) when is_map(Metrics) ->
Result = maps:find(DeviceId, Metrics),
{reply, Result, State};
handle_call({get_device_uuid, _DeviceId}, _From, State) ->
{reply, error, State};
handle_call(get_metric, _From, State = #state{metrics = Metrics}) ->
{reply, {ok, Metrics}, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({push_param, Param}, State = #state{}) ->
{noreply, State#state{param = Param}};
handle_cast({push_metric, Metrics}, State = #state{}) ->
try convert_metric(Metrics) of
{ok, MetricMap} ->
lager:debug("[aircon_args] push metric_map: ~p", [MetricMap]),
{noreply, State#state{metrics = MetricMap}}
catch _:_ ->
{noreply, State}
end.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec convert_metric(Metrics :: list()) -> {ok, map()}.
convert_metric(Metrics) when is_list(Metrics) ->
MetricTuples = lists:flatmap(fun
(#{<<"serial">> := DeviceId, <<"device_uuid">> := DeviceUUID}) ->
[{DeviceId, DeviceUUID}];
(_) ->
[]
end, Metrics),
{ok, maps:from_list(MetricTuples)}.

View File

@ -0,0 +1,169 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 07. 9 2023 17:07
%%%-------------------------------------------------------------------
-module(aircon_logger).
-author("aresei").
-behaviour(gen_server).
%% API
-export([start_link/1, write/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
file_name :: string(),
date :: calendar:date(),
file
}).
%%%===================================================================
%%% API
%%%===================================================================
write(Data) ->
gen_server:cast(?SERVER, {write, Data}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(FileName :: string()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(FileName) when is_list(FileName) ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [FileName], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([FileName]) ->
ensure_dir(),
FilePath = make_file(FileName, erlang:date()),
{ok, File} = file:open(FilePath, [append, binary]),
{ok, #state{file = File, file_name = FileName, date = get_date()}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({write, Data}, State = #state{file = OldFile, file_name = FileName, date = Date}) ->
Line = iolist_to_binary([time_prefix(), <<" ">>, format(Data), <<$\n>>]),
NDate = erlang:date(),
case maybe_new_file(Date, NDate) of
true ->
file:close(OldFile),
FilePath = make_file(FileName, NDate),
{ok, File} = file:open(FilePath, [append, binary]),
ok = file:write(File, Line),
%% , :
delete_old_files(FileName, 10),
{noreply, State#state{file = File, date = get_date()}};
false ->
ok = file:write(OldFile, Line),
{noreply, State}
end.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
format(Data) when is_binary(Data) ->
iolist_to_binary(Data);
format(Items) when is_list(Items) ->
iolist_to_binary(lists:join(<<"\t">>, Items)).
time_prefix() ->
{{Y, M, D}, {H, I, S}} = calendar:local_time(),
iolist_to_binary(io_lib:format("[~b-~2..0b-~2..0b ~2..0b:~2..0b:~2..0b]", [Y, M, D, H, I, S])).
-spec make_file(LogFile :: string(), {Year :: integer(), Month :: integer(), Day :: integer()}) -> string().
make_file(LogFile, {Year, Month, Day}) when is_list(LogFile) ->
Suffix = io_lib:format("~b~2..0b~2..0b", [Year, Month, Day]),
RootDir = code:root_dir() ++ "/log/",
lists:flatten(RootDir ++ LogFile ++ "." ++ Suffix).
ensure_dir() ->
RootDir = code:root_dir() ++ "/log/",
case filelib:is_dir(RootDir) of
true ->
ok;
false ->
ok = file:make_dir(RootDir)
end.
%%
-spec get_date() -> Date :: calendar:date().
get_date() ->
{Date, _} = calendar:local_time(),
Date.
%%
-spec maybe_new_file(Date :: calendar:date(), Date0 :: calendar:date()) -> boolean().
maybe_new_file({Y, M, D}, {Y0, M0, D0}) ->
not (Y =:= Y0 andalso M =:= M0 andalso D =:= D0).
-spec delete_old_files(FileName :: string(), Days :: integer()) -> no_return().
delete_old_files(FileName, Days) when is_list(FileName), is_integer(Days) ->
Seconds0 = calendar:datetime_to_gregorian_seconds(calendar:local_time()),
Seconds = Seconds0 - Days * 86400,
lists:foreach(fun(Day) ->
{Date, _} = calendar:gregorian_seconds_to_datetime(Seconds - Day * 86400),
FilePath = make_file(FileName, Date),
filelib:is_file(FilePath) andalso file:delete(FilePath)
end, lists:seq(1, 10)).

View File

@ -0,0 +1,199 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%% 1.
%%% 2. host进程不能直接去监听topic线
%%% @end
%%% Created : 12. 3 2023 21:27
%%%-------------------------------------------------------------------
-module(aircon_mqtt_subscriber).
-author("aresei").
-behaviour(gen_server).
%% API
-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
%%
-define(Topics,[
{<<"/aircon/+/data">>, 2}
]).
-record(state, {
conn_pid :: pid()
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
%% emqx服务器的连接
Opts = emqx_opts(<<"aircon-data-subscriber">>),
lager:debug("[opts] is: ~p", [Opts]),
case emqtt:start_link(Opts) of
{ok, ConnPid} ->
lager:debug("[mqtt_subscriber] start conntecting, pid: ~p", [ConnPid]),
{ok, _} = emqtt:connect(ConnPid),
lager:debug("[mqtt_subscriber] connect success"),
%%
SubscribeResult = emqtt:subscribe(ConnPid, ?Topics),
lager:debug("[mqtt_subscriber] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]),
{ok, #state{conn_pid = ConnPid}};
ignore ->
lager:debug("[mqtt_subscriber] connect emqx get ignore"),
{stop, ignore};
{error, Reason} ->
lager:debug("[mqtt_subscriber] connect emqx get error: ~p", [Reason]),
{stop, Reason}
end.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Info, _From, State = #state{conn_pid = _ConnPid}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({disconnect, ReasonCode, Properties}, State = #state{}) ->
lager:debug("[mqtt_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
{stop, disconnected, State};
%% json反序列需要在host进程进行
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State = #state{conn_pid = _ConnPid}) ->
lager:debug("[mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]),
%% publisher进程去处理, ; topic格式: /aircon/20525456021829/data
case Topic of
<<"/aircon/", DeviceInfo/binary>> ->
case binary:split(DeviceInfo, <<"/">>) of
[DeviceMac, <<"data">>] ->
dispatch(DeviceMac, Payload);
_ ->
ok
end;
_ ->
ok
end,
{noreply, State};
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
lager:debug("[mqtt_subscriber] receive puback packet: ~p", [Packet]),
{noreply, State};
handle_info(Info, State = #state{}) ->
lager:debug("[mqtt_subscriber] get info: ~p", [Info]),
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
%% topic的订阅
TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics),
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames),
ok = emqtt:disconnect(ConnPid),
lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]),
ok;
terminate(Reason, _State) ->
lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec emqx_opts(ClientSuffix :: binary()) -> list().
emqx_opts(ClientSuffix) when is_binary(ClientSuffix) ->
%% emqx服务器的连接
{ok, Props} = application:get_env(aircon, emqx_server),
EMQXHost = proplists:get_value(host, Props),
EMQXPort = proplists:get_value(port, Props, 1883),
Username = proplists:get_value(username, Props),
Password = proplists:get_value(password, Props),
RetryInterval = proplists:get_value(retry_interval, Props, 5),
Keepalive = proplists:get_value(keepalive, Props, 86400),
Node = atom_to_binary(node()),
ClientId = <<"mqtt-client-", Node/binary, "-", ClientSuffix/binary>>,
[
{clientid, ClientId},
{host, EMQXHost},
{port, EMQXPort},
{owner, self()},
{tcp_opts, []},
{username, Username},
{password, Password},
{keepalive, Keepalive},
{auto_ack, true},
{proto_ver, v3},
{retry_interval, RetryInterval}
].
-spec dispatch(DeviceMac :: binary(), Message :: binary()) -> no_return().
dispatch(DeviceMac, Message) when is_binary(DeviceMac), is_binary(Message) ->
case power_gateway_args:get_device_uuid(DeviceMac) of
error ->
lager:notice("[mqtt_subscriber] device_mac: ~p, device_uuid not found", [DeviceMac]);
{ok, DeviceUUID} ->
case power_device_sup:ensure_device_started(DeviceUUID) of
{ok, DevicePid} ->
power_device:metric_data(DevicePid, Message);
{error, Reason} ->
lager:notice("[mqtt_subscriber] start device get error: ~p", [Reason])
end
end.

View File

@ -26,10 +26,75 @@ start_link() ->
%% type => worker(), % optional
%% modules => modules()} % optional
init([]) ->
SupFlags = #{strategy => one_for_all,
intensity => 0,
period => 1},
ChildSpecs = [],
SupFlags = #{strategy => one_for_all, intensity => 0, period => 1},
{ok, EfkaServer} = application:get_env(aircon, efka_server),
Host = proplists:get_value(host, EfkaServer),
Port = proplists:get_value(port, EfkaServer),
%%
RegisterName = read_service_name(),
ChildSpecs = [
#{
id => 'efka_client',
start => {'efka_client', start_link, [RegisterName, Host, Port]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['efka_client']
},
#{
id => aircon_logger,
start => {'aircon_logger', start_link, ["aircon_data"]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['aircon_logger']
},
#{
id => 'aircon_args',
start => {'aircon_args', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['aircon_args']
},
%#{
% id => 'power_device_sup',
% start => {'power_device_sup', start_link, []},
% restart => permanent,
% shutdown => 2000,
% type => worker,
% modules => ['power_device_sup']
%},
#{
id => 'aircon_mqtt_subscriber',
start => {'aircon_mqtt_subscriber', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['aircon_mqtt_subscriber']
}
],
{ok, {SupFlags, ChildSpecs}}.
%% internal functions
%%
read_service_name() ->
%%
RootDir = code:root_dir(),
VersionFile = RootDir ++ "/.version",
lager:debug("[aircon_sup] version path is: ~p", [VersionFile]),
case file:read_file(VersionFile) of
{ok, RegisterName0} ->
string:trim(RegisterName0);
{error, Reason} ->
lager:warning("[aircon_sup] read .version file get error: ~p", [Reason]),
<<"aircon">>
end.

View File

@ -0,0 +1,431 @@
%%%-------------------------------------------------------------------
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 28. 8 2023 15:39
%%%-------------------------------------------------------------------
-module(efka_client).
-author("aresei").
-behaviour(gen_server).
%%
-record(efka_packet, {
packet_id :: integer(),
type :: integer(),
message :: any()
}).
%%
-define(EFKA_REQUEST_TIMEOUT, 5000).
%%
%%
-define(PACKET_TYPE_REGISTER, 16).
%%
-define(PACKET_TYPE_METRIC_DATA, 3).
%%
-define(PACKET_TYPE_INVOKE, 4).
%%
-define(PACKET_TYPE_RESPONSE, 7).
%% efka下发给微服务参数
-define(PACKET_TYPE_PUSH_PARAM, 5).
%% efka下发给微服务采集项
-define(PACKET_TYPE_PUSH_METRIC, 6).
%% : : 2025-4-16
-define(PACKET_TYPE_POLL, 20).
%% efka发送log消息
-define(PACKET_TYPE_LOG, 9).
%% efka获取自身的采集项
-define(PACKET_TYPE_REQUEST_METRIC, 10).
%% efka获取自身的参数
-define(PACKET_TYPE_REQUEST_PARAM, 12).
%% efka向微服务发送stream-call消息
-define(PACKET_TYPE_PUSH_STREAM_CALL, 11).
%%
-define(PACKET_TYPE_EVENT, 15).
%% API
-export([start_link/3]).
-export([device_offline/1, device_online/1]).
-export([send_metric_data/2, invoke_service/3, send_log/1, request_metric/0, request_param/0, send_event/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {
packet_id = 0 :: integer(),
host :: string(),
port :: integer(),
%%
inflight = #{} :: map(),
socket :: gen_tcp:socket()
}).
-spec send_metric_data(Fields :: list(), Tags :: #{}) -> {ok, Result :: any()} | {error, Reason :: any()}.
send_metric_data(Fields, Tags) when is_list(Fields), is_map(Tags) ->
{ok, Ref} = gen_server:call(?MODULE, {send_metric_data, self(), Fields, Tags}),
await_reply(Ref, ?EFKA_REQUEST_TIMEOUT).
-spec invoke_service(ToService :: binary(), Message :: map(), Timeout :: integer()) ->
{ok, Result :: any()} | {error, Reason :: any()}.
invoke_service(ToService, Message, Timeout) when is_binary(ToService), is_map(Message), is_integer(Timeout) ->
{ok, Ref} = gen_server:call(?MODULE, {invoke_service, self(), ToService, Message, Timeout}),
await_reply(Ref, ?EFKA_REQUEST_TIMEOUT).
-spec send_log(Message :: binary() | map()) -> no_return().
send_log(Message) when is_binary(Message); is_map(Message) ->
gen_server:cast(?MODULE, {send_log, Message}).
%% efka_server为了统一r对象为字符串2json_decode
-spec request_metric() -> {ok, Result :: list()} | {error, Reason :: any()}.
request_metric() ->
{ok, Ref} = gen_server:call(?MODULE, {request_metric, self()}),
case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of
{ok, Reply} ->
{ok, jiffy:decode(Reply, [return_maps])};
Error ->
Error
end.
-spec request_param() -> {ok, Result :: map()} | {error, Reason :: any()}.
request_param() ->
{ok, Ref} = gen_server:call(?MODULE, {request_param, self()}),
case await_reply(Ref, ?EFKA_REQUEST_TIMEOUT) of
{ok, Reply} ->
{ok, jiffy:decode(Reply, [return_maps])};
Error ->
Error
end.
-spec device_offline(DeviceUUID :: binary()) -> no_return().
device_offline(DeviceUUID) when is_binary(DeviceUUID) ->
send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 0}).
-spec device_online(DeviceUUID :: binary()) -> no_return().
device_online(DeviceUUID) when is_binary(DeviceUUID) ->
send_event(1, #{<<"device_uuid">> => DeviceUUID, <<"status">> => 1}).
-spec send_event(EventType :: integer(), Params :: binary() | map()) -> no_return().
send_event(EventType, Params) when is_integer(EventType), is_binary(Params); is_map(Params) ->
gen_server:cast(?MODULE, {send_event, EventType, Params}).
%%%===================================================================
%%% API
%%%===================================================================
-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: any()} | {error, Reason :: any()}.
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
receive
{response, Ref, {ok, Reply}} ->
{ok, Reply};
{response, Ref, {error, Reason}} ->
{error, Reason}
after
Timeout ->
{error, timeout}
end.
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(RegisterName :: binary(), Host :: string(), Port :: integer()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(RegisterName, Host, Port) when is_binary(RegisterName), is_list(Host), is_integer(Port) ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [RegisterName, Host, Port], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([RegisterName, Host, Port]) ->
{ok, Socket} = gen_tcp:connect(Host, Port, [binary, {packet, 4}, {active, true}]),
ok = gen_tcp:controlling_process(Socket, self()),
case do_register(RegisterName, Socket) of
ok ->
{ok, #state{packet_id = 1, host = Host, port = Port, socket = Socket}};
{error, Reason} ->
{stop, Reason}
end.
%% efka服务器的注册
do_register(RegisterName, Socket) ->
PacketId = 0,
Body = #{<<"name">> => RegisterName},
Packet = pack(PacketId, ?PACKET_TYPE_REGISTER, Body),
ok = gen_tcp:send(Socket, Packet),
receive
{tcp, Socket, Data} ->
RegisterPacket = unpack(Data),
lager:debug("[efka_client] get register reply packet: ~p", [RegisterPacket]),
case RegisterPacket of
#efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 1, <<"r">> := <<"ok">>}} ->
ok;
#efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = #{<<"c">> := 0, <<"e">> := Error}} ->
{error, Error};
_ ->
{error, invalid_register_packet}
end
after
?EFKA_REQUEST_TIMEOUT ->
{error, timeout}
end.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({send_metric_data, ReceiverPid, Fields, Tags}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Body = #{<<"data">> => Fields, <<"tag">> => Tags},
Packet = pack(PacketId, ?PACKET_TYPE_METRIC_DATA, Body),
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
handle_call({invoke_service, ReceiverPid, ToService, Message, Timeout}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Body = #{
<<"to">> => ToService,
<<"t">> => Timeout,
<<"m">> => Message
},
Packet = pack(PacketId, ?PACKET_TYPE_INVOKE, Body),
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
handle_call({request_metric, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_METRIC),
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}};
handle_call({request_param, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Packet = pack(PacketId, ?PACKET_TYPE_REQUEST_PARAM),
ok = gen_tcp:send(Socket, Packet),
Ref = make_ref(),
{reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({send_log, Message}, State = #state{socket = Socket, packet_id = PacketId}) ->
Body = #{<<"l">> => Message},
Packet = pack(PacketId, ?PACKET_TYPE_LOG, Body),
ok = gen_tcp:send(Socket, Packet),
{noreply, State#state{packet_id = next_packet_id(PacketId)}};
handle_cast({send_event, EventType, Params}, State = #state{socket = Socket}) ->
Body = #{
<<"event_type">> => EventType,
<<"params">> => Params
},
Packet = pack(0, ?PACKET_TYPE_EVENT, Body),
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
handle_cast(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({tcp, Socket, Data}, State = #state{socket = Socket}) ->
Packet = unpack(Data),
self() ! {handle_packet, Packet},
{noreply, State};
handle_info({tcp_closed, Socket}, State = #state{socket = Socket}) ->
{stop, tcp_closed, State};
%%
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_RESPONSE, message = Message}}, State = #state{inflight = Inflight}) ->
case maps:take(PacketId, Inflight) of
error ->
{noreply, State};
{{Ref, ReceiverPid}, NInflight} ->
case Message of
#{<<"c">> := 1, <<"r">> := Result} ->
ReceiverPid ! {response, Ref, {ok, Result}};
#{<<"c">> := 0, <<"e">> := Error} ->
ReceiverPid ! {response, Ref, {error, Error}}
end,
{noreply, State#state{inflight = NInflight}}
end;
%% efka推送的参数设置
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_PARAM, message = Params}},
State = #state{socket = Socket}) when is_map(Params) ->
Message = case handle_param(Params) of
ok ->
#{<<"c">> => 1, <<"r">> => <<"ok">>};
{error, Reason} ->
#{<<"c">> => 0, <<"e">> => Reason}
end,
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%% efka推送的采集项消息
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_METRIC, message = Metrics}},
State = #state{socket = Socket}) when is_list(Metrics) ->
Message = case handle_metric(Metrics) of
ok ->
#{<<"c">> => 1, <<"r">> => <<"ok">>};
{error, Reason} when is_binary(Reason) ->
#{<<"c">> => 0, <<"e">> => Reason}
end,
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%%
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_POLL, message = Command}}, State = #state{socket = Socket}) ->
Message = handle_poll_command(Command),
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%% efka的stream-call消息
handle_info({handle_packet, #efka_packet{packet_id = PacketId, type = ?PACKET_TYPE_PUSH_STREAM_CALL,
message = Msg = #{<<"service_name">> := ServiceName, <<"data">> := Data, <<"tag">> := Tag}}}, State = #state{socket = Socket}) ->
Message = case handle_stream_call(ServiceName, Data, Tag) of
{continue, NewServiceName, NewData, NewTag} ->
#{
<<"c">> => 1,
<<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag},
<<"k">> => true
};
%%
{break, NewServiceName, NewData, NewTag} ->
#{
<<"c">> => 1,
<<"r">> => Msg#{<<"service_name">> := NewServiceName, <<"data">> => NewData, <<"tag">> => NewTag},
<<"k">> => false
};
error ->
#{
<<"c">> => 0,
<<"r">> => Msg,
<<"k">> => true
}
end,
Packet = pack(PacketId, ?PACKET_TYPE_RESPONSE, Message),
ok = gen_tcp:send(Socket, Packet),
{noreply, State};
%%
handle_info({handle_packet, _Packet}, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{socket = Socket}) ->
gen_tcp:close(Socket),
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer().
next_packet_id(PacketId) when PacketId >= 65535 ->
0;
next_packet_id(PacketId) ->
PacketId + 1.
-spec pack(PacketId :: integer(), Type :: integer()) -> binary().
pack(PacketId, Type) when is_integer(PacketId), is_integer(Type) ->
<<PacketId:16, Type:8>>.
-spec pack(PacketId :: integer(), Type :: integer(), Body :: map()) -> binary().
pack(PacketId, Type, Body) when is_integer(PacketId), is_integer(Type), is_map(Body) ->
Message = iolist_to_binary(jiffy:encode(Body, [force_utf8])),
<<PacketId:16, Type:8, Message/binary>>.
-spec unpack(binary()) -> #efka_packet{}.
unpack(<<PacketId:16, Type:8, Body/binary>>) ->
Message = catch jiffy:decode(Body, [return_maps]),
#efka_packet{packet_id = PacketId, type = Type, message = Message}.
%%%===================================================================
%%% simple callbacks
%%%===================================================================
handle_poll_command(#{<<"device_uuid">> := DeviceUUID, <<"command">> := <<"query_status">>}) when is_binary(DeviceUUID) ->
case power_device:get_pid(DeviceUUID) of
undefined ->
#{
<<"c">> => 1,
<<"r">> => #{
<<"edge_status">> => -1,
<<"message">> => <<"设备信息不存在"/utf8>>
}
};
Pid ->
StatusMap = #{
0 => <<"离线"/utf8>>,
1 => <<"在线"/utf8>>
},
{ok, Status} = power_device:poll_status(Pid),
#{
<<"c">> => 1,
<<"r">> => #{
<<"edge_status">> => Status,
<<"message">> => maps:get(Status, StatusMap)
}
}
end.
-spec handle_param(Params :: map()) -> ok | {error, Reason :: binary()}.
handle_param(Params) when is_map(Params) ->
power_gateway_args:push_param(Params),
ok.
-spec handle_metric(Metric :: list()) -> ok | {error, Reason :: binary()}.
handle_metric(Metric) when is_list(Metric) ->
power_gateway_args:push_metric(Metric),
ok.
-spec handle_stream_call(ServiceName :: binary(), Fields :: list(), Tag :: map()) ->
{continue, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()}
| {break, NewServiceName :: binary(), NewFields :: list(), NewTag :: map()}
| error.
handle_stream_call(ServiceName, Fields, Tag) when is_binary(ServiceName), is_list(Fields), is_map(Tag) ->
{continue, ServiceName, Fields, Tag}.

21
config/sys-dev.config Normal file
View File

@ -0,0 +1,21 @@
[
{aircon, [
%% 离线判断时间间隔,单位:(秒)
{heartbeat_ticker, 120},
{emqx_server, [
{host, "39.98.184.67"},
{port, 1883},
{tcp_opts, []},
{username, "test"},
{password, "test1234"},
{keepalive, 86400},
{retry_interval, 5}
]},
{efka_server, [
{host, "39.98.184.67"},
{port, 3361}
]}
]}
].

22
config/sys-prod.config Normal file
View File

@ -0,0 +1,22 @@
[
{aircon, [
%% 离线判断时间间隔,单位:(秒)
{heartbeat_ticker, 120},
{emqx_server, [
{host, "172.30.37.212"},
{port, 1883},
{tcp_opts, []},
{username, "aircon"},
{password, "A9K2rM8QxL7WZsP5D@B!3"},
{keepalive, 86400},
{retry_interval, 5}
]},
{efka_server, [
{host, "39.98.184.67"},
{port, 3361}
]}
]}
].

View File

@ -1,3 +0,0 @@
[
{aircon, []}
].

View File

@ -0,0 +1,30 @@
采集数据推送
topic:
/aircon/20525456021829/data
massage:
[
{"key":"OnOff","value":0,"unit":10,"type":"AI","timestamp":1767747017459,"name":"开关状态","label":"开关状态","sessionid":760143116038215},
{"key":"WorkModel","value":"0","unit":10,"type":"AI","timestamp":1767747017567,"name":"运行模式","label":"运行模式","sessionid":760143116480581},
{"key":"WindSpeed","value":0,"unit":10,"type":"AI","timestamp":1767747017567,"name":"设定风速","label":"设定风速","sessionid":760143116480582},
{"key":"Temperature","value":17.0,"unit":10,"type":"AI","timestamp":1767747017567,"name":"设定温度","label":"设定温度","sessionid":760143116480583},
{"key":"ValueTime","value":"2025-04-10 16:28:57","unit":10,"type":"AI","timestamp":1767747017567,"name":"采集时间","label":"采集时间","sessionid":760143116480584},
{"key":"Zvalue","value":0.000198,"unit":5,"type":"AI","timestamp":1767747017567,"name":"累计运行能耗","label":"累计运行能耗","sessionid":760143116480585},
{"key":"ShareValue","value":0.0,"unit":5,"type":"AI","timestamp":1767747017567,"name":"累计分摊能耗","label":"累计分摊能耗","sessionid":760143116480586}
]
控制指令下发
topic:
/aircon/20525456021829/command
massage:
{"key":"00000005","value":"25","timestamp":1767747017459,"name":"调温","sessionid":760143116038215}
控制指令应答
topic:
/aircon/20525456021829/command_reply
massage:
{"sessionid":760143116038215,"code":"0","msg":"指令流转失败","timestamp":1767747017459}
{"sessionid":760143116038215,"code":"1","msg":"指令下发成功","timestamp":1767747017459}
{"sessionid":760143116038215,"code":"2","msg":"指令响应超时","timestamp":1767747017459}
{"sessionid":760143116038215,"code":"3","msg":"指令下发失败","timestamp":1767747017459}