diff --git a/apps/sdlan/src/dns_proxy/dns_pending_wheel.erl b/apps/sdlan/src/dns_proxy/dns_pending_wheel.erl index 622aea1..ad72903 100644 --- a/apps/sdlan/src/dns_proxy/dns_pending_wheel.erl +++ b/apps/sdlan/src/dns_proxy/dns_pending_wheel.erl @@ -1,6 +1,6 @@ -module(dns_pending_wheel). --export([start/0, put/2, get/1, delete/1]). +-export([start/0, insert/2, lookup/1, delete/1]). -define(TTL, 5). -define(TICK_MS, 1000). @@ -11,26 +11,21 @@ %%% ===================================================== start() -> - ets:new(dns_pending_data, [ordered_set, public, named_table]), - ets:new(dns_pending_wheel, [bag, public, named_table]), + ets:new(dns_pending_data, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]), + ets:new(dns_pending_wheel, [bag, public, named_table, {read_concurrency, true}, {write_concurrency, true}]), start_scanner(). --spec put(Key :: any(), Val :: any()) -> ok. -put(Key, Val) -> +-spec insert(Key :: any(), Val :: any()) -> ok. +insert(Key, Val) -> Tick = now_tick(), Slot = Tick rem ?WHEEL_SIZE, ets:insert(dns_pending_data, {Key, {Val, Tick}}), ets:insert(dns_pending_wheel, {Slot, {Key, Tick}}), ok. --spec get(Key :: any()) -> error | {ok, Val :: any()}. -get(Key) -> - case ets:lookup(dns_pending_data, Key) of - [{Key, {Val, _Tick}}] -> - {ok, Val}; - [] -> - error - end. +-spec lookup(Key :: any()) -> [term()]. +lookup(Key) -> + ets:lookup(dns_pending_data, Key). -spec delete(Key :: any()) -> ok. delete(Key) -> diff --git a/apps/sdlan/src/dns_proxy/dns_resolver.erl b/apps/sdlan/src/dns_proxy/dns_resolver.erl index 9abd914..7058800 100644 --- a/apps/sdlan/src/dns_proxy/dns_resolver.erl +++ b/apps/sdlan/src/dns_proxy/dns_resolver.erl @@ -21,11 +21,10 @@ -export([forward/5]). -define(SERVER, ?MODULE). --define(REQUEST_TTL, 5000). -record(state, { socket, - tid, + idx :: integer(), dns_servers = [] }). @@ -53,12 +52,9 @@ start_link(Args) when is_list(Args) -> {stop, Reason :: term()} | ignore). init([]) -> {ok, DnsServers} = application:get_env(sdlan, public_dns_servers), - {ok, Sock} = gen_udp:open(0, [binary, {active, true}]), - %% 通过ets来保存映射关系 - Tid = ets:new(random_table(), [set, {read_concurrency, true}, {write_concurrency, true}, private]), - - {ok, #state{socket = Sock, tid = Tid, dns_servers = DnsServers}}. + Idx = erlang:unique_integer([monotonic, positive]), + {ok, #state{socket = Sock, idx = Idx, dns_servers = DnsServers}}. %% @private %% @doc Handling call messages @@ -79,15 +75,15 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({forward, ReceiverPid, Ref, Request, #dns_message{id = TxId, questions = [#dns_query{name = QName, type = QType, class = QClass}|_]}}, State = #state{socket = Socket, tid = Tid, dns_servers = DnsServers}) -> - Keys = lists:foldl(fun({DnsIp, DnsPort}, Acc) -> +handle_cast({forward, ReceiverPid, Ref, Request, #dns_message{id = TxId, questions = [#dns_query{name = QName, type = QType, class = QClass}|_]}}, + State = #state{socket = Socket, idx = Idx, dns_servers = DnsServers}) -> + + lists:foreach(fun({DnsIp, DnsPort}) -> ok = gen_udp:send(Socket, DnsIp, DnsPort, Request), - Key = {TxId, DnsIp, DnsPort, QName, QType, QClass}, + Key = {Idx, TxId, DnsIp, DnsPort, QName, QType, QClass}, lager:debug("[dns_resolver] key: ~p, send to: ~p, packet: ~p", [Key, {DnsIp, DnsPort}, Request]), - true = ets:insert(Tid, {Key, Ref, ReceiverPid}), - [Key|Acc] - end, [], DnsServers), - erlang:start_timer(?REQUEST_TTL, self(), {clean_ticker, Keys}), + dns_pending_wheel:insert(Key, {Ref, ReceiverPid}) + end, DnsServers), {noreply, State}. @@ -97,21 +93,18 @@ handle_cast({forward, ReceiverPid, Ref, Request, #dns_message{id = TxId, questio {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({udp, Socket, TargetIp, TargetPort, Resp}, State = #state{tid = Tid, socket = Socket}) -> +handle_info({udp, Socket, TargetIp, TargetPort, Resp}, State = #state{socket = Socket, idx = Idx}) -> try dns:decode_message(Resp) of #dns_message{id = TxId, questions = [#dns_query{name = QName, type = QType, class = QClass}|_]} -> - Key = {TxId, TargetIp, TargetPort, QName, QType, QClass}, - Records = ets:take(Tid, Key), + Key = {Idx, TxId, TargetIp, TargetPort, QName, QType, QClass}, + Records = dns_pending_wheel:lookup(Key), + dns_pending_wheel:delete(Key), resolver_reply(Records, Resp); _ -> ok catch error:_ -> ok end, - {noreply, State}; - -handle_info({timeout, _, {clean_ticker, Keys}}, State = #state{tid = Tid}) -> - lists:foreach(fun(Key) -> ets:delete(Tid, Key) end, Keys), {noreply, State}. %% @private @@ -136,17 +129,13 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== --spec random_table() -> atom(). -random_table() -> - list_to_atom("udp_ets:" ++ integer_to_list(erlang:unique_integer([monotonic, positive]))). - -spec resolver_reply(list(), Resp :: binary()) -> no_return(). -resolver_reply([{_, Ref, ReceiverPid}], Resp) when is_binary(Resp) -> - case is_process_alive(ReceiverPid) of - true -> - ReceiverPid ! {dns_resolver_reply, Ref, Resp}; - false -> - ok - end; -resolver_reply(_, _) -> - ok. \ No newline at end of file +resolver_reply(Records, Resp) when is_binary(Resp) -> + lists:foreach(fun({_, {Ref, ReceiverPid}}) -> + case is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {dns_resolver_reply, Ref, Resp}; + false -> + ok + end + end, Records). \ No newline at end of file