fix mysql binlog

This commit is contained in:
anlicheng 2026-02-28 23:34:05 +08:00
parent 6f8d936dfc
commit 5615a97e14
4 changed files with 94 additions and 31 deletions

View File

@ -2,7 +2,7 @@
-include("policy.hrl").
-export([init/0]).
-export([get_policies/1, insert/1]).
-export([get_policies/1, insert/1, delete/1, update/2]).
init() ->
ets:new(identity_policy, [named_table, bag, public, {keypos, 2}, {read_concurrency, true}]).
@ -12,5 +12,18 @@ get_policies(IdentityId) when is_integer(IdentityId) ->
Records = ets:lookup(identity_policy, IdentityId),
lists:map(fun(#identity_policy{policy_id = PolicyId}) -> PolicyId end, Records).
insert(#{<<"identity_id">> := IdentityId, <<"policy_id">> := PolicyId}) ->
insert(#identity_policy{identity_id = IdentityId, policy_id = PolicyId});
insert(IdentityPolicy=#identity_policy{}) ->
true = ets:insert(identity_policy, IdentityPolicy).
delete(#{<<"identity_id">> := IdentityId, <<"policy_id">> := PolicyId}) ->
ets:delete_object(identity_policy, #identity_policy{identity_id = IdentityId, policy_id = PolicyId});
delete(IdentityPolicy = #identity_policy{}) ->
true = ets:delete_object(identity_policy, IdentityPolicy).
update(NewData=#{<<"identity_id">> := IdentityId, <<"policy_id">> := PolicyId}, OldData) ->
%%
#{<<"identity_id">> := OldIdentityId, <<"policy_id">> := OldPolicyId} = maps:merge(NewData, OldData),
ets:delete_object(identity_policy, #identity_policy{identity_id = OldIdentityId, policy_id = OldPolicyId}),
ets:insert(identity_policy, #identity_policy{identity_id = IdentityId, policy_id = PolicyId}).

View File

@ -8,6 +8,7 @@
%%%-------------------------------------------------------------------
-module(maxwell_redis_channel).
-author("licheng5").
-include("policy.hrl").
%% API
-export([start/1, loop/1]).
@ -52,14 +53,39 @@ loop(State=#state{socket = Socket, command = Command = #command{data = Data}}) -
exit(normal)
end.
%%
%% PING命
handle_command([<<"PING">>]) ->
{reply, encode({single_line, <<"PONG">>})};
handle_command([<<"PUBLISH">>, _Channel, Msg]) ->
case catch jiffy:decode(Msg, [return_maps]) of
M when is_map(M) ->
handle_data(M);
_ ->
ok
end,
{reply, encode(1)};
handle_command(Args) ->
logger:debug("[maxwell_redis_channel] args: ~p", [Args]),
{reply, encode({error, <<"Unsuported Command">>})}.
handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"identity_policy">>, <<"type">> := <<"insert">>, <<"data">> := Data}) ->
identity_policy_ets:insert(Data);
handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"identity_policy">>, <<"type">> := <<"delete">>, <<"data">> := Data}) ->
identity_policy_ets:delete(Data);
handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"identity_policy">>, <<"type">> := <<"update">>, <<"data">> := Data, <<"old">> := Old}) ->
identity_policy_ets:update(Data, Old);
%% rule
handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"rule">>, <<"type">> := <<"insert">>, <<"data">> := Data}) ->
identity_policy_ets:insert(Data);
handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"rule">>, <<"type">> := <<"delete">>, <<"data">> := Data}) ->
identity_policy_ets:delete(Data);
handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"rule">>, <<"type">> := <<"update">>, <<"data">> := Data, <<"old">> := Old}) ->
identity_policy_ets:update(Data, Old);
handle_data(_Json) ->
ok.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View File

