diff --git a/apps/sdlan/src/policy/identity_policy_ets.erl b/apps/sdlan/src/policy/identity_policy_ets.erl index 357e91a..37f0881 100644 --- a/apps/sdlan/src/policy/identity_policy_ets.erl +++ b/apps/sdlan/src/policy/identity_policy_ets.erl @@ -2,7 +2,7 @@ -include("policy.hrl"). -export([init/0]). --export([get_policies/1, insert/1]). +-export([get_policies/1, insert/1, delete/1, update/2]). init() -> ets:new(identity_policy, [named_table, bag, public, {keypos, 2}, {read_concurrency, true}]). @@ -12,5 +12,18 @@ get_policies(IdentityId) when is_integer(IdentityId) -> Records = ets:lookup(identity_policy, IdentityId), lists:map(fun(#identity_policy{policy_id = PolicyId}) -> PolicyId end, Records). -insert(IdentityPolicy = #identity_policy{}) -> - true = ets:insert(identity_policy, IdentityPolicy). \ No newline at end of file +insert(#{<<"identity_id">> := IdentityId, <<"policy_id">> := PolicyId}) -> + insert(#identity_policy{identity_id = IdentityId, policy_id = PolicyId}); +insert(IdentityPolicy=#identity_policy{}) -> + true = ets:insert(identity_policy, IdentityPolicy). + +delete(#{<<"identity_id">> := IdentityId, <<"policy_id">> := PolicyId}) -> + ets:delete_object(identity_policy, #identity_policy{identity_id = IdentityId, policy_id = PolicyId}); +delete(IdentityPolicy = #identity_policy{}) -> + true = ets:delete_object(identity_policy, IdentityPolicy). + +update(NewData=#{<<"identity_id">> := IdentityId, <<"policy_id">> := PolicyId}, OldData) -> + %% 清理老的数据 + #{<<"identity_id">> := OldIdentityId, <<"policy_id">> := OldPolicyId} = maps:merge(NewData, OldData), + ets:delete_object(identity_policy, #identity_policy{identity_id = OldIdentityId, policy_id = OldPolicyId}), + ets:insert(identity_policy, #identity_policy{identity_id = IdentityId, policy_id = PolicyId}). \ No newline at end of file diff --git a/apps/sdlan/src/policy/maxwell_redis_channel.erl b/apps/sdlan/src/policy/maxwell_redis_channel.erl index f0988ce..98af208 100644 --- a/apps/sdlan/src/policy/maxwell_redis_channel.erl +++ b/apps/sdlan/src/policy/maxwell_redis_channel.erl @@ -8,6 +8,7 @@ %%%------------------------------------------------------------------- -module(maxwell_redis_channel). -author("licheng5"). +-include("policy.hrl"). %% API -export([start/1, loop/1]). @@ -52,14 +53,39 @@ loop(State=#state{socket = Socket, command = Command = #command{data = Data}}) - exit(normal) end. -%% 处理请求命令 +%% PING命令 handle_command([<<"PING">>]) -> {reply, encode({single_line, <<"PONG">>})}; +handle_command([<<"PUBLISH">>, _Channel, Msg]) -> + case catch jiffy:decode(Msg, [return_maps]) of + M when is_map(M) -> + handle_data(M); + _ -> + ok + end, + {reply, encode(1)}; + handle_command(Args) -> logger:debug("[maxwell_redis_channel] args: ~p", [Args]), {reply, encode({error, <<"Unsuported Command">>})}. +handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"identity_policy">>, <<"type">> := <<"insert">>, <<"data">> := Data}) -> + identity_policy_ets:insert(Data); +handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"identity_policy">>, <<"type">> := <<"delete">>, <<"data">> := Data}) -> + identity_policy_ets:delete(Data); +handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"identity_policy">>, <<"type">> := <<"update">>, <<"data">> := Data, <<"old">> := Old}) -> + identity_policy_ets:update(Data, Old); +%% 处理rule +handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"rule">>, <<"type">> := <<"insert">>, <<"data">> := Data}) -> + identity_policy_ets:insert(Data); +handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"rule">>, <<"type">> := <<"delete">>, <<"data">> := Data}) -> + identity_policy_ets:delete(Data); +handle_data(#{<<"database">> := <<"punchnet_v2">>, <<"table">> := <<"rule">>, <<"type">> := <<"update">>, <<"data">> := Data, <<"old">> := Old}) -> + identity_policy_ets:update(Data, Old); +handle_data(_Json) -> + ok. + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% helper methods %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/apps/sdlan/src/policy/rule_ets.erl b/apps/sdlan/src/policy/rule_ets.erl index 46d220f..9c99088 100644 --- a/apps/sdlan/src/policy/rule_ets.erl +++ b/apps/sdlan/src/policy/rule_ets.erl @@ -2,7 +2,7 @@ -include("policy.hrl"). -export([init/0]). --export([insert/1, get_rules/2]). +-export([insert/1, get_rules/2, delete/1, update/2]). init() -> ets:new(rule_table, [named_table, ordered_set, public, {keypos, 2}, {read_concurrency, true}]), @@ -25,7 +25,50 @@ get_rules(SrcPolicyIds, DstPolicyIds) when is_list(SrcPolicyIds), is_list(DstPol {ok, sets:to_list(S)}. -insert(Rule = #rule{src_policy_id = SrcPolicyId, dst_policy_id = DstPolicyId, rule_id = RuleId}) -> +insert(#{<<"rule_id">> := RuleId, <<"network_id">> := NetworkId, + <<"src_policy_id">> := SrcPolicyId, <<"dst_policy_id">> := DstPolicyId, <<"proto">> := Proto, + <<"port">> := Port, <<"action">> := Action, <<"created_at">> := CreatedAt}) -> + Rule = #rule{ + rule_id = RuleId, + network_id = NetworkId, + src_policy_id = SrcPolicyId, + dst_policy_id = DstPolicyId, + proto = Proto, + port = Port, + action = format_action(Action), + created_at = CreatedAt + }, ets:insert(rule_table, Rule), - logger:debug("rule_index: ~p", [{SrcPolicyId, DstPolicyId, RuleId}]), - ets:insert(rule_index, {SrcPolicyId, DstPolicyId, RuleId}). \ No newline at end of file + ets:insert(rule_index, {SrcPolicyId, DstPolicyId, RuleId}). + +update(NewData = #{<<"rule_id">> := RuleId, <<"network_id">> := NetworkId, + <<"src_policy_id">> := SrcPolicyId, <<"dst_policy_id">> := DstPolicyId, <<"proto">> := Proto, + <<"port">> := Port, <<"action">> := Action, <<"created_at">> := CreatedAt}, OldData) -> + + %% rule_id是主键,直接覆盖 + Rule = #rule{ + rule_id = RuleId, + network_id = NetworkId, + src_policy_id = SrcPolicyId, + dst_policy_id = DstPolicyId, + proto = Proto, + port = Port, + action = format_action(Action), + created_at = CreatedAt + }, + ets:insert(rule_table, Rule), + %% index老的数据可能要清理掉, 用就的数据覆盖新的数据,得到的是Old的Record + #{<<"src_policy_id">> := OldSrcPolicyId, <<"dst_policy_id">> := OldDstPolicyId} = maps:merge(NewData, OldData), + ets:delete_object(rule_index, {OldSrcPolicyId, OldDstPolicyId, RuleId}), + %% 建立信息的索引 + ets:insert(rule_index, {SrcPolicyId, DstPolicyId, RuleId}). + +delete(#{<<"rule_id">> := RuleId, <<"src_policy_id">> := SrcPolicyId, <<"dst_policy_id">> := DstPolicyId}) -> + ets:delete(rule_table, RuleId), + ets:delete_object(rule_index, {SrcPolicyId, DstPolicyId, RuleId}). + +-spec format_action(binary()) -> atom(). +format_action(<<"allow">>) -> + allow; +format_action(_) -> + deny. \ No newline at end of file diff --git a/apps/sdlan/src/sdlan_sync_mysql.erl b/apps/sdlan/src/sdlan_sync_mysql.erl index cb55679..560ef4a 100644 --- a/apps/sdlan/src/sdlan_sync_mysql.erl +++ b/apps/sdlan/src/sdlan_sync_mysql.erl @@ -89,9 +89,7 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> sync_identity_policy() -> {ok, Rows} = mysql_pool:get_all(mysql_sdlan, <<"select * from identity_policy">>), - lists:map(fun(#{<<"identity_id">> := IdentityId, <<"policy_id">> := PolicyId}) -> - identity_policy_ets:insert(#identity_policy{identity_id = IdentityId, policy_id = PolicyId}) - end, Rows). + lists:map(fun(R) -> identity_policy_ets:insert(R) end, Rows). sync_rule() -> sync_rule0(0). @@ -100,29 +98,12 @@ sync_rule0(RuleIdOffset) -> logger:debug("rule rows: ~p", [Rows]), case length(Rows) > 0 of true -> - RuleIds = lists:map(fun(#{<<"rule_id">> := RuleId, <<"network_id">> := NetworkId, - <<"src_policy_id">> := SrcPolicyId, <<"dst_policy_id">> := DstPolicyId, <<"proto">> := Proto, - <<"port">> := Port, <<"action">> := Action, <<"created_at">> := CreatedAt}) -> - rule_ets:insert(#rule{ - rule_id = RuleId, - network_id = NetworkId, - src_policy_id = SrcPolicyId, - dst_policy_id = DstPolicyId, - proto = Proto, - port = Port, - action = format_action(Action), - created_at = CreatedAt - }), + RuleIds = lists:map(fun(R = #{<<"rule_id">> := RuleId}) -> + rule_ets:insert(R), RuleId end, Rows), LastRuleOffset = lists:max(RuleIds), sync_rule0(LastRuleOffset); false -> ok - end. - --spec format_action(binary()) -> atom(). -format_action(<<"allow">>) -> - allow; -format_action(_) -> - deny. \ No newline at end of file + end. \ No newline at end of file