iot/apps/iot/src/redis/redis_protocol.erl
2023-07-24 11:26:46 +08:00

106 lines
4.1 KiB
Erlang
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

%%%-------------------------------------------------------------------
%%% @author licheng5
%%% @copyright (C) 2020, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 10. 12月 2020 上午11:17
%%%-------------------------------------------------------------------
-module(redis_protocol).
-author("licheng5").
%% API
-export([start_link/2, init/2]).
-record(command, {
data = <<>>,
stage = parse_arg_num,
arg_num = 0,
args = []
}).
%%--------------------------------------------------------------------
%% esockd callback
%%--------------------------------------------------------------------
start_link(Transport, Sock) ->
{ok, spawn_link(?MODULE, init, [Transport, Sock])}.
init(Transport, Sock) ->
{ok, NewSock} = Transport:wait(Sock),
loop(Transport, NewSock, #command{data = <<>>, arg_num = 0, args = []}).
loop(Transport, Sock, Command = #command{data = Data}) ->
Transport:setopts(Sock, [{active, once}]),
receive
{tcp, _, Packet} ->
%% 收到数据的第一个包,才开始记录处理时间, redis基于长连接请求不是连续处理的
NData = <<Data/binary, Packet/binary>>,
case parse(Command#command{data = NData}) of
{ok, #command{args = [Method0|Args]}} ->
Method = string:uppercase(Method0),
lager:debug("[redis_protocol] get a command: ~p", [[Method|Args]]),
{reply, Reply} = redis_handler:handle([Method | Args]),
Transport:send(Sock, encode(Reply)),
%% 等待下一次请求
loop(Transport, Sock, #command{});
{more_data, NCommand} ->
%% 请求的数据包过大,一次接受不完整
loop(Transport, Sock, NCommand)
end;
{tcp_error, _} ->
exit(normal);
{tcp_closed, _} ->
exit(normal)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% 解析请求的包, 支持请求不在一个包里面的情况, 基于状态机
parse(Command = #command{stage = parse_arg_num, data = <<$*, Rest/binary>>}) ->
[ArgNum0, ArgBin] = binary:split(Rest, <<$\r, $\n>>),
ArgNum = binary_to_integer(ArgNum0),
parse(Command#command{arg_num = ArgNum, data = ArgBin, stage = parse_arg});
%% 解析请求的参数
parse(Command = #command{stage = parse_arg, args = Args, arg_num = 0, data = <<>>}) ->
{ok, Command#command{args = lists:reverse(Args)}};
parse(Command = #command{stage = parse_arg, args = Args, arg_num = ArgNum, data = ArgBin}) ->
case binary:split(ArgBin, <<$\r, $\n>>) of
[<<"$", ArgLen0/binary>>, RestArgBin] ->
ArgLen = binary_to_integer(ArgLen0),
case RestArgBin of
<<Arg:ArgLen/binary, $\r, $\n, RestArgBin1/binary>> ->
parse(Command#command{arg_num = ArgNum - 1, args = [Arg | Args], data = RestArgBin1});
_ ->
{more_data, Command}
end;
_ ->
{more_data, Command}
end.
%% redis数据返回格式化
-spec encode(tuple() | binary() | list()) -> iolist().
encode({single_line, Arg}) when is_binary(Arg) ->
[<<$+>>, Arg, <<$\r, $\n>>];
encode({error, Arg}) when is_binary(Arg) ->
[<<$->>, Arg, <<$\r, $\n>>];
encode(Arg) when is_integer(Arg) ->
[<<$:>>, integer_to_list(Arg), <<$\r, $\n>>];
encode(Arg) when is_binary(Arg) ->
[<<$$>>, integer_to_list(iolist_size(Arg)), <<$\r, $\n>>, Arg, <<$\r, $\n>>];
encode(Args) when is_list(Args) ->
ArgCount = [<<$*>>, integer_to_list(length(Args)), <<$\r, $\n>>],
ArgsBin = lists:map(fun encode/1, lists:map(fun to_binary/1, Args)),
[ArgCount, ArgsBin].
%% 将数据转换成binary
to_binary(X) when is_list(X) ->
unicode:characters_to_binary(X);
to_binary(X) when is_atom(X) ->
list_to_binary(atom_to_list(X));
to_binary(X) when is_binary(X) ->
X;
to_binary(X) when is_integer(X) ->
list_to_binary(integer_to_list(X)).