fix
This commit is contained in:
parent
46a4d1e015
commit
9da280de08
@ -1,6 +1,6 @@
|
||||
-module(dns_pending_wheel).
|
||||
|
||||
-export([start/0, put/2, get/1, delete/1]).
|
||||
-export([start/0, insert/2, lookup/1, delete/1]).
|
||||
|
||||
-define(TTL, 5).
|
||||
-define(TICK_MS, 1000).
|
||||
@ -11,26 +11,21 @@
|
||||
%%% =====================================================
|
||||
|
||||
start() ->
|
||||
ets:new(dns_pending_data, [ordered_set, public, named_table]),
|
||||
ets:new(dns_pending_wheel, [bag, public, named_table]),
|
||||
ets:new(dns_pending_data, [ordered_set, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
|
||||
ets:new(dns_pending_wheel, [bag, public, named_table, {read_concurrency, true}, {write_concurrency, true}]),
|
||||
start_scanner().
|
||||
|
||||
-spec put(Key :: any(), Val :: any()) -> ok.
|
||||
put(Key, Val) ->
|
||||
-spec insert(Key :: any(), Val :: any()) -> ok.
|
||||
insert(Key, Val) ->
|
||||
Tick = now_tick(),
|
||||
Slot = Tick rem ?WHEEL_SIZE,
|
||||
ets:insert(dns_pending_data, {Key, {Val, Tick}}),
|
||||
ets:insert(dns_pending_wheel, {Slot, {Key, Tick}}),
|
||||
ok.
|
||||
|
||||
-spec get(Key :: any()) -> error | {ok, Val :: any()}.
|
||||
get(Key) ->
|
||||
case ets:lookup(dns_pending_data, Key) of
|
||||
[{Key, {Val, _Tick}}] ->
|
||||
{ok, Val};
|
||||
[] ->
|
||||
error
|
||||
end.
|
||||
-spec lookup(Key :: any()) -> [term()].
|
||||
lookup(Key) ->
|
||||
ets:lookup(dns_pending_data, Key).
|
||||
|
||||
-spec delete(Key :: any()) -> ok.
|
||||
delete(Key) ->
|
||||
|
||||
@ -21,11 +21,10 @@
|
||||
-export([forward/5]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
-define(REQUEST_TTL, 5000).
|
||||
|
||||
-record(state, {
|
||||
socket,
|
||||
tid,
|
||||
idx :: integer(),
|
||||
dns_servers = []
|
||||
}).
|
||||
|
||||
@ -53,12 +52,9 @@ start_link(Args) when is_list(Args) ->
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([]) ->
|
||||
{ok, DnsServers} = application:get_env(sdlan, public_dns_servers),
|
||||
|
||||
{ok, Sock} = gen_udp:open(0, [binary, {active, true}]),
|
||||
%% 通过ets来保存映射关系
|
||||
Tid = ets:new(random_table(), [set, {read_concurrency, true}, {write_concurrency, true}, private]),
|
||||
|
||||
{ok, #state{socket = Sock, tid = Tid, dns_servers = DnsServers}}.
|
||||
Idx = erlang:unique_integer([monotonic, positive]),
|
||||
{ok, #state{socket = Sock, idx = Idx, dns_servers = DnsServers}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
@ -79,15 +75,15 @@ handle_call(_Request, _From, State = #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({forward, ReceiverPid, Ref, Request, #dns_message{id = TxId, questions = [#dns_query{name = QName, type = QType, class = QClass}|_]}}, State = #state{socket = Socket, tid = Tid, dns_servers = DnsServers}) ->
|
||||
Keys = lists:foldl(fun({DnsIp, DnsPort}, Acc) ->
|
||||
handle_cast({forward, ReceiverPid, Ref, Request, #dns_message{id = TxId, questions = [#dns_query{name = QName, type = QType, class = QClass}|_]}},
|
||||
State = #state{socket = Socket, idx = Idx, dns_servers = DnsServers}) ->
|
||||
|
||||
lists:foreach(fun({DnsIp, DnsPort}) ->
|
||||
ok = gen_udp:send(Socket, DnsIp, DnsPort, Request),
|
||||
Key = {TxId, DnsIp, DnsPort, QName, QType, QClass},
|
||||
Key = {Idx, TxId, DnsIp, DnsPort, QName, QType, QClass},
|
||||
lager:debug("[dns_resolver] key: ~p, send to: ~p, packet: ~p", [Key, {DnsIp, DnsPort}, Request]),
|
||||
true = ets:insert(Tid, {Key, Ref, ReceiverPid}),
|
||||
[Key|Acc]
|
||||
end, [], DnsServers),
|
||||
erlang:start_timer(?REQUEST_TTL, self(), {clean_ticker, Keys}),
|
||||
dns_pending_wheel:insert(Key, {Ref, ReceiverPid})
|
||||
end, DnsServers),
|
||||
|
||||
{noreply, State}.
|
||||
|
||||
@ -97,21 +93,18 @@ handle_cast({forward, ReceiverPid, Ref, Request, #dns_message{id = TxId, questio
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({udp, Socket, TargetIp, TargetPort, Resp}, State = #state{tid = Tid, socket = Socket}) ->
|
||||
handle_info({udp, Socket, TargetIp, TargetPort, Resp}, State = #state{socket = Socket, idx = Idx}) ->
|
||||
try dns:decode_message(Resp) of
|
||||
#dns_message{id = TxId, questions = [#dns_query{name = QName, type = QType, class = QClass}|_]} ->
|
||||
Key = {TxId, TargetIp, TargetPort, QName, QType, QClass},
|
||||
Records = ets:take(Tid, Key),
|
||||
Key = {Idx, TxId, TargetIp, TargetPort, QName, QType, QClass},
|
||||
Records = dns_pending_wheel:lookup(Key),
|
||||
dns_pending_wheel:delete(Key),
|
||||
resolver_reply(Records, Resp);
|
||||
_ ->
|
||||
ok
|
||||
catch error:_ ->
|
||||
ok
|
||||
end,
|
||||
{noreply, State};
|
||||
|
||||
handle_info({timeout, _, {clean_ticker, Keys}}, State = #state{tid = Tid}) ->
|
||||
lists:foreach(fun(Key) -> ets:delete(Tid, Key) end, Keys),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
@ -136,17 +129,13 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
-spec random_table() -> atom().
|
||||
random_table() ->
|
||||
list_to_atom("udp_ets:" ++ integer_to_list(erlang:unique_integer([monotonic, positive]))).
|
||||
|
||||
-spec resolver_reply(list(), Resp :: binary()) -> no_return().
|
||||
resolver_reply([{_, Ref, ReceiverPid}], Resp) when is_binary(Resp) ->
|
||||
resolver_reply(Records, Resp) when is_binary(Resp) ->
|
||||
lists:foreach(fun({_, {Ref, ReceiverPid}}) ->
|
||||
case is_process_alive(ReceiverPid) of
|
||||
true ->
|
||||
ReceiverPid ! {dns_resolver_reply, Ref, Resp};
|
||||
false ->
|
||||
ok
|
||||
end;
|
||||
resolver_reply(_, _) ->
|
||||
ok.
|
||||
end
|
||||
end, Records).
|
||||
Loading…
x
Reference in New Issue
Block a user