fix redis_server

This commit is contained in:
anlicheng 2026-02-28 22:25:13 +08:00
parent be053ef8b4
commit eb793e913b
3 changed files with 161 additions and 0 deletions

View File

@ -0,0 +1,108 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2026, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 28. 2 2026 22:07
%%%-------------------------------------------------------------------
-module(maxwell_redis_channel).
-author("licheng5").
%% API
-export([start/1, loop/1]).
-record(command, {
data = <<>>,
stage = parse_arg_num,
arg_num = 0,
args = []
}).
-record(state, {
socket,
command
}).
%%--------------------------------------------------------------------
%% esockd callback
%%--------------------------------------------------------------------
start(Socket) ->
spawn_link(?MODULE, loop, [#state{socket = Socket, command = #command{}}]).
loop(State=#state{socket = Socket, command = Command = #command{data = Data}}) ->
inet:setopts(Socket, [{active, once}]),
receive
{tcp, _, Packet} ->
NData = <<Data/binary, Packet/binary>>,
case parse(Command#command{data = NData}) of
{ok, #command{args = Args}} ->
{reply, Reply} = handle_command(Args),
gen_tcp:send(Socket, Reply),
loop(State#state{command = #command{}});
{more_data, NCommand} ->
%%
loop(State#state{command = NCommand})
end;
{tcp_error, _} ->
exit(normal);
{tcp_closed, _} ->
exit(normal)
end.
%%
handle_command(Args) ->
logger:debug("[maxwell_redis_channel] args: ~p", [Args]),
{reply, encode({error, <<"Unsuported Command">>})}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% , ,
parse(Command = #command{stage = parse_arg_num, data = <<$*, Rest/binary>>}) ->
[ArgNum0, ArgBin] = binary:split(Rest, <<$\r, $\n>>),
ArgNum = binary_to_integer(ArgNum0),
parse(Command#command{arg_num = ArgNum, data = ArgBin, stage = parse_arg});
%%
parse(Command = #command{stage = parse_arg, args = Args, arg_num = 0, data = <<>>}) ->
{ok, Command#command{args = lists:reverse(Args)}};
parse(Command = #command{stage = parse_arg, args = Args, arg_num = ArgNum, data = ArgBin}) ->
case binary:split(ArgBin, <<$\r, $\n>>) of
[<<"$", ArgLen0/binary>>, RestArgBin] ->
ArgLen = binary_to_integer(ArgLen0),
case RestArgBin of
<<Arg:ArgLen/binary, $\r, $\n, RestArgBin1/binary>> ->
parse(Command#command{arg_num = ArgNum - 1, args = [Arg | Args], data = RestArgBin1});
_ ->
{more_data, Command}
end;
_ ->
{more_data, Command}
end.
%% redis数据返回格式化
-spec encode(tuple() | binary() | list()) -> iolist().
encode({single_line, Arg}) when is_binary(Arg) ->
[<<$+>>, Arg, <<$\r, $\n>>];
encode({error, Arg}) when is_binary(Arg) ->
[<<$->>, Arg, <<$\r, $\n>>];
encode(Arg) when is_integer(Arg) ->
[<<$:>>, integer_to_list(Arg), <<$\r, $\n>>];
encode(Arg) when is_binary(Arg) ->
[<<$$>>, integer_to_list(iolist_size(Arg)), <<$\r, $\n>>, Arg, <<$\r, $\n>>];
encode(Args) when is_list(Args) ->
ArgCount = [<<$*>>, integer_to_list(length(Args)), <<$\r, $\n>>],
ArgsBin = lists:map(fun encode/1, lists:map(fun to_binary/1, Args)),
[ArgCount, ArgsBin].
%% binary
to_binary(X) when is_list(X) ->
unicode:characters_to_binary(X);
to_binary(X) when is_atom(X) ->
list_to_binary(atom_to_list(X));
to_binary(X) when is_binary(X) ->
X;
to_binary(X) when is_integer(X) ->
list_to_binary(integer_to_list(X)).

View File

@ -0,0 +1,44 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2026, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 28. 2 2026 22:07
%%%-------------------------------------------------------------------
-module(maxwell_redis_server).
-author("licheng5").
%% API
-export([start_link/1, init/1]).
%%--------------------------------------------------------------------
%% esockd callback
%%--------------------------------------------------------------------
start_link(Port) when is_integer(Port) ->
{ok, spawn_link(?MODULE, init, [Port])}.
init(Port) ->
{ok, LSocket} = gen_tcp:listen(Port, [
binary,
{packet, 0},
{reuseaddr, true},
{backlog, 1024},
{active, false}
]),
accept_loop(LSocket).
accept_loop(LSocket) ->
case gen_tcp:accept(LSocket) of
{ok, Socket} ->
%%
Pid = spawn(fun() -> maxwell_redis_channel:start(Socket) end),
ok = gen_tcp:controlling_process(Socket, Pid),
accept_loop(LSocket);
{error, closed} ->
exit(tcp_closed);
{error, Reason} ->
logger:debug("[maxwell_redis_server] Accept error: ~p~n", [Reason]),
accept_loop(LSocket)
end.

View File

@ -88,6 +88,15 @@ init([]) ->
shutdown => 2000,
type => worker,
modules => ['sdlan_sync_mysql']
},
#{
id => maxwell_redis_server,
start => {maxwell_redis_server, start_link, [16379]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['maxwell_redis_server']
}
],