Merge pull request 'command_mst' (#1) from command_mst into master
Reviewed-on: http://101.43.184.190:3000/alc/iot/pulls/1
This commit is contained in:
commit
d318ed42f5
@ -44,10 +44,6 @@
|
|||||||
-define(PACKET_PUBLISH, 16#03).
|
-define(PACKET_PUBLISH, 16#03).
|
||||||
-define(PACKET_PUBLISH_RESPONSE, 16#04).
|
-define(PACKET_PUBLISH_RESPONSE, 16#04).
|
||||||
|
|
||||||
%% 服务端指令
|
|
||||||
-define(PACKET_COMMAND, 16#05).
|
|
||||||
-define(PACKET_COMMAND_RESPONSE, 16#06).
|
|
||||||
|
|
||||||
%% 事件类型
|
%% 事件类型
|
||||||
-define(EVENT_DEVICE, 16#01).
|
-define(EVENT_DEVICE, 16#01).
|
||||||
%% 主机的相关事件
|
%% 主机的相关事件
|
||||||
@ -56,6 +52,9 @@
|
|||||||
%% ai相关的事件
|
%% ai相关的事件
|
||||||
-define(EVENT_AI, 16#03).
|
-define(EVENT_AI, 16#03).
|
||||||
|
|
||||||
|
%% 指令相关
|
||||||
|
-define(DIRECTIVE_ZD_CTRL, 16#01).
|
||||||
|
|
||||||
%% 缓存数据库表
|
%% 缓存数据库表
|
||||||
-record(kv, {
|
-record(kv, {
|
||||||
key :: binary(),
|
key :: binary(),
|
||||||
|
|||||||
256
apps/iot/src/consumer/iot_zd_consumer.erl
Normal file
256
apps/iot/src/consumer/iot_zd_consumer.erl
Normal file
@ -0,0 +1,256 @@
|
|||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author aresei
|
||||||
|
%%% @copyright (C) 2023, <COMPANY>
|
||||||
|
%%% @doc
|
||||||
|
%%% 1. 需要考虑集群部署的相关问题,上行的数据可能在集群中共享
|
||||||
|
%%% 2. host进程不能直接去监听topic,这样涉及到新增和下线的很多问题
|
||||||
|
%%% @end
|
||||||
|
%%% Created : 12. 3月 2023 21:27
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(iot_zd_consumer).
|
||||||
|
-author("aresei").
|
||||||
|
-include("iot.hrl").
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start_link/0]).
|
||||||
|
-export([mock/0, mock/1]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
-define(RETRY_INTERVAL, 5000).
|
||||||
|
|
||||||
|
%% 执行超时时间
|
||||||
|
-define(EXECUTE_TIMEOUT, 10 * 1000).
|
||||||
|
|
||||||
|
%% 需要订阅的主题信息
|
||||||
|
-define(Topics,[
|
||||||
|
{<<"CET/NX/download">>, 2}
|
||||||
|
]).
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
conn_pid :: pid(),
|
||||||
|
logger_pid :: pid(),
|
||||||
|
mqtt_props :: list(),
|
||||||
|
is_connected = false
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
mock() ->
|
||||||
|
LocationCode = <<"0001000290040150004002560">>,
|
||||||
|
mock(LocationCode).
|
||||||
|
|
||||||
|
mock(LocationCode) when is_binary(LocationCode) ->
|
||||||
|
Req = #{
|
||||||
|
<<"version">> => <<"1.0">>,
|
||||||
|
<<"ts">> => iot_util:current_time(),
|
||||||
|
<<"properties">> => #{
|
||||||
|
<<"type">> => <<"ctrl">>,
|
||||||
|
<<"stype">> => 0,
|
||||||
|
<<"ctype">> => 1,
|
||||||
|
<<"value">> => 1234,
|
||||||
|
<<"timestamp">> => iot_util:current_time()
|
||||||
|
},
|
||||||
|
<<"location_code">> => LocationCode
|
||||||
|
},
|
||||||
|
gen_server:call(?MODULE, {mock, Req}).
|
||||||
|
|
||||||
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
|
-spec(start_link() ->
|
||||||
|
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% gen_server callbacks
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Initializes the server
|
||||||
|
-spec(init(Args :: term()) ->
|
||||||
|
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term()} | ignore).
|
||||||
|
init([]) ->
|
||||||
|
erlang:process_flag(trap_exit, true),
|
||||||
|
|
||||||
|
{ok, Props} = application:get_env(iot, zhongdian),
|
||||||
|
%% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制
|
||||||
|
erlang:start_timer(0, self(), create_consumer),
|
||||||
|
%% 启动日志记录器
|
||||||
|
{ok, LoggerPid} = iot_logger:start_link("directive_data"),
|
||||||
|
|
||||||
|
{ok, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid, is_connected = false}}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling call messages
|
||||||
|
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||||
|
State :: #state{}) ->
|
||||||
|
{reply, Reply :: term(), NewState :: #state{}} |
|
||||||
|
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_call({mock, Request}, _From, State = #state{conn_pid = _ConnPid}) ->
|
||||||
|
publish_directive(Request),
|
||||||
|
{reply, ok, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling cast messages
|
||||||
|
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_cast(_Request, State = #state{}) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling all non call/cast messages
|
||||||
|
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_info({disconnect, ReasonCode, Properties}, State) ->
|
||||||
|
lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
|
||||||
|
{stop, disconnected, State};
|
||||||
|
%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行
|
||||||
|
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
|
||||||
|
lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]),
|
||||||
|
|
||||||
|
Request = catch jiffy:decode(Payload, [return_maps]),
|
||||||
|
publish_directive(Request),
|
||||||
|
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
|
||||||
|
lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_info({timeout, _, create_consumer}, State = #state{mqtt_props = Props, is_connected = false}) ->
|
||||||
|
try
|
||||||
|
{ok, ConnPid} = create_consumer(Props),
|
||||||
|
{noreply, State#state{conn_pid = ConnPid}}
|
||||||
|
catch _:Error:Stack ->
|
||||||
|
lager:warning("[iot_zd_consumer] config: ~p, create consumer get error: ~p, stack: ~p", [Props, Error, Stack]),
|
||||||
|
erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer),
|
||||||
|
{noreply, State#state{conn_pid = undefined}}
|
||||||
|
end;
|
||||||
|
|
||||||
|
%% postman进程挂掉时,重新建立新的
|
||||||
|
handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||||
|
lager:warning("[iot_zd_consumer] consumer exited with reason: ~p", [Reason]),
|
||||||
|
erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer),
|
||||||
|
|
||||||
|
{noreply, State#state{conn_pid = undefined}};
|
||||||
|
|
||||||
|
handle_info({directive_reply, Reply}, State = #state{}) ->
|
||||||
|
lager:debug("[iot_zd_consumer] get directive_reply: ~ts", [Reply]),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_info(Info, State = #state{}) ->
|
||||||
|
lager:debug("[iot_zd_consumer] get info: ~p", [Info]),
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc This function is called by a gen_server when it is about to
|
||||||
|
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||||
|
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||||
|
%% with Reason. The return value is ignored.
|
||||||
|
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||||
|
State :: #state{}) -> term()).
|
||||||
|
terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
|
||||||
|
%% 取消topic的订阅
|
||||||
|
TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics),
|
||||||
|
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames),
|
||||||
|
|
||||||
|
ok = emqtt:disconnect(ConnPid),
|
||||||
|
lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]),
|
||||||
|
ok;
|
||||||
|
terminate(Reason, _State) ->
|
||||||
|
lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]),
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Convert process state when code is changed
|
||||||
|
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||||
|
Extra :: term()) ->
|
||||||
|
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||||
|
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
publish_directive(#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams}) ->
|
||||||
|
%% 通过LocationCode查找到主机和Device_uuid
|
||||||
|
case redis_client:hgetall(LocationCode) of
|
||||||
|
{ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} ->
|
||||||
|
case iot_host:get_pid(HostUUID) of
|
||||||
|
undefined ->
|
||||||
|
lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]);
|
||||||
|
Pid ->
|
||||||
|
ReceiverPid = self(),
|
||||||
|
spawn(fun() ->
|
||||||
|
DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT),
|
||||||
|
ReceiverPid ! {directive_reply, DirectiveResult}
|
||||||
|
end)
|
||||||
|
end;
|
||||||
|
{ok, Map} ->
|
||||||
|
lager:notice("[iot_zd_consumer] location_code: ~p, redis data invalid: ~p", [LocationCode, Map]);
|
||||||
|
_ ->
|
||||||
|
lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode])
|
||||||
|
end;
|
||||||
|
publish_directive(Other) ->
|
||||||
|
lager:notice("[iot_zd_consumer] get a unknown directive", [Other]).
|
||||||
|
|
||||||
|
-spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}.
|
||||||
|
create_consumer(Props) when is_list(Props) ->
|
||||||
|
Node = atom_to_binary(node()),
|
||||||
|
ClientId = <<"mqtt-client-", Node/binary, "-zhongdian_mqtt_consumer">>,
|
||||||
|
|
||||||
|
%% 建立到emqx服务器的连接
|
||||||
|
Host = proplists:get_value(host, Props),
|
||||||
|
Port = proplists:get_value(port, Props, 18080),
|
||||||
|
Username = proplists:get_value(username, Props),
|
||||||
|
Password = proplists:get_value(password, Props),
|
||||||
|
Keepalive = proplists:get_value(keepalive, Props, 86400),
|
||||||
|
|
||||||
|
Opts = [
|
||||||
|
{clientid, ClientId},
|
||||||
|
{host, Host},
|
||||||
|
{port, Port},
|
||||||
|
{owner, self()},
|
||||||
|
{tcp_opts, []},
|
||||||
|
{username, Username},
|
||||||
|
{password, Password},
|
||||||
|
{keepalive, Keepalive},
|
||||||
|
{auto_ack, true},
|
||||||
|
{connect_timeout, 5000},
|
||||||
|
{proto_ver, v5},
|
||||||
|
{retry_interval, 5000}
|
||||||
|
],
|
||||||
|
|
||||||
|
%% 建立到emqx服务器的连接
|
||||||
|
lager:debug("[iot_zd_consumer] opts is: ~p", [Opts]),
|
||||||
|
case emqtt:start_link(Opts) of
|
||||||
|
{ok, ConnPid} ->
|
||||||
|
%% 监听和host相关的全部事件
|
||||||
|
lager:debug("[iot_zd_consumer] start conntecting, pid: ~p", [ConnPid]),
|
||||||
|
{ok, _} = emqtt:connect(ConnPid),
|
||||||
|
lager:debug("[iot_zd_consumer] connect success, pid: ~p", [ConnPid]),
|
||||||
|
SubscribeResult = emqtt:subscribe(ConnPid, ?Topics),
|
||||||
|
lager:debug("[iot_zd_consumer] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]),
|
||||||
|
|
||||||
|
{ok, ConnPid};
|
||||||
|
ignore ->
|
||||||
|
{error, ignore};
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
@ -13,7 +13,7 @@
|
|||||||
-behaviour(gen_statem).
|
-behaviour(gen_statem).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/1]).
|
-export([start_link/0]).
|
||||||
-export([get_pid/0, forward/3, get_stat/0]).
|
-export([get_pid/0, forward/3, get_stat/0]).
|
||||||
|
|
||||||
%% gen_statem callbacks
|
%% gen_statem callbacks
|
||||||
@ -57,8 +57,8 @@ get_stat() ->
|
|||||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||||
%% initialize. To ensure a synchronized start-up procedure, this
|
%% initialize. To ensure a synchronized start-up procedure, this
|
||||||
%% function does not return until Module:init/1 has returned.
|
%% function does not return until Module:init/1 has returned.
|
||||||
start_link(Opts) when is_list(Opts) ->
|
start_link() ->
|
||||||
gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% gen_statem callbacks
|
%%% gen_statem callbacks
|
||||||
@ -68,7 +68,9 @@ start_link(Opts) when is_list(Opts) ->
|
|||||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||||
%% process to initialize.
|
%% process to initialize.
|
||||||
init([Opts]) ->
|
init([]) ->
|
||||||
|
{ok, Opts} = application:get_env(iot, jinzhi),
|
||||||
|
|
||||||
PoolSize = proplists:get_value(pool_size, Opts),
|
PoolSize = proplists:get_value(pool_size, Opts),
|
||||||
PriFile = proplists:get_value(pri_key, Opts),
|
PriFile = proplists:get_value(pri_key, Opts),
|
||||||
Url = proplists:get_value(url, Opts),
|
Url = proplists:get_value(url, Opts),
|
||||||
|
|||||||
@ -13,7 +13,7 @@
|
|||||||
-behaviour(gen_statem).
|
-behaviour(gen_statem).
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/1]).
|
-export([start_link/0]).
|
||||||
-export([get_pid/0, forward/3, get_stat/0]).
|
-export([get_pid/0, forward/3, get_stat/0]).
|
||||||
|
|
||||||
%% gen_statem callbacks
|
%% gen_statem callbacks
|
||||||
@ -57,8 +57,8 @@ get_stat() ->
|
|||||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||||
%% initialize. To ensure a synchronized start-up procedure, this
|
%% initialize. To ensure a synchronized start-up procedure, this
|
||||||
%% function does not return until Module:init/1 has returned.
|
%% function does not return until Module:init/1 has returned.
|
||||||
start_link(Opts) when is_list(Opts) ->
|
start_link() ->
|
||||||
gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
|
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
|
||||||
|
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
%%% gen_statem callbacks
|
%%% gen_statem callbacks
|
||||||
@ -68,7 +68,9 @@ start_link(Opts) when is_list(Opts) ->
|
|||||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||||
%% process to initialize.
|
%% process to initialize.
|
||||||
init([Opts]) ->
|
init([]) ->
|
||||||
|
{ok, Opts} = application:get_env(iot, zhongdian),
|
||||||
|
|
||||||
erlang:process_flag(trap_exit, true),
|
erlang:process_flag(trap_exit, true),
|
||||||
%% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制
|
%% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制
|
||||||
erlang:start_timer(0, self(), create_postman),
|
erlang:start_timer(0, self(), create_postman),
|
||||||
@ -233,6 +235,9 @@ do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields =
|
|||||||
<<"ts">> => Timestamp,
|
<<"ts">> => Timestamp,
|
||||||
<<"properties">> => Fields
|
<<"properties">> => Fields
|
||||||
},
|
},
|
||||||
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
try
|
||||||
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}},
|
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
||||||
ok.
|
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}
|
||||||
|
catch _:_ ->
|
||||||
|
self() ! {ack, Id, <<"json error">>}
|
||||||
|
end.
|
||||||
138
apps/iot/src/iot_api.erl
Normal file
138
apps/iot/src/iot_api.erl
Normal file
@ -0,0 +1,138 @@
|
|||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
%%% @author anlicheng
|
||||||
|
%%% @copyright (C) 2023, <COMPANY>
|
||||||
|
%%% @doc
|
||||||
|
%%%
|
||||||
|
%%% @end
|
||||||
|
%%% Created : 24. 12月 2023 15:42
|
||||||
|
%%%-------------------------------------------------------------------
|
||||||
|
-module(iot_api).
|
||||||
|
-author("anlicheng").
|
||||||
|
|
||||||
|
-behaviour(gen_server).
|
||||||
|
|
||||||
|
%% API
|
||||||
|
-export([start_link/0]).
|
||||||
|
-export([ai_event/1]).
|
||||||
|
|
||||||
|
%% gen_server callbacks
|
||||||
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
|
|
||||||
|
-define(SERVER, ?MODULE).
|
||||||
|
-define(API_TOKEN, <<"wv6fGyBhl*7@AsD9">>).
|
||||||
|
|
||||||
|
-record(state, {
|
||||||
|
|
||||||
|
}).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% API
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
ai_event(Id) when is_integer(Id) ->
|
||||||
|
gen_server:cast(?MODULE, {ai_event, Id}).
|
||||||
|
|
||||||
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
|
-spec(start_link() ->
|
||||||
|
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||||
|
start_link() ->
|
||||||
|
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% gen_server callbacks
|
||||||
|
%%%===================================================================
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Initializes the server
|
||||||
|
-spec(init(Args :: term()) ->
|
||||||
|
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term()} | ignore).
|
||||||
|
init([]) ->
|
||||||
|
{ok, #state{}}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling call messages
|
||||||
|
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||||
|
State :: #state{}) ->
|
||||||
|
{reply, Reply :: term(), NewState :: #state{}} |
|
||||||
|
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_call(_Request, _From, State = #state{}) ->
|
||||||
|
{reply, ok, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling cast messages
|
||||||
|
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
handle_cast({ai_event, Id}, State = #state{}) ->
|
||||||
|
spawn_monitor(fun() ->
|
||||||
|
Token = iot_util:md5(<<?API_TOKEN/binary, (integer_to_binary(Id))/binary, ?API_TOKEN/binary>>),
|
||||||
|
{ok, Url} = application:get_env(iot, api_url),
|
||||||
|
|
||||||
|
Headers = [
|
||||||
|
{<<"content-type">>, <<"application/json">>}
|
||||||
|
],
|
||||||
|
ReqData = #{
|
||||||
|
<<"token">> => Token,
|
||||||
|
<<"id">> => Id
|
||||||
|
},
|
||||||
|
Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])),
|
||||||
|
case hackney:request(post, Url, Headers, Body, [{pool, false}]) of
|
||||||
|
{ok, 200, _, ClientRef} ->
|
||||||
|
{ok, RespBody} = hackney:body(ClientRef),
|
||||||
|
lager:debug("[iot_api] send body: ~p, get error is: ~p", [Body, RespBody]),
|
||||||
|
hackney:close(ClientRef);
|
||||||
|
{ok, HttpCode, _, ClientRef} ->
|
||||||
|
{ok, RespBody} = hackney:body(ClientRef),
|
||||||
|
hackney:close(ClientRef),
|
||||||
|
lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, {HttpCode, RespBody}]);
|
||||||
|
{error, Reason} ->
|
||||||
|
lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, Reason])
|
||||||
|
end
|
||||||
|
end),
|
||||||
|
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Handling all non call/cast messages
|
||||||
|
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||||
|
{noreply, NewState :: #state{}} |
|
||||||
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
|
%% Task进程挂掉
|
||||||
|
handle_info({'DOWN', _MRef, process, _Pid, normal}, State) ->
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) ->
|
||||||
|
lager:notice("[iot_api] task process down with reason: ~p", [Reason]),
|
||||||
|
{noreply, State};
|
||||||
|
|
||||||
|
handle_info(_Info, State = #state{}) ->
|
||||||
|
{noreply, State}.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc This function is called by a gen_server when it is about to
|
||||||
|
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||||
|
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||||
|
%% with Reason. The return value is ignored.
|
||||||
|
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||||
|
State :: #state{}) -> term()).
|
||||||
|
terminate(_Reason, _State = #state{}) ->
|
||||||
|
ok.
|
||||||
|
|
||||||
|
%% @private
|
||||||
|
%% @doc Convert process state when code is changed
|
||||||
|
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||||
|
Extra :: term()) ->
|
||||||
|
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||||
|
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||||
|
{ok, State}.
|
||||||
|
|
||||||
|
%%%===================================================================
|
||||||
|
%%% Internal functions
|
||||||
|
%%%===================================================================
|
||||||
@ -22,6 +22,7 @@
|
|||||||
%% API
|
%% API
|
||||||
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
|
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
|
||||||
-export([get_metric/1, publish_message/4, get_aes/1, get_status/1]).
|
-export([get_metric/1, publish_message/4, get_aes/1, get_status/1]).
|
||||||
|
-export([publish_directive/6, send_directive/5]).
|
||||||
-export([create_session/2, attach_channel/2]).
|
-export([create_session/2, attach_channel/2]).
|
||||||
-export([reload_device/2, delete_device/2, activate_device/3]).
|
-export([reload_device/2, delete_device/2, activate_device/3]).
|
||||||
-export([heartbeat/1]).
|
-export([heartbeat/1]).
|
||||||
@ -109,6 +110,45 @@ publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(
|
|||||||
{error, Reason}
|
{error, Reason}
|
||||||
end.
|
end.
|
||||||
|
|
||||||
|
-spec publish_directive(Pid :: pid(), DeviceUUID :: binary(), DirectiveType :: integer(), Version :: binary(), DirectiveParams :: binary() | map(), Timeout :: integer()) ->
|
||||||
|
ok | {ok, Response :: binary()} | {error, Reason :: any()}.
|
||||||
|
publish_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams, Timeout)
|
||||||
|
when is_pid(Pid), is_binary(DeviceUUID), is_integer(DirectiveType), is_binary(Version), is_binary(DirectiveParams); is_map(DirectiveParams), is_integer(Timeout) ->
|
||||||
|
|
||||||
|
Directive = #{
|
||||||
|
<<"device_uuid">> => DeviceUUID,
|
||||||
|
<<"version">> => Version,
|
||||||
|
<<"directive_type">> => DirectiveType,
|
||||||
|
<<"directive">> => DirectiveParams
|
||||||
|
},
|
||||||
|
|
||||||
|
case gen_statem:call(Pid, {publish_directive, self(), Directive}) of
|
||||||
|
{ok, Ref} ->
|
||||||
|
receive
|
||||||
|
{ws_response, Ref} ->
|
||||||
|
ok;
|
||||||
|
{ws_response, Ref, Response} ->
|
||||||
|
{ok, Response}
|
||||||
|
after Timeout ->
|
||||||
|
{error, timeout}
|
||||||
|
end;
|
||||||
|
{error, Reason} ->
|
||||||
|
{error, Reason}
|
||||||
|
end.
|
||||||
|
|
||||||
|
-spec send_directive(Pid :: pid(), DeviceUUID :: binary(), DirectiveType :: integer(), Version :: binary(), DirectiveParams :: binary() | map()) ->
|
||||||
|
ok | {error, Reason :: any()}.
|
||||||
|
send_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams)
|
||||||
|
when is_pid(Pid), is_binary(DeviceUUID), is_integer(DirectiveType), is_binary(Version), is_binary(DirectiveParams); is_map(DirectiveParams) ->
|
||||||
|
|
||||||
|
Directive = #{
|
||||||
|
<<"device_uuid">> => DeviceUUID,
|
||||||
|
<<"version">> => Version,
|
||||||
|
<<"directive_type">> => DirectiveType,
|
||||||
|
<<"directive">> => DirectiveParams
|
||||||
|
},
|
||||||
|
gen_statem:call(Pid, {send_directive, Directive}).
|
||||||
|
|
||||||
%% 设备管理相关
|
%% 设备管理相关
|
||||||
|
|
||||||
-spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}.
|
-spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}.
|
||||||
@ -232,6 +272,38 @@ handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid =
|
|||||||
lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]),
|
lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]),
|
||||||
{keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]};
|
{keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]};
|
||||||
|
|
||||||
|
%% 发送指令时, 指令要通过aes加密,必须要求session是存在的
|
||||||
|
handle_event({call, From}, {publish_directive, ReceiverPid, Directive0}, ?STATE_ACTIVATED,
|
||||||
|
State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) ->
|
||||||
|
|
||||||
|
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]),
|
||||||
|
Directive = iot_cipher_aes:encrypt(AES, Directive0),
|
||||||
|
%% 通过websocket发送请求
|
||||||
|
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<16:8, Directive/binary>>),
|
||||||
|
|
||||||
|
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||||||
|
|
||||||
|
%% 其他情况下,发送指令是失败的
|
||||||
|
handle_event({call, From}, {publish_directive, _, Directive}, _, State = #state{uuid = UUID}) ->
|
||||||
|
lager:debug("[iot_host] uuid: ~p, publish_directive: ~p, invalid state: ~p", [UUID, Directive, state_map(State)]),
|
||||||
|
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};
|
||||||
|
|
||||||
|
%% 发送指令时, 指令要通过aes加密,必须要求session是存在的
|
||||||
|
handle_event({call, From}, {send_directive, Directive0}, ?STATE_ACTIVATED,
|
||||||
|
State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) ->
|
||||||
|
|
||||||
|
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]),
|
||||||
|
Directive = iot_cipher_aes:encrypt(AES, Directive0),
|
||||||
|
%% 通过websocket发送请求
|
||||||
|
ws_channel:send(ChannelPid, <<16:8, Directive/binary>>),
|
||||||
|
|
||||||
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
|
|
||||||
|
%% 其他情况下,发送指令是失败的
|
||||||
|
handle_event({call, From}, {send_directive, Directive}, _, State = #state{uuid = UUID}) ->
|
||||||
|
lager:debug("[iot_host] uuid: ~p, send_directive: ~p, invalid state: ~p", [UUID, Directive, state_map(State)]),
|
||||||
|
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};
|
||||||
|
|
||||||
%% 激活主机
|
%% 激活主机
|
||||||
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
|
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) ->
|
||||||
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
|
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
|
||||||
@ -418,7 +490,12 @@ handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #stat
|
|||||||
|
|
||||||
%% 保存数据到mysql
|
%% 保存数据到mysql
|
||||||
Message = iolist_to_binary(jiffy:encode(Params, [force_utf8])),
|
Message = iolist_to_binary(jiffy:encode(Params, [force_utf8])),
|
||||||
ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message),
|
case ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message) of
|
||||||
|
{ok, LogId} ->
|
||||||
|
iot_api:ai_event(LogId);
|
||||||
|
_ ->
|
||||||
|
ok
|
||||||
|
end,
|
||||||
iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
|
iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
|
||||||
|
|
||||||
iot_ai_router:route_uuid(DeviceUUID, EventType, Params)
|
iot_ai_router:route_uuid(DeviceUUID, EventType, Params)
|
||||||
|
|||||||
@ -26,11 +26,17 @@ start_link() ->
|
|||||||
%% type => worker(), % optional
|
%% type => worker(), % optional
|
||||||
%% modules => modules()} % optional
|
%% modules => modules()} % optional
|
||||||
init([]) ->
|
init([]) ->
|
||||||
{ok, MqttOpts} = application:get_env(iot, zhongdian),
|
|
||||||
{ok, JinZhiOpts} = application:get_env(iot, jinzhi),
|
|
||||||
|
|
||||||
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
|
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
|
||||||
Specs = [
|
Specs = [
|
||||||
|
#{
|
||||||
|
id => 'iot_api',
|
||||||
|
start => {'iot_api', start_link, []},
|
||||||
|
restart => permanent,
|
||||||
|
shutdown => 2000,
|
||||||
|
type => supervisor,
|
||||||
|
modules => ['iot_api']
|
||||||
|
},
|
||||||
|
|
||||||
#{
|
#{
|
||||||
id => 'iot_database_buffer',
|
id => 'iot_database_buffer',
|
||||||
start => {'iot_database_buffer', start_link, []},
|
start => {'iot_database_buffer', start_link, []},
|
||||||
@ -60,16 +66,25 @@ init([]) ->
|
|||||||
|
|
||||||
#{
|
#{
|
||||||
id => 'iot_zd_endpoint',
|
id => 'iot_zd_endpoint',
|
||||||
start => {'iot_zd_endpoint', start_link, [MqttOpts]},
|
start => {'iot_zd_endpoint', start_link, []},
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
shutdown => 2000,
|
shutdown => 2000,
|
||||||
type => worker,
|
type => worker,
|
||||||
modules => ['iot_zd_endpoint']
|
modules => ['iot_zd_endpoint']
|
||||||
},
|
},
|
||||||
|
|
||||||
|
%#{
|
||||||
|
% id => 'iot_zd_consumer',
|
||||||
|
% start => {'iot_zd_consumer', start_link, []},
|
||||||
|
% restart => permanent,
|
||||||
|
% shutdown => 2000,
|
||||||
|
% type => worker,
|
||||||
|
% modules => ['iot_zd_consumer']
|
||||||
|
%},
|
||||||
|
|
||||||
#{
|
#{
|
||||||
id => 'iot_jinzhi_endpoint',
|
id => 'iot_jinzhi_endpoint',
|
||||||
start => {'iot_jinzhi_endpoint', start_link, [JinZhiOpts]},
|
start => {'iot_jinzhi_endpoint', start_link, []},
|
||||||
restart => permanent,
|
restart => permanent,
|
||||||
shutdown => 2000,
|
shutdown => 2000,
|
||||||
type => worker,
|
type => worker,
|
||||||
|
|||||||
@ -102,7 +102,7 @@ handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode
|
|||||||
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]),
|
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]),
|
||||||
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
||||||
ok ->
|
ok ->
|
||||||
ReceiverPid ! {ack, Id},
|
ReceiverPid ! {ack, Id, Message},
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
{ok, PacketId} ->
|
{ok, PacketId} ->
|
||||||
{noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid, Message}, InFlight)}};
|
{noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid, Message}, InFlight)}};
|
||||||
|
|||||||
@ -10,7 +10,7 @@
|
|||||||
-author("aresei").
|
-author("aresei").
|
||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([hget/2]).
|
-export([hget/2, hgetall/1]).
|
||||||
|
|
||||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||||
%% HashTable处理
|
%% HashTable处理
|
||||||
@ -18,4 +18,25 @@
|
|||||||
|
|
||||||
-spec hget(Key :: binary(), Field :: binary()) -> {ok, Val :: any()} | {error, Reason :: binary()}.
|
-spec hget(Key :: binary(), Field :: binary()) -> {ok, Val :: any()} | {error, Reason :: binary()}.
|
||||||
hget(Key, Field) when is_binary(Key), is_binary(Field) ->
|
hget(Key, Field) when is_binary(Key), is_binary(Field) ->
|
||||||
poolboy:transaction(redis_pool, fun(Conn) -> eredis:q(Conn, ["HGET", Key, Field]) end).
|
poolboy:transaction(redis_pool, fun(Conn) -> eredis:q(Conn, ["HGET", Key, Field]) end).
|
||||||
|
|
||||||
|
-spec hgetall(Key :: binary()) -> {ok, Fields :: map()} | {error, Reason :: binary()}.
|
||||||
|
hgetall(Key) when is_binary(Key) ->
|
||||||
|
poolboy:transaction(redis_pool, fun(Conn) ->
|
||||||
|
case eredis:q(Conn, ["HGETALL", Key]) of
|
||||||
|
{ok, Items} ->
|
||||||
|
{ok, to_map(Items)};
|
||||||
|
Error ->
|
||||||
|
Error
|
||||||
|
end
|
||||||
|
end).
|
||||||
|
|
||||||
|
|
||||||
|
to_map(Items) when is_list(Items), length(Items) rem 2 == 0 ->
|
||||||
|
to_map(Items, #{}).
|
||||||
|
to_map([], Target) ->
|
||||||
|
Target;
|
||||||
|
to_map([K, V|Tail], Target) ->
|
||||||
|
to_map(Tail, Target#{K => V}).
|
||||||
|
|
||||||
|
|
||||||
|
|||||||
@ -18,6 +18,8 @@
|
|||||||
{port, 18080}
|
{port, 18080}
|
||||||
]},
|
]},
|
||||||
|
|
||||||
|
{api_url, "http://39.98.184.67:8800/api/v1/taskLog"},
|
||||||
|
|
||||||
%% 目标服务器地址
|
%% 目标服务器地址
|
||||||
{emqx_server, [
|
{emqx_server, [
|
||||||
{host, {39, 98, 184, 67}},
|
{host, {39, 98, 184, 67}},
|
||||||
|
|||||||
@ -23,6 +23,8 @@
|
|||||||
{<<"test">>, <<"iot2023">>}
|
{<<"test">>, <<"iot2023">>}
|
||||||
]},
|
]},
|
||||||
|
|
||||||
|
{api_url, "https://lgsiot.njau.edu.cn/api/v1/taskLog"},
|
||||||
|
|
||||||
%% 配置中电的数据转发, mqtt协议
|
%% 配置中电的数据转发, mqtt协议
|
||||||
{zhongdian, [
|
{zhongdian, [
|
||||||
{host, "172.30.6.161"},
|
{host, "172.30.6.161"},
|
||||||
|
|||||||
@ -233,6 +233,14 @@ Body: 事件内容,AES加密
|
|||||||
|
|
||||||
## 指令说明
|
## 指令说明
|
||||||
|
|
||||||
|
### 指令返回格式说明(按照json_rpc 2.0的规范)
|
||||||
|
|
||||||
|
```text
|
||||||
|
成功: {"result": map | array | string | any}
|
||||||
|
|
||||||
|
失败: {"error": {code: int, message: "错误描述"}}
|
||||||
|
```
|
||||||
|
|
||||||
### 服务器对主机推送的指令格式
|
### 服务器对主机推送的指令格式
|
||||||
|
|
||||||
<<0x05, PacketId:4, Body:任意长度>>
|
<<0x05, PacketId:4, Body:任意长度>>
|
||||||
@ -246,12 +254,14 @@ Body: 事件内容,AES加密
|
|||||||
{
|
{
|
||||||
"device_uuid": "xxxxxx", // 设备的device_uuid, 数组格式
|
"device_uuid": "xxxxxx", // 设备的device_uuid, 数组格式
|
||||||
"version": "1.0",
|
"version": "1.0",
|
||||||
"command_type": 0x01, // 中电计费电表控制
|
"directive_type": 0x01, // 中电计费电表控制
|
||||||
"command": {
|
"timeout": 10, // 指令执行超时时间
|
||||||
|
"directive": {
|
||||||
"type": "ctrl", // 遥控
|
"type": "ctrl", // 遥控
|
||||||
"stype": int, // 遥控类型,0: 遥控, 1: 遥调, 2: 置数
|
"stype": int, // 遥控类型,0: 遥控, 1: 遥调, 2: 置数
|
||||||
"ctype": int, // 遥控动作, 0: 打开,1: 闭合
|
"ctype": int, // 遥控动作, 0: 打开,1: 闭合
|
||||||
"value": double, // 控制参数
|
"value": double, // 控制参数
|
||||||
"timestamp": 17031000000 // 发命令时间
|
"timestamp": 17031000000 // 发命令时间
|
||||||
}
|
}
|
||||||
}
|
}
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user