diff --git a/apps/sdlan/src/policy/maxwell_redis_channel.erl b/apps/sdlan/src/policy/maxwell_redis_channel.erl new file mode 100644 index 0000000..5fca8ef --- /dev/null +++ b/apps/sdlan/src/policy/maxwell_redis_channel.erl @@ -0,0 +1,108 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2026, +%%% @doc +%%% +%%% @end +%%% Created : 28. 2月 2026 22:07 +%%%------------------------------------------------------------------- +-module(maxwell_redis_channel). +-author("licheng5"). + +%% API +-export([start/1, loop/1]). + +-record(command, { + data = <<>>, + stage = parse_arg_num, + arg_num = 0, + args = [] +}). + +-record(state, { + socket, + command +}). + +%%-------------------------------------------------------------------- +%% esockd callback +%%-------------------------------------------------------------------- + +start(Socket) -> + spawn_link(?MODULE, loop, [#state{socket = Socket, command = #command{}}]). + +loop(State=#state{socket = Socket, command = Command = #command{data = Data}}) -> + inet:setopts(Socket, [{active, once}]), + receive + {tcp, _, Packet} -> + NData = <>, + case parse(Command#command{data = NData}) of + {ok, #command{args = Args}} -> + {reply, Reply} = handle_command(Args), + gen_tcp:send(Socket, Reply), + loop(State#state{command = #command{}}); + {more_data, NCommand} -> + %% 请求的数据包过大,一次接受不完整 + loop(State#state{command = NCommand}) + end; + {tcp_error, _} -> + exit(normal); + {tcp_closed, _} -> + exit(normal) + end. + +%% 处理请求命令 +handle_command(Args) -> + logger:debug("[maxwell_redis_channel] args: ~p", [Args]), + {reply, encode({error, <<"Unsuported Command">>})}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +%% 解析请求的包, 支持请求不在一个包里面的情况, 基于状态机 +parse(Command = #command{stage = parse_arg_num, data = <<$*, Rest/binary>>}) -> + [ArgNum0, ArgBin] = binary:split(Rest, <<$\r, $\n>>), + ArgNum = binary_to_integer(ArgNum0), + parse(Command#command{arg_num = ArgNum, data = ArgBin, stage = parse_arg}); +%% 解析请求的参数 +parse(Command = #command{stage = parse_arg, args = Args, arg_num = 0, data = <<>>}) -> + {ok, Command#command{args = lists:reverse(Args)}}; +parse(Command = #command{stage = parse_arg, args = Args, arg_num = ArgNum, data = ArgBin}) -> + case binary:split(ArgBin, <<$\r, $\n>>) of + [<<"$", ArgLen0/binary>>, RestArgBin] -> + ArgLen = binary_to_integer(ArgLen0), + case RestArgBin of + <> -> + parse(Command#command{arg_num = ArgNum - 1, args = [Arg | Args], data = RestArgBin1}); + _ -> + {more_data, Command} + end; + _ -> + {more_data, Command} + end. + +%% redis数据返回格式化 +-spec encode(tuple() | binary() | list()) -> iolist(). +encode({single_line, Arg}) when is_binary(Arg) -> + [<<$+>>, Arg, <<$\r, $\n>>]; +encode({error, Arg}) when is_binary(Arg) -> + [<<$->>, Arg, <<$\r, $\n>>]; +encode(Arg) when is_integer(Arg) -> + [<<$:>>, integer_to_list(Arg), <<$\r, $\n>>]; +encode(Arg) when is_binary(Arg) -> + [<<$$>>, integer_to_list(iolist_size(Arg)), <<$\r, $\n>>, Arg, <<$\r, $\n>>]; +encode(Args) when is_list(Args) -> + ArgCount = [<<$*>>, integer_to_list(length(Args)), <<$\r, $\n>>], + ArgsBin = lists:map(fun encode/1, lists:map(fun to_binary/1, Args)), + [ArgCount, ArgsBin]. + +%% 将数据转换成binary +to_binary(X) when is_list(X) -> + unicode:characters_to_binary(X); +to_binary(X) when is_atom(X) -> + list_to_binary(atom_to_list(X)); +to_binary(X) when is_binary(X) -> + X; +to_binary(X) when is_integer(X) -> + list_to_binary(integer_to_list(X)). diff --git a/apps/sdlan/src/policy/maxwell_redis_server.erl b/apps/sdlan/src/policy/maxwell_redis_server.erl new file mode 100644 index 0000000..c6b2526 --- /dev/null +++ b/apps/sdlan/src/policy/maxwell_redis_server.erl @@ -0,0 +1,44 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2026, +%%% @doc +%%% +%%% @end +%%% Created : 28. 2月 2026 22:07 +%%%------------------------------------------------------------------- +-module(maxwell_redis_server). +-author("licheng5"). + +%% API +-export([start_link/1, init/1]). + +%%-------------------------------------------------------------------- +%% esockd callback +%%-------------------------------------------------------------------- + +start_link(Port) when is_integer(Port) -> + {ok, spawn_link(?MODULE, init, [Port])}. + +init(Port) -> + {ok, LSocket} = gen_tcp:listen(Port, [ + binary, + {packet, 0}, + {reuseaddr, true}, + {backlog, 1024}, + {active, false} + ]), + accept_loop(LSocket). + +accept_loop(LSocket) -> + case gen_tcp:accept(LSocket) of + {ok, Socket} -> + %% 每个连接一个进程 + Pid = spawn(fun() -> maxwell_redis_channel:start(Socket) end), + ok = gen_tcp:controlling_process(Socket, Pid), + accept_loop(LSocket); + {error, closed} -> + exit(tcp_closed); + {error, Reason} -> + logger:debug("[maxwell_redis_server] Accept error: ~p~n", [Reason]), + accept_loop(LSocket) + end. \ No newline at end of file diff --git a/apps/sdlan/src/sdlan_sup.erl b/apps/sdlan/src/sdlan_sup.erl index 8620998..fa639d6 100644 --- a/apps/sdlan/src/sdlan_sup.erl +++ b/apps/sdlan/src/sdlan_sup.erl @@ -88,6 +88,15 @@ init([]) -> shutdown => 2000, type => worker, modules => ['sdlan_sync_mysql'] + }, + + #{ + id => maxwell_redis_server, + start => {maxwell_redis_server, start_link, [16379]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['maxwell_redis_server'] } ],