diff --git a/apps/efka/include/efka_service.hrl b/apps/efka/include/efka_service.hrl new file mode 100644 index 0000000..0df9fb3 --- /dev/null +++ b/apps/efka/include/efka_service.hrl @@ -0,0 +1,18 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 26. 8月 2025 14:42 +%%%------------------------------------------------------------------- +-author("anlicheng"). + +%% 服务注册 +-define(PACKET_REQUEST, 16#01). +%% 消息响应 +-define(PACKET_RESPONSE, 16#02). +%% 上传数据 +-define(PACKET_PUSH, 16#03). + +-define(PACKET_PUB, 16#04). \ No newline at end of file diff --git a/apps/efka/src/efka.app.src b/apps/efka/src/efka.app.src index 3fe4287..3b38dd1 100644 --- a/apps/efka/src/efka.app.src +++ b/apps/efka/src/efka.app.src @@ -10,6 +10,8 @@ %gpb, parse_trans, lager, + cowboy, + ranch, crypto, inets, ssl, diff --git a/apps/efka/src/efka_app.erl b/apps/efka/src/efka_app.erl index ac89df2..3c3114a 100644 --- a/apps/efka/src/efka_app.erl +++ b/apps/efka/src/efka_app.erl @@ -13,6 +13,9 @@ start(_StartType, _StartArgs) -> io:setopts([{encoding, unicode}]), %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), + + ws_server:start_server(), + efka_sup:start_link(). stop(_State) -> diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index c9a3701..66355b9 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -74,21 +74,12 @@ init([]) -> }, #{ - id => 'efka_tcp_sup', - start => {'efka_tcp_sup', start_link, []}, + id => 'tcp_sup', + start => {'tcp_sup', start_link, []}, restart => permanent, shutdown => 2000, type => supervisor, - modules => ['efka_tcp_sup'] - }, - - #{ - id => 'efka_tcp_server', - start => {'efka_tcp_server', start_link, []}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['efka_tcp_server'] + modules => ['tcp_sup'] }, #{ diff --git a/apps/efka/src/tcp_server/efka_tcp_channel.erl b/apps/efka/src/tcp_server/tcp_channel.erl similarity index 98% rename from apps/efka/src/tcp_server/efka_tcp_channel.erl rename to apps/efka/src/tcp_server/tcp_channel.erl index 189433e..fecc64f 100644 --- a/apps/efka/src/tcp_server/efka_tcp_channel.erl +++ b/apps/efka/src/tcp_server/tcp_channel.erl @@ -6,8 +6,9 @@ %%% @end %%% Created : 30. 4月 2025 09:22 %%%------------------------------------------------------------------- --module(efka_tcp_channel). +-module(tcp_channel). -author("anlicheng"). +-include("efka_service.hrl"). -behaviour(gen_server). @@ -24,15 +25,6 @@ -define(PENDING_TIMEOUT, 10 * 1000). %% 消息类型 -%% 服务注册 --define(PACKET_REQUEST, 16#01). -%% 消息响应 --define(PACKET_RESPONSE, 16#02). -%% 上传数据 --define(PACKET_PUSH, 16#03). - --define(PACKET_PUB, 16#04). - -record(state, { packet_id = 1, socket :: gen_tcp:socket(), diff --git a/apps/efka/src/tcp_server/efka_tcp_sup.erl b/apps/efka/src/tcp_server/tcp_connection_sup.erl similarity index 91% rename from apps/efka/src/tcp_server/efka_tcp_sup.erl rename to apps/efka/src/tcp_server/tcp_connection_sup.erl index 0d3325c..ccfeadf 100644 --- a/apps/efka/src/tcp_server/efka_tcp_sup.erl +++ b/apps/efka/src/tcp_server/tcp_connection_sup.erl @@ -3,7 +3,7 @@ %% @end %%%------------------------------------------------------------------- --module(efka_tcp_sup). +-module(tcp_connection_sup). -behaviour(supervisor). @@ -28,8 +28,8 @@ start_link() -> init([]) -> SupFlags = #{strategy => simple_one_for_one, intensity => 0, period => 1}, ChildSpec = #{ - id => efka_tcp_channel, - start => {efka_tcp_channel, start_link, []}, + id => tcp_channel, + start => {tcp_channel, start_link, []}, restart => temporary, type => worker }, diff --git a/apps/efka/src/tcp_server/efka_tcp_server.erl b/apps/efka/src/tcp_server/tcp_server.erl similarity index 94% rename from apps/efka/src/tcp_server/efka_tcp_server.erl rename to apps/efka/src/tcp_server/tcp_server.erl index e1646e2..5a9de03 100644 --- a/apps/efka/src/tcp_server/efka_tcp_server.erl +++ b/apps/efka/src/tcp_server/tcp_server.erl @@ -6,7 +6,7 @@ %%% @end %%% Created : 29. 4月 2025 23:24 %%%------------------------------------------------------------------- --module(efka_tcp_server). +-module(tcp_server). -author("anlicheng"). %% API @@ -33,7 +33,7 @@ main_loop(ListenSocket) -> case gen_tcp:accept(ListenSocket) of {ok, Socket} -> % 为每个新连接生成一个处理进程 - {ok, ChannelPid} = efka_tcp_sup:start_child(Socket), + {ok, ChannelPid} = tcp_connection_sup:start_child(Socket), ok = gen_tcp:controlling_process(Socket, ChannelPid), % 继续监听下一个连接 main_loop(ListenSocket); diff --git a/apps/efka/src/tcp_server/tcp_sup.erl b/apps/efka/src/tcp_server/tcp_sup.erl new file mode 100644 index 0000000..159e591 --- /dev/null +++ b/apps/efka/src/tcp_server/tcp_sup.erl @@ -0,0 +1,72 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 26. 8月 2025 14:36 +%%%------------------------------------------------------------------- +-module(tcp_sup). +-author("anlicheng"). + +-behaviour(supervisor). + +%% API +-export([start_link/0]). + +%% Supervisor callbacks +-export([init/1]). + +-define(SERVER, ?MODULE). + +%%%=================================================================== +%%% API functions +%%%=================================================================== + +%% @doc Starts the supervisor +-spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + supervisor:start_link({local, ?SERVER}, ?MODULE, []). + +%%%=================================================================== +%%% Supervisor callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a supervisor is started using supervisor:start_link/[2,3], +%% this function is called by the new process to find out about +%% restart strategy, maximum restart frequency and child +%% specifications. +-spec(init(Args :: term()) -> + {ok, {SupFlags :: {RestartStrategy :: supervisor:strategy(), + MaxR :: non_neg_integer(), MaxT :: non_neg_integer()}, + [ChildSpec :: supervisor:child_spec()]}} + | ignore | {error, Reason :: term()}). +init([]) -> + SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, + + Specs = [ + #{ + id => 'tcp_connection_sup', + start => {'tcp_connection_sup', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['tcp_connection_sup'] + }, + + #{ + id => 'tcp_server', + start => {'tcp_server', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['tcp_server'] + } + ], + + {ok, {SupFlags, Specs}}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/efka/src/websocket_server/ws_channel.erl b/apps/efka/src/websocket_server/ws_channel.erl new file mode 100644 index 0000000..22f3e57 --- /dev/null +++ b/apps/efka/src/websocket_server/ws_channel.erl @@ -0,0 +1,236 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2021, +%%% @doc +%%% +%%% @end +%%% Created : 11. 1月 2021 上午12:17 +%%%------------------------------------------------------------------- +-module(ws_channel). +-author("licheng5"). +-include("efka_service.hrl"). + +%% API +-export([init/2]). +-export([websocket_init/1, websocket_handle/2, websocket_info/2, terminate/3]). +-export([push_config/4, invoke/4]). + +%% 最大的等待时间 +-define(PENDING_TIMEOUT, 10 * 1000). + +-record(state, { + packet_id = 1, + service_id :: undefined | binary(), + service_pid :: undefined | pid(), + is_registered = false :: boolean(), + + %% 请求响应的对应关系, #{packet_id => {ReceiverPid, Ref}}; 自身的inflight需要超时逻辑处理 + inflight = #{} +}). + +-spec push_config(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), ConfigJson :: binary()) -> no_return(). +push_config(ChannelPid, Ref, ReceiverPid, ConfigJson) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(ConfigJson), is_reference(Ref) -> + ChannelPid ! {push_config, Ref, ReceiverPid, ConfigJson}. + +-spec invoke(ChannelPid :: pid(), Ref :: reference(), ReceiverPid :: pid(), Payload :: binary()) -> no_return(). +invoke(ChannelPid, Ref, ReceiverPid, Payload) when is_pid(ChannelPid), is_pid(ReceiverPid), is_binary(Payload), is_reference(Ref) -> + ChannelPid ! {invoke, Ref, ReceiverPid, Payload}. + + + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% 逻辑处理方法 +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +init(Req, Opts) -> + {cowboy_websocket, Req, Opts}. + +websocket_init(_State) -> + lager:debug("[ws_channel] get a new connection"), + %% 初始状态为true + {ok, #state{packet_id = 1}}. + +websocket_handle({binary, <>}, State) -> + Request = jiffy:decode(Data, [return_maps]), + handle_request(Request, State); + +%% 处理micro-client:response => efka 的响应 +websocket_handle({binary, <>}, State = #state{inflight = Inflight}) -> + Resp = jiffy:decode(Data, [return_maps]), + case Resp of + #{<<"id">> := Id, <<"result">> := Result} -> + case maps:take(Id, Inflight) of + error -> + lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), + {ok, State}; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, {ok, Result}}; + false -> + lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) + end, + {ok, State#state{inflight = NInflight}} + end; + #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} -> + case maps:take(Id, Inflight) of + error -> + lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), + {ok, State}; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, {error, Error}}; + false -> + lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) + end, + {ok, State#state{inflight = NInflight}} + end + end; +websocket_handle(Info, State) -> + lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), + {stop, State}. + + +websocket_info({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{packet_id = PacketId, inflight = Inflight}) -> + PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"push_config">>, <<"params">> => #{<<"config">> => ConfigJson}}, + Packet = jiffy:encode(PushConfig, [force_utf8]), + Reply = <>, + erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), + + {reply, {binary, Reply}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + +%% 远程调用 +websocket_info({invoke, Ref, ReceiverPid, Payload}, State = #state{packet_id = PacketId, inflight = Inflight}) -> + PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"invoke">>, <<"params">> => #{<<"payload">> => Payload}}, + Packet = jiffy:encode(PushConfig, [force_utf8]), + Reply = <>, + erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), + + {reply, {binary, Reply}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; + +%% 订阅的消息 +websocket_info({topic_broadcast, Topic, Content}, State = #state{}) -> + Packet = jiffy:encode(#{<<"topic">> => Topic, <<"content">> => Content}, [force_utf8]), + Reply = <>, + {reply, {binary, Reply}, State}; + +%% 超时逻辑处理 +websocket_info({timeout, _, {pending_timeout, Id}}, State = #state{inflight = Inflight}) -> + case maps:take(Id, Inflight) of + error -> + {ok, State}; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, {error, <<"timeout">>}}; + false -> + ok + end, + {ok, State#state{inflight = NInflight}} + end; + +%% service进程关闭 +websocket_info({'DOWN', _Ref, process, ServicePid, Reason}, State = #state{service_pid = ServicePid}) -> + lager:debug("[tcp_channel] service_pid: ~p, exited: ~p", [ServicePid, Reason]), + {stop, State#state{service_pid = undefined}}; + +websocket_info({timeout, _, {stop, Reason}}, State) -> + lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]), + {stop, State}; +%% 处理关闭信号 +websocket_info({stop, Reason}, State) -> + lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]), + {stop, State}; + +%% 处理其他未知消息 +websocket_info(Info, State) -> + lager:debug("[ws_channel] channel get unknown info: ~p", [Info]), + {ok, State}. + +%% 进程关闭事件 +terminate(Reason, _Req, State) -> + lager:debug("[ws_channel] channel close with reason: ~p, state is: ~p", [Reason, State]), + ok. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% 注册 +handle_request(#{<<"id">> := Id, <<"method">> := <<"register">>, <<"params">> := #{<<"service_id">> := ServiceId}}, State) -> + case efka_service:get_pid(ServiceId) of + undefined -> + lager:warning("[efka_tcp_channel] service_id: ~p, not running", [ServiceId]), + Packet = json_error(Id, -1, <<"service not running">>), + Reply = <>, + delay_stop(10, normal), + {reply, {binary, Reply}, State}; + ServicePid when is_pid(ServicePid) -> + case efka_service:attach_channel(ServicePid, self()) of + ok -> + Packet = json_result(Id, <<"ok">>), + erlang:monitor(process, ServicePid), + Reply = <>, + + {reply, {binary, Reply}, State#state{service_id = ServiceId, service_pid = ServicePid, is_registered = true}}; + {error, Error} -> + lager:warning("[efka_tcp_channel] service_id: ~p, attach_channel get error: ~p", [ServiceId, Error]), + Packet = json_error(Id, -1, Error), + Reply = <>, + delay_stop(10, normal), + {reply, {binary, Reply}, State} + end + end; + +%% 请求参数 +handle_request(#{<<"id">> := Id, <<"method">> := <<"request_config">>}, State = #state{service_pid = ServicePid, is_registered = true}) -> + {ok, ConfigJson} = efka_service:request_config(ServicePid), + Packet = json_result(Id, ConfigJson), + Reply = <>, + {reply, {binary, Reply}, State}; + +%% 数据项 +handle_request(#{<<"id">> := 0, <<"method">> := <<"metric_data">>, + <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) -> + efka_service:metric_data(ServicePid, DeviceUUID, RouteKey, Metric), + {ok, State}; + +%% Event事件 +handle_request(#{<<"id">> := 0, <<"method">> := <<"event">>, <<"params">> := #{<<"event_type">> := EventType, <<"body">> := Body}}, State = #state{service_pid = ServicePid, is_registered = true}) -> + efka_service:send_event(ServicePid, EventType, Body), + {ok, State}; + +%% 订阅事件 +handle_request(#{<<"id">> := 0, <<"method">> := <<"subscribe">>, <<"params">> := #{<<"topic">> := Topic}}, State = #state{is_registered = true}) -> + efka_subscription:subscribe(Topic, self()), + {ok, State}. + +%% 采用32位编码 +-spec next_packet_id(PacketId :: integer()) -> NextPacketId :: integer(). +next_packet_id(PacketId) when PacketId >= 4294967295 -> + 1; +next_packet_id(PacketId) -> + PacketId + 1. + +-spec json_result(Id :: integer(), Result :: term()) -> binary(). +json_result(Id, Result) when is_integer(Id) -> + Response = #{ + <<"id">> => Id, + <<"result">> => Result + }, + jiffy:encode(Response, [force_utf8]). + +-spec json_error(Id :: integer(), Code :: integer(), Message :: binary()) -> binary(). +json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(Message) -> + Response = #{ + <<"id">> => Id, + <<"error">> => #{ + <<"code">> => Code, + <<"message">> => Message + } + }, + jiffy:encode(Response, [force_utf8]). + +delay_stop(Timeout, Reason) when is_integer(Timeout) -> + erlang:start_timer(Timeout, self(), {stop, Reason}). \ No newline at end of file diff --git a/apps/efka/src/websocket_server/ws_server.erl b/apps/efka/src/websocket_server/ws_server.erl new file mode 100644 index 0000000..c7edfe7 --- /dev/null +++ b/apps/efka/src/websocket_server/ws_server.erl @@ -0,0 +1,34 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 26. 8月 2025 14:28 +%%%------------------------------------------------------------------- +-module(ws_server). +-author("anlicheng"). + +%% API +-export([start_server/0]). + +start_server() -> + {ok, Props} = application:get_env(iot, ws_server), + Acceptors = proplists:get_value(acceptors, Props, 50), + MaxConnections = proplists:get_value(max_connections, Props, 10240), + Backlog = proplists:get_value(backlog, Props, 1024), + Port = proplists:get_value(port, Props), + + Dispatcher = cowboy_router:compile([ + {'_', [{"/ws", ws_channel, []}]} + ]), + + TransOpts = [ + {port, Port}, + {num_acceptors, Acceptors}, + {backlog, Backlog}, + {max_connections, MaxConnections} + ], + {ok, Pid} = cowboy:start_clear(ws_listener, TransOpts, #{env => #{dispatch => Dispatcher}}), + + lager:debug("[efka_app] websocket server start at: ~p, pid is: ~p", [Port, Pid]). \ No newline at end of file diff --git a/config/sys.config b/config/sys.config index 7253bbd..844c2e2 100644 --- a/config/sys.config +++ b/config/sys.config @@ -8,6 +8,13 @@ {port, 18088} ]}, + {ws_server, [ + {port, 18080}, + {acceptors, 10}, + {max_connections, 1024}, + {backlog, 256} + ]}, + {tls_server, [ {host, "localhost"}, {port, 443} diff --git a/rebar.config b/rebar.config index b91b703..3cf1dce 100644 --- a/rebar.config +++ b/rebar.config @@ -4,6 +4,7 @@ {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.2"}}}, {gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}}, + {cowboy, ".*", {git, "https://github.com/ninenines/cowboy.git", {tag, "2.10.0"}}}, {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, {lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}} ]}.