fix channel

This commit is contained in:
anlicheng 2026-02-28 23:49:56 +08:00
parent 2a7b8da245
commit 3f15398c47

View File

@ -36,7 +36,6 @@ loop(State=#state{socket = Socket, command = Command = #command{data = Data}}) -
inet:setopts(Socket, [{active, once}]), inet:setopts(Socket, [{active, once}]),
receive receive
{tcp, _, Packet} -> {tcp, _, Packet} ->
logger:debug("channel rec: ~p", [Packet]),
NData = <<Data/binary, Packet/binary>>, NData = <<Data/binary, Packet/binary>>,
case parse(Command#command{data = NData}) of case parse(Command#command{data = NData}) of
{ok, #command{args = Args}} -> {ok, #command{args = Args}} ->
@ -50,6 +49,7 @@ loop(State=#state{socket = Socket, command = Command = #command{data = Data}}) -
{tcp_error, _} -> {tcp_error, _} ->
exit(normal); exit(normal);
{tcp_closed, _} -> {tcp_closed, _} ->
logger:debug("[maxwell_redis_channel] channel closed"),
exit(normal) exit(normal)
end. end.
@ -67,7 +67,6 @@ handle_command([<<"PUBLISH">>, _Channel, Msg]) ->
{reply, encode(1)}; {reply, encode(1)};
handle_command(Args) -> handle_command(Args) ->
logger:debug("[maxwell_redis_channel] args: ~p", [Args]),
{reply, encode({error, <<"Unsuported Command">>})}. {reply, encode({error, <<"Unsuported Command">>})}.
handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"identity_policy">>, <<"type">> := <<"insert">>, <<"data">> := Data}) -> handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"identity_policy">>, <<"type">> := <<"insert">>, <<"data">> := Data}) ->