@ -2,7 +2,7 @@
-include("policy.hrl").
-export([init/0]).
-export([insert/1, get_rules/2]).
-export([insert/1, get_rules/2, delete/1, update/2]).
init() ->
ets:new(rule_table, [named_table, ordered_set, public, {keypos, 2}, {read_concurrency, true}]),
@ -25,7 +25,50 @@ get_rules(SrcPolicyIds, DstPolicyIds) when is_list(SrcPolicyIds), is_list(DstPol
{ok, sets:to_list(S)}.
insert(Rule = #rule{src_policy_id = SrcPolicyId, dst_policy_id = DstPolicyId, rule_id = RuleId}) ->
insert(#{<<"rule_id">> := RuleId, <<"network_id">> := NetworkId,
<<"src_policy_id">> := SrcPolicyId, <<"dst_policy_id">> := DstPolicyId, <<"proto">> := Proto,
<<"port">> := Port, <<"action">> := Action, <<"created_at">> := CreatedAt}) ->
Rule = #rule{
rule_id = RuleId,
network_id = NetworkId,
src_policy_id = SrcPolicyId,
dst_policy_id = DstPolicyId,
proto = Proto,
port = Port,
action = format_action(Action),
created_at = CreatedAt
},
ets:insert(rule_table, Rule),
logger:debug("rule_index: ~p", [{SrcPolicyId, DstPolicyId, RuleId}]),
ets:insert(rule_index, {SrcPolicyId, DstPolicyId, RuleId}).
update(NewData = #{<<"rule_id">> := RuleId, <<"network_id">> := NetworkId,
<<"src_policy_id">> := SrcPolicyId, <<"dst_policy_id">> := DstPolicyId, <<"proto">> := Proto,
<<"port">> := Port, <<"action">> := Action, <<"created_at">> := CreatedAt}, OldData) ->
%% rule_id是主键
Rule = #rule{
rule_id = RuleId,
network_id = NetworkId,
src_policy_id = SrcPolicyId,
dst_policy_id = DstPolicyId,
proto = Proto,
port = Port,
action = format_action(Action),
created_at = CreatedAt
},
ets:insert(rule_table, Rule),
%% index老的数据可能要清理掉, Old的Record
#{<<"src_policy_id">> := OldSrcPolicyId, <<"dst_policy_id">> := OldDstPolicyId} = maps:merge(NewData, OldData),
ets:delete_object(rule_index, {OldSrcPolicyId, OldDstPolicyId, RuleId}),
%%
ets:insert(rule_index, {SrcPolicyId, DstPolicyId, RuleId}).
delete(#{<<"rule_id">> := RuleId, <<"src_policy_id">> := SrcPolicyId, <<"dst_policy_id">> := DstPolicyId}) ->
ets:delete(rule_table, RuleId),
ets:delete_object(rule_index, {SrcPolicyId, DstPolicyId, RuleId}).
-spec format_action(binary()) -> atom().
format_action(<<"allow">>) ->
allow;
format_action(_) ->
deny.

View File

@ -89,9 +89,7 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
sync_identity_policy() ->
{ok, Rows} = mysql_pool:get_all(mysql_sdlan, <<"select * from identity_policy">>),
lists:map(fun(#{<<"identity_id">> := IdentityId, <<"policy_id">> := PolicyId}) ->
identity_policy_ets:insert(#identity_policy{identity_id = IdentityId, policy_id = PolicyId})
end, Rows).
lists:map(fun(R) -> identity_policy_ets:insert(R) end, Rows).
sync_rule() ->
sync_rule0(0).
@ -100,19 +98,8 @@ sync_rule0(RuleIdOffset) ->
logger:debug("rule rows: ~p", [Rows]),
case length(Rows) > 0 of
true ->
RuleIds = lists:map(fun(#{<<"rule_id">> := RuleId, <<"network_id">> := NetworkId,
<<"src_policy_id">> := SrcPolicyId, <<"dst_policy_id">> := DstPolicyId, <<"proto">> := Proto,
<<"port">> := Port, <<"action">> := Action, <<"created_at">> := CreatedAt}) ->
rule_ets:insert(#rule{
rule_id = RuleId,
network_id = NetworkId,
src_policy_id = SrcPolicyId,
dst_policy_id = DstPolicyId,
proto = Proto,
port = Port,
action = format_action(Action),
created_at = CreatedAt
}),
RuleIds = lists:map(fun(R = #{<<"rule_id">> := RuleId}) ->
rule_ets:insert(R),
RuleId
end, Rows),
LastRuleOffset = lists:max(RuleIds),
@ -120,9 +107,3 @@ sync_rule0(RuleIdOffset) ->
false ->
ok
end.
-spec format_action(binary()) -> atom().
format_action(<<"allow">>) ->
allow;
format_action(_) ->
deny.