From 5f28301ed187a34548a179199508c8a776ada457 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 22 Dec 2023 15:19:21 +0800 Subject: [PATCH 01/16] fix ws command --- apps/iot/src/websocket/ws_channel.erl | 47 ++++++++++++++++++++++++++- 1 file changed, 46 insertions(+), 1 deletion(-) diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index 943ec6c..7a0d326 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -14,6 +14,7 @@ -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). -export([publish/3, stop/2, send/2]). +-export([publish_command/3, send_command/2]). -record(state, { uuid :: undefined | binary(), @@ -23,7 +24,9 @@ packet_id = 1 :: integer(), %% 请求响应的对应关系 - inflight = #{} + inflight = #{}, + %% 指令的对应关系 + cmd_inflight = #{} }). %% 向通道中写入消息 @@ -38,6 +41,20 @@ publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) -> send(Pid, Msg) when is_pid(Pid), is_binary(Msg) -> Pid ! {send, Msg}. +%% 发送指令信息 + +%% 向通道中写入消息 +-spec publish_command(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference(). +publish_command(Pid, ReceiverPid, Cmd) when is_pid(Pid), is_binary(Cmd) -> + Ref = make_ref(), + Pid ! {publish_command, ReceiverPid, Ref, Cmd}, + Ref. + +%% 向通道中写入消息 +-spec send_command(Pid :: pid(), Cmd :: binary()) -> no_return(). +send_command(Pid, Cmd) when is_pid(Pid), is_binary(Cmd) -> + Pid ! {send_command, Cmd}. + %% 关闭方法 -spec stop(Pid :: pid(), Reason :: any()) -> no_return(). stop(undefined, _Reason) -> @@ -140,6 +157,25 @@ websocket_handle({binary, <> {ok, State#state{inflight = NInflight}} end; +%% 命令的响应 +websocket_handle({binary, <>}, State = #state{uuid = UUID, cmd_inflight = CmdInflight}) when PacketId > 0 -> + lager:debug("[ws_channel] uuid: ~p, get command response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), + case maps:take(PacketId, CmdInflight) of + error -> + lager:warning("[ws_channel] get unknown command response message: ~p, packet_id: ~p", [Body, PacketId]), + {ok, State}; + {{ReceiverPid, Ref}, NCmdInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true when Body == <<>> -> + ReceiverPid ! {cmd_response, Ref}; + true -> + ReceiverPid ! {cmd_response, Ref, Body}; + false -> + lager:warning("[ws_channel] get command response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) + end, + {ok, State#state{cmd_inflight = NCmdInflight}} + end; + websocket_handle(Info, State) -> lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), {stop, State}. @@ -158,6 +194,15 @@ websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = Pack websocket_info({send, Msg}, State) when is_binary(Msg) -> {reply, {binary, <>}, State}; +%% 下发指令 +websocket_info({publish_command, ReceiverPid, Ref, Cmd}, State = #state{packet_id = PacketId, cmd_inflight = CmdInflight}) when is_binary(Cmd) -> + NCmdInflight = maps:put(PacketId, {ReceiverPid, Ref}, CmdInflight), + {reply, {binary, <>}, State#state{packet_id = PacketId + 1, cmd_inflight = NCmdInflight}}; + +%% 发送指令, 不需要等待回复 +websocket_info({send_command, Cmd}, State) when is_binary(Cmd) -> + {reply, {binary, <>}, State}; + %% 用户进程关闭,则关闭通道 websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]), From cbc659187ea02cdd2f844cc1a2b55370760d53c6 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 22 Dec 2023 16:28:28 +0800 Subject: [PATCH 02/16] fix directive --- apps/iot/include/iot.hrl | 5 +- apps/iot/src/iot_host.erl | 73 ++++++++++++++++++++++++++- apps/iot/src/websocket/ws_channel.erl | 53 +++++++++---------- docs/websocket.md | 4 +- 4 files changed, 104 insertions(+), 31 deletions(-) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index f1dc1b7..4d6264e 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -45,8 +45,9 @@ -define(PACKET_PUBLISH_RESPONSE, 16#04). %% 服务端指令 --define(PACKET_COMMAND, 16#05). --define(PACKET_COMMAND_RESPONSE, 16#06). +% directive +-define(PACKET_DIRECTIVE, 16#05). +-define(PACKET_DIRECTIVE_RESPONSE, 16#06). %% 事件类型 -define(EVENT_DEVICE, 16#01). diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 2f4f3a4..0764b96 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -21,7 +21,7 @@ %% API -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). --export([get_metric/1, publish_message/4, get_aes/1, get_status/1]). +-export([get_metric/1, publish_message/4, get_aes/1, get_status/1, publish_directive/6, send_directive/5]). -export([create_session/2, attach_channel/2]). -export([reload_device/2, delete_device/2, activate_device/3]). -export([heartbeat/1]). @@ -109,6 +109,45 @@ publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer( {error, Reason} end. +-spec publish_directive(Pid :: pid(), DeviceUUID :: binary(), DirectiveType :: integer(), Version :: binary(), DirectiveParams :: binary() | map(), Timeout :: integer()) -> + ok | {ok, Response :: binary()} | {error, Reason :: any()}. +publish_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams, Timeout) + when is_pid(Pid), is_binary(DeviceUUID), is_integer(DirectiveType), is_binary(Version), is_binary(DirectiveParams); is_map(DirectiveParams), is_integer(Timeout) -> + + Directive = #{ + <<"device_uuid">> => DeviceUUID, + <<"version">> => Version, + <<"directive_type">> => DirectiveType, + <<"directive">> => DirectiveParams + }, + + case gen_statem:call(Pid, {publish_directive, self(), Directive}) of + {ok, Ref} -> + receive + {directive_response, Ref} -> + ok; + {directive_response, Ref, Response} -> + {ok, Response} + after Timeout -> + {error, timeout} + end; + {error, Reason} -> + {error, Reason} + end. + +-spec send_directive(Pid :: pid(), DeviceUUID :: binary(), DirectiveType :: integer(), Version :: binary(), DirectiveParams :: binary() | map()) -> + ok | {error, Reason :: any()}. +send_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams) + when is_pid(Pid), is_binary(DeviceUUID), is_integer(DirectiveType), is_binary(Version), is_binary(DirectiveParams); is_map(DirectiveParams) -> + + Directive = #{ + <<"device_uuid">> => DeviceUUID, + <<"version">> => Version, + <<"directive_type">> => DirectiveType, + <<"directive">> => DirectiveParams + }, + gen_statem:call(Pid, {send_directive, Directive}). + %% 设备管理相关 -spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}. @@ -232,6 +271,38 @@ handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid = lager:debug("[iot_host] uuid: ~p, publish_message invalid state: ~p", [UUID, state_map(State)]), {keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]}; +%% 发送指令时, 指令要通过aes加密,必须要求session是存在的 +handle_event({call, From}, {publish_directive, ReceiverPid, Directive0}, ?STATE_ACTIVATED, + State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) -> + + lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]), + Directive = iot_cipher_aes:encrypt(AES, Directive0), + %% 通过websocket发送请求 + Ref = ws_channel:publish_directive(ChannelPid, ReceiverPid, Directive), + + {keep_state, State, [{reply, From, {ok, Ref}}]}; + +%% 其他情况下,发送指令是失败的 +handle_event({call, From}, {publish_directive, _, Directive}, _, State = #state{uuid = UUID}) -> + lager:debug("[iot_host] uuid: ~p, publish_directive: ~p, invalid state: ~p", [UUID, Directive, state_map(State)]), + {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}; + +%% 发送指令时, 指令要通过aes加密,必须要求session是存在的 +handle_event({call, From}, {send_directive, Directive0}, ?STATE_ACTIVATED, + State = #state{uuid = UUID, aes = AES, channel_pid = ChannelPid, has_session = true}) -> + + lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]), + Directive = iot_cipher_aes:encrypt(AES, Directive0), + %% 通过websocket发送请求 + ws_channel:send_directive(ChannelPid, Directive), + + {keep_state, State, [{reply, From, ok}]}; + +%% 其他情况下,发送指令是失败的 +handle_event({call, From}, {send_directive, Directive}, _, State = #state{uuid = UUID}) -> + lager:debug("[iot_host] uuid: ~p, send_directive: ~p, invalid state: ~p", [UUID, Directive, state_map(State)]), + {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}; + %% 激活主机 handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes = Aes, channel_pid = ChannelPid}) when is_pid(ChannelPid) -> BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]), diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index 7a0d326..bdf5dad 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -14,7 +14,7 @@ -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). -export([publish/3, stop/2, send/2]). --export([publish_command/3, send_command/2]). +-export([publish_directive/3, send_directive/2]). -record(state, { uuid :: undefined | binary(), @@ -26,7 +26,7 @@ %% 请求响应的对应关系 inflight = #{}, %% 指令的对应关系 - cmd_inflight = #{} + directive_inflight = #{} }). %% 向通道中写入消息 @@ -42,18 +42,15 @@ send(Pid, Msg) when is_pid(Pid), is_binary(Msg) -> Pid ! {send, Msg}. %% 发送指令信息 - -%% 向通道中写入消息 --spec publish_command(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference(). -publish_command(Pid, ReceiverPid, Cmd) when is_pid(Pid), is_binary(Cmd) -> +-spec publish_directive(Pid :: pid(), ReceiverPid :: pid(), Directive :: binary()) -> Ref :: reference(). +publish_directive(Pid, ReceiverPid, Directive) when is_pid(Pid), is_binary(Directive) -> Ref = make_ref(), - Pid ! {publish_command, ReceiverPid, Ref, Cmd}, + Pid ! {publish_directive, ReceiverPid, Ref, Directive}, Ref. -%% 向通道中写入消息 --spec send_command(Pid :: pid(), Cmd :: binary()) -> no_return(). -send_command(Pid, Cmd) when is_pid(Pid), is_binary(Cmd) -> - Pid ! {send_command, Cmd}. +-spec send_directive(Pid :: pid(), Directive :: binary()) -> no_return(). +send_directive(Pid, Directive) when is_pid(Pid), is_binary(Directive) -> + Pid ! {send_directive, Directive}. %% 关闭方法 -spec stop(Pid :: pid(), Reason :: any()) -> no_return(). @@ -157,23 +154,27 @@ websocket_handle({binary, <> {ok, State#state{inflight = NInflight}} end; -%% 命令的响应 -websocket_handle({binary, <>}, State = #state{uuid = UUID, cmd_inflight = CmdInflight}) when PacketId > 0 -> - lager:debug("[ws_channel] uuid: ~p, get command response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), - case maps:take(PacketId, CmdInflight) of +%% 命令的响应, PacketId == 0 的指令就不应该有返回值 +websocket_handle({binary, <>}, State = #state{uuid = UUID}) -> + lager:debug("[ws_channel] uuid: ~p, get send directive response message: ~p", [UUID, Body]), + {ok, State}; + +websocket_handle({binary, <>}, State = #state{uuid = UUID, directive_inflight = DirectiveInflight}) when PacketId > 0 -> + lager:debug("[ws_channel] uuid: ~p, get directive response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), + case maps:take(PacketId, DirectiveInflight) of error -> - lager:warning("[ws_channel] get unknown command response message: ~p, packet_id: ~p", [Body, PacketId]), + lager:warning("[ws_channel] get unknown directive response message: ~p, packet_id: ~p", [Body, PacketId]), {ok, State}; - {{ReceiverPid, Ref}, NCmdInflight} -> + {{ReceiverPid, Ref}, NDirectiveInflight} -> case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of true when Body == <<>> -> - ReceiverPid ! {cmd_response, Ref}; + ReceiverPid ! {directive_response, Ref}; true -> - ReceiverPid ! {cmd_response, Ref, Body}; + ReceiverPid ! {directive_response, Ref, Body}; false -> - lager:warning("[ws_channel] get command response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) + lager:warning("[ws_channel] get directive response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) end, - {ok, State#state{cmd_inflight = NCmdInflight}} + {ok, State#state{directive_inflight = NDirectiveInflight}} end; websocket_handle(Info, State) -> @@ -195,13 +196,13 @@ websocket_info({send, Msg}, State) when is_binary(Msg) -> {reply, {binary, <>}, State}; %% 下发指令 -websocket_info({publish_command, ReceiverPid, Ref, Cmd}, State = #state{packet_id = PacketId, cmd_inflight = CmdInflight}) when is_binary(Cmd) -> - NCmdInflight = maps:put(PacketId, {ReceiverPid, Ref}, CmdInflight), - {reply, {binary, <>}, State#state{packet_id = PacketId + 1, cmd_inflight = NCmdInflight}}; +websocket_info({publish_directive, ReceiverPid, Ref, Directive}, State = #state{packet_id = PacketId, directive_inflight = DirectiveInflight}) when is_binary(Directive) -> + NDirectiveInflight = maps:put(PacketId, {ReceiverPid, Ref}, DirectiveInflight), + {reply, {binary, <>}, State#state{packet_id = PacketId + 1, directive_inflight = NDirectiveInflight}}; %% 发送指令, 不需要等待回复 -websocket_info({send_command, Cmd}, State) when is_binary(Cmd) -> - {reply, {binary, <>}, State}; +websocket_info({send_directive, Directive}, State) when is_binary(Directive) -> + {reply, {binary, <>}, State}; %% 用户进程关闭,则关闭通道 websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> diff --git a/docs/websocket.md b/docs/websocket.md index 5ae07ef..37eae6c 100644 --- a/docs/websocket.md +++ b/docs/websocket.md @@ -246,8 +246,8 @@ Body: 事件内容,AES加密 { "device_uuid": "xxxxxx", // 设备的device_uuid, 数组格式 "version": "1.0", - "command_type": 0x01, // 中电计费电表控制 - "command": { + "directive_type": 0x01, // 中电计费电表控制 + "directive": { "type": "ctrl", // 遥控 "stype": int, // 遥控类型,0: 遥控, 1: 遥调, 2: 置数 "ctype": int, // 遥控动作, 0: 打开,1: 闭合 From 4b72498d3a78a54d63e5eb4ae9a33bfe3a3681f9 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 22 Dec 2023 17:28:24 +0800 Subject: [PATCH 03/16] add mqtt consumer --- apps/iot/src/consumer/iot_zd_consumer.erl | 201 ++++++++++++++++++++++ 1 file changed, 201 insertions(+) create mode 100644 apps/iot/src/consumer/iot_zd_consumer.erl diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/consumer/iot_zd_consumer.erl new file mode 100644 index 0000000..cb10871 --- /dev/null +++ b/apps/iot/src/consumer/iot_zd_consumer.erl @@ -0,0 +1,201 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% 1. 需要考虑集群部署的相关问题,上行的数据可能在集群中共享 +%%% 2. host进程不能直接去监听topic,这样涉及到新增和下线的很多问题 +%%% @end +%%% Created : 12. 3月 2023 21:27 +%%%------------------------------------------------------------------- +-module(iot_zd_consumer). +-author("aresei"). +-include("iot.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(RETRY_INTERVAL, 5000). + +%% 需要订阅的主题信息 +-define(Topics,[ + {<<"CET/NX/upload">>, 2} +]). + +-record(state, { + conn_pid :: pid(), + logger_pid :: pid(), + mqtt_props :: list(), + is_connected = false +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(Props :: list()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(Props) when is_list(Props) -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [Props], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([Props]) -> + erlang:process_flag(trap_exit, true), + + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 + erlang:start_timer(0, self(), create_consumer), + %% 启动日志记录器 + {ok, LoggerPid} = iot_logger:start_link("directive_data"), + + {ok, disconnected, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid, is_connected = false}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Info, _From, State = #state{conn_pid = _ConnPid}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({disconnect, ReasonCode, Properties}, State) -> + lager:debug("[iot_zd_consumer] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), + {stop, disconnected, State}; +%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 +handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> + lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]), + %% 将消息分发到对应的host进程去处理 + {noreply, State}; +handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> + lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]), + {noreply, State}; + +handle_info({timeout, _, create_consumer}, State = #state{mqtt_props = Props, is_connected = false}) -> + try + {ok, ConnPid} = create_consumer(Props), + {noreply, State#state{conn_pid = ConnPid}} + catch _:Error:Stack -> + lager:warning("[iot_zd_consumer] config: ~p, create consumer get error: ~p, stack: ~p", [Props, Error, Stack]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer), + {noreply, State#state{conn_pid = undefined}} + end; + +%% postman进程挂掉时,重新建立新的 +handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> + lager:warning("[iot_zd_consumer] consumer exited with reason: ~p", [Reason]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer), + + {next_state, disconnected, State#state{conn_pid = undefined}}; + +handle_info(Info, State = #state{}) -> + lager:debug("[iot_zd_consumer] get info: ~p", [Info]), + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> + %% 取消topic的订阅 + TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics), + {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames), + + ok = emqtt:disconnect(ConnPid), + lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]), + ok; +terminate(Reason, _State) -> + lager:debug("[iot_zd_consumer] terminate with reason: ~p", [Reason]), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}. +create_consumer(Props) when is_list(Props) -> + Node = atom_to_binary(node()), + ClientId = <<"mqtt-client-", Node/binary, "-zhongdian_mqtt_consumer">>, + + %% 建立到emqx服务器的连接 + Host = proplists:get_value(host, Props), + Port = proplists:get_value(port, Props, 18080), + Username = proplists:get_value(username, Props), + Password = proplists:get_value(password, Props), + Keepalive = proplists:get_value(keepalive, Props, 86400), + + Opts = [ + {clientid, ClientId}, + {host, Host}, + {port, Port}, + {owner, self()}, + {tcp_opts, []}, + {username, Username}, + {password, Password}, + {keepalive, Keepalive}, + {auto_ack, true}, + {connect_timeout, 5000}, + {proto_ver, v5}, + {retry_interval, 5000} + ], + + %% 建立到emqx服务器的连接 + lager:debug("[iot_zd_consumer] opts is: ~p", [Opts]), + case emqtt:start_link(Opts) of + {ok, ConnPid} -> + %% 监听和host相关的全部事件 + lager:debug("[iot_zd_consumer] start conntecting, pid: ~p", [ConnPid]), + {ok, _} = emqtt:connect(ConnPid), + lager:debug("[iot_zd_consumer] connect success, pid: ~p", [ConnPid]), + SubscribeResult = emqtt:subscribe(ConnPid, ?Topics), + lager:debug("[iot_zd_consumer] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]), + + {ok, ConnPid}; + ignore -> + {error, ignore}; + {error, Reason} -> + {error, Reason} + end. \ No newline at end of file From 231ddbad22c41c3cb1899a131cd9c0bc1e76f977 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 23 Dec 2023 14:52:51 +0800 Subject: [PATCH 04/16] fix sup --- apps/iot/src/consumer/iot_zd_consumer.erl | 11 ++++++----- apps/iot/src/endpoint/iot_jinzhi_endpoint.erl | 10 ++++++---- apps/iot/src/endpoint/iot_zd_endpoint.erl | 10 ++++++---- apps/iot/src/iot_sup.erl | 7 ++----- 4 files changed, 20 insertions(+), 18 deletions(-) diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/consumer/iot_zd_consumer.erl index cb10871..8d79d6d 100644 --- a/apps/iot/src/consumer/iot_zd_consumer.erl +++ b/apps/iot/src/consumer/iot_zd_consumer.erl @@ -14,7 +14,7 @@ -behaviour(gen_server). %% API --export([start_link/1]). +-export([start_link/0]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -39,10 +39,10 @@ %%%=================================================================== %% @doc Spawns the server and registers the local name (unique) --spec(start_link(Props :: list()) -> +-spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(Props) when is_list(Props) -> - gen_server:start_link({local, ?MODULE}, ?MODULE, [Props], []). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). %%%=================================================================== %%% gen_server callbacks @@ -53,9 +53,10 @@ start_link(Props) when is_list(Props) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([Props]) -> +init([]) -> erlang:process_flag(trap_exit, true), + {ok, Props} = application:get_env(iot, zhongdian), %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 erlang:start_timer(0, self(), create_consumer), %% 启动日志记录器 diff --git a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl index 56eb676..fd1a5d9 100644 --- a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl +++ b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl @@ -13,7 +13,7 @@ -behaviour(gen_statem). %% API --export([start_link/1]). +-export([start_link/0]). -export([get_pid/0, forward/3, get_stat/0]). %% gen_statem callbacks @@ -57,8 +57,8 @@ get_stat() -> %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. -start_link(Opts) when is_list(Opts) -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []). +start_link() -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). %%%=================================================================== %%% gen_statem callbacks @@ -68,7 +68,9 @@ start_link(Opts) when is_list(Opts) -> %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([Opts]) -> +init([]) -> + {ok, Opts} = application:get_env(iot, jinzhi), + PoolSize = proplists:get_value(pool_size, Opts), PriFile = proplists:get_value(pri_key, Opts), Url = proplists:get_value(url, Opts), diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index 5d8ecd7..e089ed7 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -13,7 +13,7 @@ -behaviour(gen_statem). %% API --export([start_link/1]). +-export([start_link/0]). -export([get_pid/0, forward/3, get_stat/0]). %% gen_statem callbacks @@ -57,8 +57,8 @@ get_stat() -> %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. -start_link(Opts) when is_list(Opts) -> - gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []). +start_link() -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []). %%%=================================================================== %%% gen_statem callbacks @@ -68,7 +68,9 @@ start_link(Opts) when is_list(Opts) -> %% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. -init([Opts]) -> +init([]) -> + {ok, Opts} = application:get_env(iot, zhongdian), + erlang:process_flag(trap_exit, true), %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 erlang:start_timer(0, self(), create_postman), diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 9e63a51..1cd72c6 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -26,9 +26,6 @@ start_link() -> %% type => worker(), % optional %% modules => modules()} % optional init([]) -> - {ok, MqttOpts} = application:get_env(iot, zhongdian), - {ok, JinZhiOpts} = application:get_env(iot, jinzhi), - SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, Specs = [ #{ @@ -60,7 +57,7 @@ init([]) -> #{ id => 'iot_zd_endpoint', - start => {'iot_zd_endpoint', start_link, [MqttOpts]}, + start => {'iot_zd_endpoint', start_link, []}, restart => permanent, shutdown => 2000, type => worker, @@ -69,7 +66,7 @@ init([]) -> #{ id => 'iot_jinzhi_endpoint', - start => {'iot_jinzhi_endpoint', start_link, [JinZhiOpts]}, + start => {'iot_jinzhi_endpoint', start_link, []}, restart => permanent, shutdown => 2000, type => worker, From e7044534a58f26b0e9cd19273ebf5787a14d63f5 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 23 Dec 2023 15:28:58 +0800 Subject: [PATCH 05/16] fix --- apps/iot/src/consumer/iot_zd_consumer.erl | 6 +++--- apps/iot/src/endpoint/iot_zd_endpoint.erl | 9 ++++++--- apps/iot/src/postman/mqtt_postman.erl | 2 +- 3 files changed, 10 insertions(+), 7 deletions(-) diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/consumer/iot_zd_consumer.erl index 8d79d6d..eae3ad5 100644 --- a/apps/iot/src/consumer/iot_zd_consumer.erl +++ b/apps/iot/src/consumer/iot_zd_consumer.erl @@ -62,7 +62,7 @@ init([]) -> %% 启动日志记录器 {ok, LoggerPid} = iot_logger:start_link("directive_data"), - {ok, disconnected, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid, is_connected = false}}. + {ok, #state{mqtt_props = Props, conn_pid = undefined, logger_pid = LoggerPid, is_connected = false}}. %% @private %% @doc Handling call messages @@ -97,7 +97,7 @@ handle_info({disconnect, ReasonCode, Properties}, State) -> {stop, disconnected, State}; %% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> - lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]), + lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]), %% 将消息分发到对应的host进程去处理 {noreply, State}; handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> @@ -119,7 +119,7 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> lager:warning("[iot_zd_consumer] consumer exited with reason: ~p", [Reason]), erlang:start_timer(?RETRY_INTERVAL, self(), create_consumer), - {next_state, disconnected, State#state{conn_pid = undefined}}; + {noreply, State#state{conn_pid = undefined}}; handle_info(Info, State = #state{}) -> lager:debug("[iot_zd_consumer] get info: ~p", [Info]), diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index e089ed7..eeeaefd 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -235,6 +235,9 @@ do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields = <<"ts">> => Timestamp, <<"properties">> => Fields }, - Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), - PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}, - ok. \ No newline at end of file + try + Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), + PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}} + catch _:_ -> + self() ! {ack, Id, <<"json error">>} + end. \ No newline at end of file diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index 17dc044..6e0177b 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -102,7 +102,7 @@ handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of ok -> - ReceiverPid ! {ack, Id}, + ReceiverPid ! {ack, Id, Message}, {noreply, State}; {ok, PacketId} -> {noreply, State#state{inflight = maps:put(PacketId, {Id, ReceiverPid, Message}, InFlight)}}; From e7e75b51ea9c350fc0cbc9e84689ef393eec13d0 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 23 Dec 2023 16:16:44 +0800 Subject: [PATCH 06/16] fix --- apps/iot/include/iot.hrl | 3 +++ apps/iot/src/consumer/iot_zd_consumer.erl | 28 ++++++++++++++++++++++- apps/iot/src/redis/redis_client.erl | 25 ++++++++++++++++++-- 3 files changed, 53 insertions(+), 3 deletions(-) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 4d6264e..602cc29 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -57,6 +57,9 @@ %% ai相关的事件 -define(EVENT_AI, 16#03). +%% 指令相关 +-define(DIRECTIVE_ZD_CTRL, 16#01). + %% 缓存数据库表 -record(kv, { key :: binary(), diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/consumer/iot_zd_consumer.erl index eae3ad5..40ce2a7 100644 --- a/apps/iot/src/consumer/iot_zd_consumer.erl +++ b/apps/iot/src/consumer/iot_zd_consumer.erl @@ -22,6 +22,9 @@ -define(SERVER, ?MODULE). -define(RETRY_INTERVAL, 5000). +%% 执行超时时间 +-define(EXECUTE_TIMEOUT, 10 * 1000). + %% 需要订阅的主题信息 -define(Topics,[ {<<"CET/NX/upload">>, 2} @@ -98,8 +101,27 @@ handle_info({disconnect, ReasonCode, Properties}, State) -> %% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]), - %% 将消息分发到对应的host进程去处理 + case catch jiffy:decode(Payload, [return_maps]) of + #{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams} -> + %% 通过LocationCode查找到主机和Device_uuid + case redis_client:hgetall(LocationCode) of + {ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} -> + case iot_host:get_pid(HostUUID) of + undefined -> + lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]); + Pid -> + ReceiverPid = self(), + spawn(fun() -> + DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT), + ReceiverPid ! {directive_reply, DirectiveResult} + end) + end; + _ -> + lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode]) + end + end, {noreply, State}; + handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]), {noreply, State}; @@ -121,6 +143,10 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> {noreply, State#state{conn_pid = undefined}}; +handle_info({directive_reply, Reply}, State = #state{}) -> + lager:debug("[iot_zd_consumer] get directive_reply: ~p", [Reply]), + {noreply, State}; + handle_info(Info, State = #state{}) -> lager:debug("[iot_zd_consumer] get info: ~p", [Info]), {noreply, State}. diff --git a/apps/iot/src/redis/redis_client.erl b/apps/iot/src/redis/redis_client.erl index bf0d341..ea29396 100755 --- a/apps/iot/src/redis/redis_client.erl +++ b/apps/iot/src/redis/redis_client.erl @@ -10,7 +10,7 @@ -author("aresei"). %% API --export([hget/2]). +-export([hget/2, hgetall/1]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% HashTable处理 @@ -18,4 +18,25 @@ -spec hget(Key :: binary(), Field :: binary()) -> {ok, Val :: any()} | {error, Reason :: binary()}. hget(Key, Field) when is_binary(Key), is_binary(Field) -> - poolboy:transaction(redis_pool, fun(Conn) -> eredis:q(Conn, ["HGET", Key, Field]) end). \ No newline at end of file + poolboy:transaction(redis_pool, fun(Conn) -> eredis:q(Conn, ["HGET", Key, Field]) end). + +-spec hgetall(Key :: binary()) -> {ok, Fields :: map()} | {error, Reason :: binary()}. +hgetall(Key) when is_binary(Key) -> + poolboy:transaction(redis_pool, fun(Conn) -> + case eredis:q(Conn, ["HGETALL", Key]) of + {ok, Items} -> + {ok, to_map(Items)}; + Error -> + Error + end + end). + + +to_map(Items) when is_list(Items), length(Items) rem 2 == 0 -> + to_map(Items, #{}). +to_map([], Target) -> + Target; +to_map([K, V|Tail], Target) -> + to_map(Tail, Target#{K => V}). + + From c6879d51832d37405b59660926303504fdebf8b5 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 23 Dec 2023 16:59:18 +0800 Subject: [PATCH 07/16] fix directive --- apps/iot/src/consumer/iot_zd_consumer.erl | 70 ++++++++++++++++------- apps/iot/src/iot_host.erl | 11 ++-- 2 files changed, 55 insertions(+), 26 deletions(-) diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/consumer/iot_zd_consumer.erl index 40ce2a7..1db6399 100644 --- a/apps/iot/src/consumer/iot_zd_consumer.erl +++ b/apps/iot/src/consumer/iot_zd_consumer.erl @@ -15,6 +15,7 @@ %% API -export([start_link/0]). +-export([mock/0, mock/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,6 +42,25 @@ %%% API %%%=================================================================== +mock() -> + LocationCode = <<"0001000290040150004002560">>, + mock(LocationCode). + +mock(LocationCode) when is_binary(LocationCode) -> + Req = #{ + <<"version">> => <<"1.0">>, + <<"ts">> => iot_util:current_time(), + <<"properties">> => #{ + <<"type">> => <<"ctrl">>, + <<"stype">> => 0, + <<"ctype">> => 1, + <<"value">> => 1234, + <<"timestamp">> => iot_util:current_time() + }, + <<"location_code">> => LocationCode + }, + gen_server:call(?MODULE, {mock, Req}). + %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -77,7 +97,8 @@ init([]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Info, _From, State = #state{conn_pid = _ConnPid}) -> +handle_call({mock, Request}, _From, State = #state{conn_pid = _ConnPid}) -> + publish_directive(Request), {reply, ok, State}. %% @private @@ -101,25 +122,10 @@ handle_info({disconnect, ReasonCode, Properties}, State) -> %% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]), - case catch jiffy:decode(Payload, [return_maps]) of - #{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams} -> - %% 通过LocationCode查找到主机和Device_uuid - case redis_client:hgetall(LocationCode) of - {ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} -> - case iot_host:get_pid(HostUUID) of - undefined -> - lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]); - Pid -> - ReceiverPid = self(), - spawn(fun() -> - DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT), - ReceiverPid ! {directive_reply, DirectiveResult} - end) - end; - _ -> - lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode]) - end - end, + + Request = catch jiffy:decode(Payload, [return_maps]), + publish_directive(Request), + {noreply, State}; handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> @@ -144,7 +150,7 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> {noreply, State#state{conn_pid = undefined}}; handle_info({directive_reply, Reply}, State = #state{}) -> - lager:debug("[iot_zd_consumer] get directive_reply: ~p", [Reply]), + lager:debug("[iot_zd_consumer] get directive_reply: ~ts", [Reply]), {noreply, State}; handle_info(Info, State = #state{}) -> @@ -182,6 +188,28 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +publish_directive(#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams}) -> + %% 通过LocationCode查找到主机和Device_uuid + case redis_client:hgetall(LocationCode) of + {ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} -> + case iot_host:get_pid(HostUUID) of + undefined -> + lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]); + Pid -> + ReceiverPid = self(), + spawn(fun() -> + DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT), + ReceiverPid ! {directive_reply, DirectiveResult} + end) + end; + {ok, Map} -> + lager:notice("[iot_zd_consumer] location_code: ~p, redis data invalid: ~p", [LocationCode, Map]); + _ -> + lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode]) + end; +publish_directive(Other) -> + lager:notice("[iot_zd_consumer] get a unknown directive", [Other]). + -spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}. create_consumer(Props) when is_list(Props) -> Node = atom_to_binary(node()), diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 0764b96..914e6a4 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -21,7 +21,8 @@ %% API -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). --export([get_metric/1, publish_message/4, get_aes/1, get_status/1, publish_directive/6, send_directive/5]). +-export([get_metric/1, publish_message/4, get_aes/1, get_status/1]). +-export([publish_directive/6, send_directive/5]). -export([create_session/2, attach_channel/2]). -export([reload_device/2, delete_device/2, activate_device/3]). -export([heartbeat/1]). @@ -124,9 +125,9 @@ publish_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams, Time case gen_statem:call(Pid, {publish_directive, self(), Directive}) of {ok, Ref} -> receive - {directive_response, Ref} -> + {ws_response, Ref} -> ok; - {directive_response, Ref, Response} -> + {ws_response, Ref, Response} -> {ok, Response} after Timeout -> {error, timeout} @@ -278,7 +279,7 @@ handle_event({call, From}, {publish_directive, ReceiverPid, Directive0}, ?STATE_ lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]), Directive = iot_cipher_aes:encrypt(AES, Directive0), %% 通过websocket发送请求 - Ref = ws_channel:publish_directive(ChannelPid, ReceiverPid, Directive), + Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<16:8, Directive/binary>>), {keep_state, State, [{reply, From, {ok, Ref}}]}; @@ -294,7 +295,7 @@ handle_event({call, From}, {send_directive, Directive0}, ?STATE_ACTIVATED, lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]), Directive = iot_cipher_aes:encrypt(AES, Directive0), %% 通过websocket发送请求 - ws_channel:send_directive(ChannelPid, Directive), + ws_channel:send(ChannelPid, <<16:8, Directive/binary>>), {keep_state, State, [{reply, From, ok}]}; From ec74c55c97b2cbf69f8849ad2f906b63bb01f96c Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 23 Dec 2023 17:02:56 +0800 Subject: [PATCH 08/16] simple directive --- apps/iot/include/iot.hrl | 5 --- apps/iot/src/websocket/ws_channel.erl | 48 +-------------------------- 2 files changed, 1 insertion(+), 52 deletions(-) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 602cc29..39cd286 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -44,11 +44,6 @@ -define(PACKET_PUBLISH, 16#03). -define(PACKET_PUBLISH_RESPONSE, 16#04). -%% 服务端指令 -% directive --define(PACKET_DIRECTIVE, 16#05). --define(PACKET_DIRECTIVE_RESPONSE, 16#06). - %% 事件类型 -define(EVENT_DEVICE, 16#01). %% 主机的相关事件 diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index bdf5dad..943ec6c 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -14,7 +14,6 @@ -export([init/2]). -export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). -export([publish/3, stop/2, send/2]). --export([publish_directive/3, send_directive/2]). -record(state, { uuid :: undefined | binary(), @@ -24,9 +23,7 @@ packet_id = 1 :: integer(), %% 请求响应的对应关系 - inflight = #{}, - %% 指令的对应关系 - directive_inflight = #{} + inflight = #{} }). %% 向通道中写入消息 @@ -41,17 +38,6 @@ publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) -> send(Pid, Msg) when is_pid(Pid), is_binary(Msg) -> Pid ! {send, Msg}. -%% 发送指令信息 --spec publish_directive(Pid :: pid(), ReceiverPid :: pid(), Directive :: binary()) -> Ref :: reference(). -publish_directive(Pid, ReceiverPid, Directive) when is_pid(Pid), is_binary(Directive) -> - Ref = make_ref(), - Pid ! {publish_directive, ReceiverPid, Ref, Directive}, - Ref. - --spec send_directive(Pid :: pid(), Directive :: binary()) -> no_return(). -send_directive(Pid, Directive) when is_pid(Pid), is_binary(Directive) -> - Pid ! {send_directive, Directive}. - %% 关闭方法 -spec stop(Pid :: pid(), Reason :: any()) -> no_return(). stop(undefined, _Reason) -> @@ -154,29 +140,6 @@ websocket_handle({binary, <> {ok, State#state{inflight = NInflight}} end; -%% 命令的响应, PacketId == 0 的指令就不应该有返回值 -websocket_handle({binary, <>}, State = #state{uuid = UUID}) -> - lager:debug("[ws_channel] uuid: ~p, get send directive response message: ~p", [UUID, Body]), - {ok, State}; - -websocket_handle({binary, <>}, State = #state{uuid = UUID, directive_inflight = DirectiveInflight}) when PacketId > 0 -> - lager:debug("[ws_channel] uuid: ~p, get directive response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), - case maps:take(PacketId, DirectiveInflight) of - error -> - lager:warning("[ws_channel] get unknown directive response message: ~p, packet_id: ~p", [Body, PacketId]), - {ok, State}; - {{ReceiverPid, Ref}, NDirectiveInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true when Body == <<>> -> - ReceiverPid ! {directive_response, Ref}; - true -> - ReceiverPid ! {directive_response, Ref, Body}; - false -> - lager:warning("[ws_channel] get directive response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) - end, - {ok, State#state{directive_inflight = NDirectiveInflight}} - end; - websocket_handle(Info, State) -> lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), {stop, State}. @@ -195,15 +158,6 @@ websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = Pack websocket_info({send, Msg}, State) when is_binary(Msg) -> {reply, {binary, <>}, State}; -%% 下发指令 -websocket_info({publish_directive, ReceiverPid, Ref, Directive}, State = #state{packet_id = PacketId, directive_inflight = DirectiveInflight}) when is_binary(Directive) -> - NDirectiveInflight = maps:put(PacketId, {ReceiverPid, Ref}, DirectiveInflight), - {reply, {binary, <>}, State#state{packet_id = PacketId + 1, directive_inflight = NDirectiveInflight}}; - -%% 发送指令, 不需要等待回复 -websocket_info({send_directive, Directive}, State) when is_binary(Directive) -> - {reply, {binary, <>}, State}; - %% 用户进程关闭,则关闭通道 websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]), From 2c22592376bbb526924f55f4e50e63f39b52489b Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 23 Dec 2023 17:04:22 +0800 Subject: [PATCH 09/16] add sup --- apps/iot/src/iot_sup.erl | 9 +++++++++ 1 file changed, 9 insertions(+) diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 1cd72c6..5479484 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -64,6 +64,15 @@ init([]) -> modules => ['iot_zd_endpoint'] }, + #{ + id => 'iot_zd_consumer', + start => {'iot_zd_consumer', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_zd_consumer'] + }, + #{ id => 'iot_jinzhi_endpoint', start => {'iot_jinzhi_endpoint', start_link, []}, From d5b4532633ca08f67201bf36c0e2539c1a59fc82 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sun, 24 Dec 2023 15:24:33 +0800 Subject: [PATCH 10/16] fix docs --- docs/websocket.md | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/docs/websocket.md b/docs/websocket.md index 37eae6c..53d54b9 100644 --- a/docs/websocket.md +++ b/docs/websocket.md @@ -233,6 +233,14 @@ Body: 事件内容,AES加密 ## 指令说明 +### 指令返回格式说明(按照json_rpc 2.0的规范) + +```text + 成功: {"result": map | array | string | any} + + 失败: {"error": {code: int, message: "错误描述"}} +``` + ### 服务器对主机推送的指令格式 <<0x05, PacketId:4, Body:任意长度>> @@ -247,6 +255,7 @@ Body: 事件内容,AES加密 "device_uuid": "xxxxxx", // 设备的device_uuid, 数组格式 "version": "1.0", "directive_type": 0x01, // 中电计费电表控制 + "timeout": 10, // 指令执行超时时间 "directive": { "type": "ctrl", // 遥控 "stype": int, // 遥控类型,0: 遥控, 1: 遥调, 2: 置数 @@ -254,4 +263,5 @@ Body: 事件内容,AES加密 "value": double, // 控制参数 "timestamp": 17031000000 // 发命令时间 } -} \ No newline at end of file +} + From 325d95e7849f3a17e7cf123912386154f14ddbad Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sun, 24 Dec 2023 16:00:47 +0800 Subject: [PATCH 11/16] add iot_api --- apps/iot/src/iot_api.erl | 137 ++++++++++++++++++++++++++++++++++++++ apps/iot/src/iot_host.erl | 7 +- apps/iot/src/iot_sup.erl | 9 +++ config/sys-dev.config | 2 + config/sys-prod.config | 2 + 5 files changed, 156 insertions(+), 1 deletion(-) create mode 100644 apps/iot/src/iot_api.erl diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl new file mode 100644 index 0000000..c3ea881 --- /dev/null +++ b/apps/iot/src/iot_api.erl @@ -0,0 +1,137 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 24. 12月 2023 15:42 +%%%------------------------------------------------------------------- +-module(iot_api). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([ai_event/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). +-define(API_TOKEN, <<"wv6fGyBhl*7@AsD9">>). + +-record(state, { + +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +ai_event(Id) when is_integer(Id) -> + gen_server:cast(?MODULE, {ai_event, Id}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({ai_event, Id}, State = #state{}) -> + spawn_monitor(fun() -> + Sign = iot_util:md5(<>), + {ok, Url} = application:get_env(iot, api_url), + + Headers = [ + {<<"content-type">>, <<"application/json">>} + ], + ReqData = #{ + <<"sign">> => Sign, + <<"id">> => Id + }, + Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])), + case hackney:request(post, Url, Headers, Body, [{pool, false}]) of + {ok, 200, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef); + {ok, HttpCode, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), + lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, {HttpCode, RespBody}]); + {error, Reason} -> + lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, Reason]) + end + end), + + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +%% Task进程挂掉 +handle_info({'DOWN', _MRef, process, _Pid, normal}, State) -> + {keep_state, State}; + +handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) -> + lager:notice("[iot_api] task process down with reason: ~p", [Reason]), + {keep_state, State}; + +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 914e6a4..2e85e77 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -490,7 +490,12 @@ handle_event(cast, {handle, {ai_event, Event0}}, ?STATE_ACTIVATED, State = #stat %% 保存数据到mysql Message = iolist_to_binary(jiffy:encode(Params, [force_utf8])), - ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message), + case ai_event_logs_bo:insert(UUID, DeviceUUID, SceneId, MicroId, EventType, Message) of + {ok, LogId} -> + iot_api:ai_event(LogId); + _ -> + ok + end, iot_device:change_status(DevicePid, ?DEVICE_ONLINE), iot_ai_router:route_uuid(DeviceUUID, EventType, Params) diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 5479484..e0b76cf 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -28,6 +28,15 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, Specs = [ + #{ + id => 'iot_api', + start => {'iot_api', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_api'] + }, + #{ id => 'iot_database_buffer', start => {'iot_database_buffer', start_link, []}, diff --git a/config/sys-dev.config b/config/sys-dev.config index a2ea5d6..c49c0a0 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -18,6 +18,8 @@ {port, 18080} ]}, + {api_url, "http://39.98.184.67:8800/api/v1/taskLog"}, + %% 目标服务器地址 {emqx_server, [ {host, {39, 98, 184, 67}}, diff --git a/config/sys-prod.config b/config/sys-prod.config index c3b9719..1f9cffc 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -23,6 +23,8 @@ {<<"test">>, <<"iot2023">>} ]}, + {api_url, "https://lgsiot.njau.edu.cn/api/v1/taskLog"}, + %% 配置中电的数据转发, mqtt协议 {zhongdian, [ {host, "172.30.6.161"}, From 9894d9990820c842093c63f7cf672d9ac258789c Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sun, 24 Dec 2023 16:04:03 +0800 Subject: [PATCH 12/16] fix --- apps/iot/src/iot_api.erl | 1 + 1 file changed, 1 insertion(+) diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl index c3ea881..8ae4f92 100644 --- a/apps/iot/src/iot_api.erl +++ b/apps/iot/src/iot_api.erl @@ -85,6 +85,7 @@ handle_cast({ai_event, Id}, State = #state{}) -> case hackney:request(post, Url, Headers, Body, [{pool, false}]) of {ok, 200, _, ClientRef} -> {ok, RespBody} = hackney:body(ClientRef), + lager:debug("[iot_api] send body: ~p, get error is: ~p", [Body, RespBody]), hackney:close(ClientRef); {ok, HttpCode, _, ClientRef} -> {ok, RespBody} = hackney:body(ClientRef), From f117a57be60742a986361b8785e4fba1c256af8e Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sun, 24 Dec 2023 16:06:02 +0800 Subject: [PATCH 13/16] fix token --- apps/iot/src/iot_api.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl index 8ae4f92..0383f5f 100644 --- a/apps/iot/src/iot_api.erl +++ b/apps/iot/src/iot_api.erl @@ -71,14 +71,14 @@ handle_call(_Request, _From, State = #state{}) -> {stop, Reason :: term(), NewState :: #state{}}). handle_cast({ai_event, Id}, State = #state{}) -> spawn_monitor(fun() -> - Sign = iot_util:md5(<>), + Token = iot_util:md5(<>), {ok, Url} = application:get_env(iot, api_url), Headers = [ {<<"content-type">>, <<"application/json">>} ], ReqData = #{ - <<"sign">> => Sign, + <<"token">> => Token, <<"id">> => Id }, Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])), From df127490ed25538b29cbd9b9f442d1240339347a Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sun, 24 Dec 2023 16:07:30 +0800 Subject: [PATCH 14/16] fix --- apps/iot/src/iot_api.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl index 0383f5f..a5fed49 100644 --- a/apps/iot/src/iot_api.erl +++ b/apps/iot/src/iot_api.erl @@ -106,11 +106,11 @@ handle_cast({ai_event, Id}, State = #state{}) -> {stop, Reason :: term(), NewState :: #state{}}). %% Task进程挂掉 handle_info({'DOWN', _MRef, process, _Pid, normal}, State) -> - {keep_state, State}; + {noreply, State}; handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) -> lager:notice("[iot_api] task process down with reason: ~p", [Reason]), - {keep_state, State}; + {noreply, State}; handle_info(_Info, State = #state{}) -> {noreply, State}. From 2dc8364bec511ce390bc1fcdcf44da07373cb4a0 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sun, 24 Dec 2023 16:12:22 +0800 Subject: [PATCH 15/16] fix --- apps/iot/src/consumer/iot_zd_consumer.erl | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/consumer/iot_zd_consumer.erl index 1db6399..98f80b4 100644 --- a/apps/iot/src/consumer/iot_zd_consumer.erl +++ b/apps/iot/src/consumer/iot_zd_consumer.erl @@ -28,7 +28,7 @@ %% 需要订阅的主题信息 -define(Topics,[ - {<<"CET/NX/upload">>, 2} + {<<"CET/NX/download">>, 2} ]). -record(state, { From 2e5af22852ac7549eebab697abab5c948d49a7fe Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sun, 24 Dec 2023 16:13:00 +0800 Subject: [PATCH 16/16] fix --- apps/iot/src/iot_sup.erl | 16 ++++++++-------- 1 file changed, 8 insertions(+), 8 deletions(-) diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index e0b76cf..a0062df 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -73,14 +73,14 @@ init([]) -> modules => ['iot_zd_endpoint'] }, - #{ - id => 'iot_zd_consumer', - start => {'iot_zd_consumer', start_link, []}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['iot_zd_consumer'] - }, + %#{ + % id => 'iot_zd_consumer', + % start => {'iot_zd_consumer', start_link, []}, + % restart => permanent, + % shutdown => 2000, + % type => worker, + % modules => ['iot_zd_consumer'] + %}, #{ id => 'iot_jinzhi_endpoint',