diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index bf30c84..eccd6c4 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -15,9 +15,35 @@ start(_StartType, _StartArgs) -> %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), + start_tcp_server(), + iot_sup:start_link(). stop(_State) -> ok. -%% internal functions \ No newline at end of file +%% internal functions + +%% 启动tcp服务 +start_tcp_server() -> + {ok, Props} = application:get_env(iot, tcp_server), + Acceptors = proplists:get_value(acceptors, Props, 50), + MaxConnections = proplists:get_value(max_connections, Props, 10240), + Backlog = proplists:get_value(backlog, Props, 1024), + Port = proplists:get_value(port, Props), + + TransOpts = [ + {tcp_options, [ + binary, + {reuseaddr, true}, + {active, false}, + {packet, 2}, + {nodelay, false}, + {backlog, Backlog} + ]}, + {acceptors, Acceptors}, + {max_connections, MaxConnections} + ], + {ok, _} = esockd:open('iot/tcp_server', Port, TransOpts, {tcp_channel, start_link, []}), + + lager:debug("[iot_app] the tcp server start at: ~p", [Port]). \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index fc8ac1b..c644a48 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -28,7 +28,23 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, Specs = [ + #{ + id => 'iot_device_sup', + start => {'iot_device_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_device_sup'] + }, + #{ + id => 'iot_host_sup', + start => {'iot_host_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_host_sup'] + } ], {ok, {SupFlags, pools() ++ Specs}}. diff --git a/apps/iot/src/websocket/tcp_channel.erl b/apps/iot/src/websocket/tcp_channel.erl new file mode 100644 index 0000000..c661153 --- /dev/null +++ b/apps/iot/src/websocket/tcp_channel.erl @@ -0,0 +1,209 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 11. 1月 2021 上午12:17 +%%%------------------------------------------------------------------- +-module(tcp_channel). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([publish/3, stop/2, send/2]). + +-export([start_link/2]). +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, terminate/2]). + +-record(state, { + transport, + socket, + + uuid :: undefined | binary(), + %% 用户进程id + host_pid = undefined, + %% 发送消息对应的id + packet_id = 1 :: integer(), + + %% 请求响应的对应关系 + inflight = #{} +}). + +%% 向通道中写入消息 +-spec publish(Pid :: pid(), ReceiverPid :: pid(), Msg :: binary()) -> Ref :: reference(). +publish(Pid, ReceiverPid, Msg) when is_pid(Pid), is_binary(Msg) -> + Ref = make_ref(), + Pid ! {publish, ReceiverPid, Ref, Msg}, + Ref. + +%% 向通道中写入消息 +-spec send(Pid :: pid(), Msg :: binary()) -> no_return(). +send(Pid, Msg) when is_pid(Pid), is_binary(Msg) -> + Pid ! {send, Msg}. + +%% 关闭方法 +-spec stop(Pid :: pid(), Reason :: any()) -> no_return(). +stop(undefined, _Reason) -> + ok; +stop(Pid, Reason) when is_pid(Pid) -> + Pid ! {stop, Reason}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% 逻辑处理方法 +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +start_link(Transport, Sock) -> + {ok, proc_lib:spawn_link(?MODULE, init, [[Transport, Sock]])}. + +init([Transport, Sock]) -> + lager:debug("[sdlan_channel] get a new connection: ~p", [Sock]), + case Transport:wait(Sock) of + {ok, NewSock} -> + Transport:setopts(Sock, [{active, true}]), + % erlang:start_timer(?PING_TICKER, self(), ping_ticker), + gen_server:enter_loop(?MODULE, [], #state{transport = Transport, socket = NewSock}); + {error, Reason} -> + {stop, Reason} + end. + +handle_call(_Request, _From, State) -> + {reply, ok, State}. + +handle_cast(_Msg, State) -> + {noreply, State}. + +handle_info({binary, <>}, State) -> + #{<<"uuid">> := UUID, <<"timestamp">> := Timestamp, <<"salt">> := Salt, <<"username">> := Username, <<"token">> := Token} = jiffy:decode(Data, [return_maps]), + lager:debug("[ws_channel] auth uuid: ~p, request message: ~p", [UUID, Data]), + case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of + true -> + case host_bo:get_host_by_uuid(UUID) of + undefined -> + lager:warning("[ws_channel] uuid: ~p, user: ~p, host not found", [UUID, Username]), + {stop, State}; + {ok, _} -> + %% 尝试启动主机的服务进程 + {ok, HostPid} = iot_host_sup:ensured_host_started(UUID), + case iot_host:attach_channel(HostPid, self()) of + ok -> + %% 建立到host的monitor + erlang:monitor(process, HostPid), + Reply = jiffy:encode(#{<<"code">> => 1, <<"message">> => <<"ok">>}, [force_utf8]), + + {reply, {binary, <>}, State#state{uuid = UUID, host_pid = HostPid}}; + {error, Reason} -> + lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p", [UUID, Reason]), + {stop, State} + end + end; + false -> + lager:warning("[ws_channel] uuid: ~p, user: ~p, auth failed", [UUID, Username]), + {stop, State} + end; + +handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> + {ok, Reply} = iot_host:create_session(HostPid, PubKey), + {reply, {binary, <>}, State}; + +handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> + iot_host:handle(HostPid, {data, Data}), + {ok, State}; + +handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> + iot_host:handle(HostPid, {ping, CipherMetric}), + {ok, State}; + +handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> + iot_host:handle(HostPid, {inform, CipherInfo}), + {ok, State}; + +handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> + iot_host:handle(HostPid, {feedback_step, CipherInfo}), + {ok, State}; + +handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> + iot_host:handle(HostPid, {feedback_result, CipherInfo}), + {ok, State}; + +handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> + iot_host:handle(HostPid, {event, CipherEvent}), + {ok, State}; + +handle_info({binary, <>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) -> + iot_host:handle(HostPid, {ai_event, CipherEvent}), + {ok, State}; + +%% 主机端的消息响应 +handle_info({binary, <>}, State = #state{uuid = UUID}) -> + lager:debug("[ws_channel] uuid: ~p, get send response message: ~p", [UUID, Body]), + {ok, State}; +handle_info({binary, <>}, State = #state{uuid = UUID, inflight = Inflight}) when PacketId > 0 -> + lager:debug("[ws_channel] uuid: ~p, get publish response message: ~p, packet_id: ~p", [UUID, Body, PacketId]), + case maps:take(PacketId, Inflight) of + error -> + lager:warning("[ws_channel] get unknown publish response message: ~p, packet_id: ~p", [Body, PacketId]), + {ok, State}; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true when Body == <<>> -> + ReceiverPid ! {ws_response, Ref}; + true -> + ReceiverPid ! {ws_response, Ref, Body}; + false -> + lager:warning("[ws_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Body, PacketId]) + end, + {ok, State#state{inflight = NInflight}} + end; + +handle_info(Info, State) -> + lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), + {stop, State}; + +handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) -> + lager:notice("[sdlan_channel] tcp_error: ~p", [Reason]), + {stop, normal, State}; + +handle_info({tcp_closed, Sock}, State = #state{socket = Sock}) -> + lager:notice("[sdlan_channel] tcp_closed"), + {stop, normal, State}; + +%% 关闭当前通道 +handle_info({stop, Reason}, State) -> + {stop, Reason, State}; + +handle_info(Info, State) -> + lager:warning("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]), + {noreply, State}. + +terminate(Reason, #state{}) -> + lager:warning("[sdlan_channel] stop with reason: ~p", [Reason]), + ok. + +code_change(_OldVsn, State, _Extra) -> + {ok, State}. + +%% 处理关闭信号 +websocket_info({stop, Reason}, State) -> + lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]), + {stop, State}; + +%% 发送消息 +websocket_info({publish, ReceiverPid, Ref, Msg}, State = #state{packet_id = PacketId, inflight = Inflight}) when is_binary(Msg) -> + NInflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight), + {reply, {binary, <>}, State#state{packet_id = PacketId + 1, inflight = NInflight}}; + +%% 发送消息, 不需要等待回复 +websocket_info({send, Msg}, State) when is_binary(Msg) -> + {reply, {binary, <>}, State}; + +%% 用户进程关闭,则关闭通道 +websocket_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, host_pid = HostPid}) -> + lager:debug("[ws_channel] uuid: ~p, channel will close because host exited with reason: ~p", [UUID, Reason]), + {stop, State}; + +%% 处理其他未知消息 +websocket_info(Info, State = #state{uuid = UUID}) -> + lager:debug("[ws_channel] channel get unknown info: ~p, uuid: ~p", [Info, UUID]), + {ok, State}. \ No newline at end of file diff --git a/config/sys-dev.config b/config/sys-dev.config index d3d0b46..1d8ab33 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -7,6 +7,13 @@ {backlog, 10240} ]}, + {tcp_server, [ + {port, 18082}, + {acceptors, 500}, + {max_connections, 10240}, + {backlog, 10240} + ]}, + {redis_server, [ {port, 16379}, {acceptors, 500},