This commit is contained in:
anlicheng 2026-02-28 16:35:15 +08:00
parent 73f3d58286
commit 2c4ce65e7b
6 changed files with 166 additions and 13 deletions

View File

@ -2,14 +2,15 @@
-include("policy.hrl"). -include("policy.hrl").
-export([init/0]). -export([init/0]).
-export([lookup/1, insert/1]). -export([get_policies/1, insert/1]).
-define(TABLE, identity_policy_ets_table). -define(TABLE, identity_policy_ets_table).
init() -> init() ->
ets:new(?TABLE, [named_table, bag, public, {keypos, 2}, {read_concurrency, true}]). ets:new(?TABLE, [named_table, bag, public, {keypos, 2}, {read_concurrency, true}]).
lookup(IdentityId) when is_integer(IdentityId) -> -spec get_policies(IdentityId :: integer()) -> [PolicyId :: integer()].
get_policies(IdentityId) when is_integer(IdentityId) ->
Records = ets:lookup(?TABLE, IdentityId), Records = ets:lookup(?TABLE, IdentityId),
lists:map(fun(#identity_policy{policy_id = PolicyId}) -> PolicyId end, Records). lists:map(fun(#identity_policy{policy_id = PolicyId}) -> PolicyId end, Records).

View File

@ -0,0 +1,21 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2026, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 28. 2 2026 15:55
%%%-------------------------------------------------------------------
-module(policy).
-author("anlicheng").
%% API
-export([]).
get_rules(SrcIdentityId, DstIdentityId) when is_integer(SrcIdentityId), is_integer(DstIdentityId) ->
SrcPolicies = identity_policy_ets:get_policies(SrcIdentityId),
DstPolicies = identity_policy_ets:get_policies(DstIdentityId),
ok.

View File

@ -2,15 +2,18 @@
-include("policy.hrl"). -include("policy.hrl").
-export([init/0]). -export([init/0]).
-export([lookup/1, insert/1]). -export([insert/1, get_rules/2]).
-define(TABLE, policy_ets_table).
init() -> init() ->
ets:new(?TABLE, [named_table, ordered_set, public, {keypos, 2}, {read_concurrency, true}]). ets:new(rule_table, [named_table, ordered_set, public, {keypos, 2}, {read_concurrency, true}]),
ets:new(rule_index, [named_table, set, public, {read_concurrency, true}]).
lookup(PolicyId) when is_integer(PolicyId) -> get_rules(SrcPolicyIds, DstPolicyIds) when is_list(SrcPolicyIds), is_list(DstPolicyIds) ->
Records = ets:lookup(?TABLE, PolicyId). MatchKeys = [{S, D, '_'} || S <- SrcPolicyIds, D <- DstPolicyIds],
Records = lists:flatmap(fun({S, D, _}) -> ets:match_object(rule_index, {S, D, '_'}) end, MatchKeys),
Rules = lists:flatmap(fun({_, _, RuleId}) -> ets:lookup(rule_table, RuleId) end, Records),
{ok, Rules}.
insert(Rule = #rule{}) -> insert(Rule = #rule{src_policy_id = SrcPolicyId, dst_policy_id = DstPolicyId, rule_id = RuleId}) ->
true = ets:insert(?TABLE, Rule). ets:insert(rule_table, Rule),
ets:insert(rule_index, {SrcPolicyId, DstPolicyId, RuleId}).

View File

@ -79,6 +79,15 @@ init([]) ->
shutdown => 2000, shutdown => 2000,
type => worker, type => worker,
modules => ['sdlan_quic_server'] modules => ['sdlan_quic_server']
},
#{
id => sdlan_sync_mysql,
start => {sdlan_sync_mysql, start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['sdlan_sync_mysql']
} }
], ],

View File

@ -0,0 +1,122 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2026, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 11. 2 2026 23:00
%%%-------------------------------------------------------------------
-module(sdlan_sync_mysql).
-author("anlicheng").
-include("policy.hrl").
-behaviour(gen_statem).
%%
-define(PING_TICKER, 15000).
%%
%%
-define(NAK_NETWORK_FAULT, 4).
%%
-define(NAK_INTERNAL_FAULT, 5).
%% API
-export([start_link/0]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
-record(state, {
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link() ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([]) ->
{ok, initializing, #state{}, [{next_event, internal, do_sync}]}.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
handle_event_function.
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event(internal, do_sync, initializing, State=#state{}) ->
sync_identity_policy(),
sync_rule(),
{next_state, initialized, State}.
%% @private
%% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, _State) ->
logger:debug("[sdlan_sync_mysql] terminate with reason: ~p", [Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
{ok, StateName, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
sync_identity_policy() ->
{ok, Rows} = mysql_pool:get_all(mysql_sdlan, <<"select * from identity_policy">>),
lists:map(fun(#{<<"identity_id">> := IdentityId, <<"policy_id">> := PolicyId}) ->
identity_policy_ets:insert(#identity_policy{identity_id = IdentityId, policy_id = PolicyId})
end, Rows).
sync_rule() ->
sync_rule0(0).
sync_rule0(RuleIdOffset) ->
{ok, Rows} = mysql_pool:get_all(mysql_sdlan, <<"select * from rule where rule_id > ? order by rule_id asc limit 5000">>, [RuleIdOffset]),
case length(Rows) > 0 of
true ->
RuleIds = lists:map(fun(#{<<"rule_id">> := RuleId, <<"policy_id">> := PolicyId, <<"network_id">> := NetworkId,
<<"src_policy_id">> := SrcPolicyId, <<"dst_policy_id">> := DstPolicyId, <<"proto">> := Proto,
<<"port">> := Port, <<"action">> := Action, <<"created_at">> := CreatedAt}) ->
identity_policy_ets:insert(#rule{
rule_id = RuleId,
policy_id = PolicyId,
network_id = NetworkId,
src_policy_id = SrcPolicyId,
dst_policy_id = DstPolicyId,
proto = Proto,
port = Port,
action = Action,
created_at = CreatedAt
}),
RuleId
end, Rows),
LastRuleOffset = lists:max(RuleIds),
sync_rule0(LastRuleOffset);
false ->
ok
end.

View File

@ -10,14 +10,12 @@ CREATE TABLE `identity` (
KEY `idx_network_id` (`network_id`) KEY `idx_network_id` (`network_id`)
) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; ) ENGINE=InnoDB AUTO_INCREMENT=4 DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE `identity_policy` ( CREATE TABLE `identity_policy` (
`identity_id` int NOT NULL, `identity_id` int NOT NULL,
`policy_id` int NOT NULL, `policy_id` int NOT NULL,
PRIMARY KEY (`identity_id`,`policy_id`) PRIMARY KEY (`identity_id`,`policy_id`)
) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci; ) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4 COLLATE=utf8mb4_unicode_ci;
CREATE TABLE `policy` ( CREATE TABLE `policy` (
`policy_id` int NOT NULL AUTO_INCREMENT, `policy_id` int NOT NULL AUTO_INCREMENT,
`network_id` int NOT NULL, `network_id` int NOT NULL,
@ -30,7 +28,6 @@ CREATE TABLE `policy` (
CREATE TABLE `rule` ( CREATE TABLE `rule` (
`rule_id` int NOT NULL AUTO_INCREMENT, `rule_id` int NOT NULL AUTO_INCREMENT,
`policy_id` int NOT NULL,
`network_id` int NOT NULL, `network_id` int NOT NULL,
`access_rule_id` int NOT NULL, `access_rule_id` int NOT NULL,
`src_policy_id` int NOT NULL, `src_policy_id` int NOT NULL,