diff --git a/apps/dimension_apn/src/api_handler.erl b/apps/dimension_apn/src/api_handler.erl index f151fec..f8d1e72 100644 --- a/apps/dimension_apn/src/api_handler.erl +++ b/apps/dimension_apn/src/api_handler.erl @@ -98,17 +98,7 @@ handle_request("POST", "/api/device_token", _, #{<<"user_id">> := UserId, <<"tok %% 向用户推送数据 handle_request("POST", "/api/push", _, PushList) -> - Title = <<"动物狂响曲"/utf8>>, - Body = <<"第7集(校服与被毛更深处),bilibili已更新"/utf8>>, - push(DeviceToken, Title, Body). - - case mnesia_device_token:insert(UserId, Token, dimension_utils:current_time()) of - ok -> - {ok, 200, dimension_utils:json_data(<<"OK">>)}; - {error, Reason} -> - lager:notice("[api_handler] insert user_id: ~p, token: ~p, error: ~p", [UserId, Token, Reason]), - {ok, 200, dimension_utils:json_error(-1, <<"更新token失败"/utf8>>)} - end; + {ok, 200, dimension_utils:json_data(<<"OK">>)}; handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), diff --git a/apps/dimension_apn/src/dimension_apn_pusher.erl b/apps/dimension_apn/src/dimension_apn_pusher.erl index cad265d..0be37c2 100644 --- a/apps/dimension_apn/src/dimension_apn_pusher.erl +++ b/apps/dimension_apn/src/dimension_apn_pusher.erl @@ -22,7 +22,7 @@ -define(SERVER, ?MODULE). -record(state, { - + task_pids = [] }). %%%=================================================================== @@ -61,7 +61,11 @@ start_link() -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([]) -> - {ok, #state{}}. + Size = get_pool_size(), + TaskPids = lists:map(fun(_) -> dimension_task:start_link() end, lists:seq(1, Size)), + lager:debug("[dimension_apn_pusher] pool_size: ~p, task_pids: ~p", [Size, TaskPids]), + + {ok, #state{task_pids = TaskPids}}. %% @private %% @doc Handling call messages @@ -82,16 +86,8 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({push, NotificationList}, State = #state{}) -> - lists:foreach(fun(#{<<"user_id">> := UserId, <<"title">> := Title, <<"body">> := Body}) -> - case mnesia_device_token:get_token(UserId) of - error -> - ok; - {ok, DeviceToken} -> - poolboy:transaction(apns_pool, fun(WorkerPid) -> dimension_apn_worker:push(WorkerPid, DeviceToken, Title, Body) end) - end - end, NotificationList), - +handle_cast({push, NotificationList}, State = #state{task_pids = TaskPids}) -> + dispatch(NotificationList, TaskPids), {noreply, State}. %% @private @@ -124,3 +120,31 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions %%%=================================================================== + +-spec get_pool_size() -> Size :: integer(). +get_pool_size() -> + {ok, Pools} = application:get_env(dimension_apn, pools), + case lists:keyfind(apns_pool, 1, Pools) of + {_, Props, _} -> + proplists:get_value(size, Props, 1); + false -> + 1 + end. + +-spec dispatch(NotificationList :: list(), TaskPids :: list()) -> ok. +dispatch(NotificationList, TaskPids) when is_list(NotificationList), is_list(TaskPids) -> + dispatch0(NotificationList, TaskPids, TaskPids). +dispatch0([], _, _) -> + ok; +dispatch0(NotificationList, [], TaskPids) -> + dispatch0(NotificationList, TaskPids, TaskPids); +dispatch0([#{<<"user_id">> := UserId, <<"title">> := Title, <<"body">> := Body}|Tail], [TaskPid|RestTaskPids], TaskPids) -> + case mnesia_device_token:get_token(UserId) of + error -> + ok; + {ok, DeviceToken} -> + dimension_task:submit(TaskPid, fun() -> + poolboy:transaction(apns_pool, fun(WorkerPid) -> dimension_apn_worker:push(WorkerPid, DeviceToken, Title, Body) end) + end) + end, + dispatch0(Tail, RestTaskPids, TaskPids). \ No newline at end of file diff --git a/apps/dimension_apn/src/dimension_apn_sup.erl b/apps/dimension_apn/src/dimension_apn_sup.erl index ec6f169..f35aed7 100644 --- a/apps/dimension_apn/src/dimension_apn_sup.erl +++ b/apps/dimension_apn/src/dimension_apn_sup.erl @@ -29,7 +29,14 @@ init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, ChildSpecs = [ - + #{ + id => 'dimension_apn_pusher', + start => {'dimension_apn_pusher', start_link, []}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['dimension_apn_pusher'] + } ], {ok, {SupFlags, pools() ++ ChildSpecs}}. diff --git a/apps/dimension_apn/src/dimension_task.erl b/apps/dimension_apn/src/dimension_task.erl new file mode 100644 index 0000000..b8abb4e --- /dev/null +++ b/apps/dimension_apn/src/dimension_task.erl @@ -0,0 +1,106 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 07. 4月 2025 15:09 +%%%------------------------------------------------------------------- +-module(dimension_task). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([submit/2]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, + code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +submit(Pid, Task) when is_pid(Pid) -> + gen_server:cast(Pid, {submit, Task}). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link(?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({submit, Task}, State = #state{}) -> + %% 执行任务 + Task(), + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/config/sys.config b/config/sys.config index 4aaa4d2..4991834 100644 --- a/config/sys.config +++ b/config/sys.config @@ -12,7 +12,7 @@ {pools, [ %% 推送设置 {apns_pool, - [{size, 1}, {max_overflow, 1}, {worker_module, dimension_apn_worker}], + [{size, 10}, {max_overflow, 20}, {worker_module, dimension_apn_worker}], [ {connection_opts, [ {apple_host, "api.sandbox.push.apple.com"},