This commit is contained in:
anlicheng 2025-04-07 15:33:55 +08:00
parent b725499dfe
commit 49d251f95f
5 changed files with 152 additions and 25 deletions

View File

@ -98,17 +98,7 @@ handle_request("POST", "/api/device_token", _, #{<<"user_id">> := UserId, <<"tok
%% %%
handle_request("POST", "/api/push", _, PushList) -> handle_request("POST", "/api/push", _, PushList) ->
Title = <<"动物狂响曲"/utf8>>, {ok, 200, dimension_utils:json_data(<<"OK">>)};
Body = <<"第7集(校服与被毛更深处),bilibili已更新"/utf8>>,
push(DeviceToken, Title, Body).
case mnesia_device_token:insert(UserId, Token, dimension_utils:current_time()) of
ok ->
{ok, 200, dimension_utils:json_data(<<"OK">>)};
{error, Reason} ->
lager:notice("[api_handler] insert user_id: ~p, token: ~p, error: ~p", [UserId, Token, Reason]),
{ok, 200, dimension_utils:json_error(-1, <<"更新token失败"/utf8>>)}
end;
handle_request(_, Path, _, _) -> handle_request(_, Path, _, _) ->
Path1 = list_to_binary(Path), Path1 = list_to_binary(Path),

View File

@ -22,7 +22,7 @@
-define(SERVER, ?MODULE). -define(SERVER, ?MODULE).
-record(state, { -record(state, {
task_pids = []
}). }).
%%%=================================================================== %%%===================================================================
@ -61,7 +61,11 @@ start_link() ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore). {stop, Reason :: term()} | ignore).
init([]) -> init([]) ->
{ok, #state{}}. Size = get_pool_size(),
TaskPids = lists:map(fun(_) -> dimension_task:start_link() end, lists:seq(1, Size)),
lager:debug("[dimension_apn_pusher] pool_size: ~p, task_pids: ~p", [Size, TaskPids]),
{ok, #state{task_pids = TaskPids}}.
%% @private %% @private
%% @doc Handling call messages %% @doc Handling call messages
@ -82,16 +86,8 @@ handle_call(_Request, _From, State = #state{}) ->
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({push, NotificationList}, State = #state{}) -> handle_cast({push, NotificationList}, State = #state{task_pids = TaskPids}) ->
lists:foreach(fun(#{<<"user_id">> := UserId, <<"title">> := Title, <<"body">> := Body}) -> dispatch(NotificationList, TaskPids),
case mnesia_device_token:get_token(UserId) of
error ->
ok;
{ok, DeviceToken} ->
poolboy:transaction(apns_pool, fun(WorkerPid) -> dimension_apn_worker:push(WorkerPid, DeviceToken, Title, Body) end)
end
end, NotificationList),
{noreply, State}. {noreply, State}.
%% @private %% @private
@ -124,3 +120,31 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%%=================================================================== %%%===================================================================
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
-spec get_pool_size() -> Size :: integer().
get_pool_size() ->
{ok, Pools} = application:get_env(dimension_apn, pools),
case lists:keyfind(apns_pool, 1, Pools) of
{_, Props, _} ->
proplists:get_value(size, Props, 1);
false ->
1
end.
-spec dispatch(NotificationList :: list(), TaskPids :: list()) -> ok.
dispatch(NotificationList, TaskPids) when is_list(NotificationList), is_list(TaskPids) ->
dispatch0(NotificationList, TaskPids, TaskPids).
dispatch0([], _, _) ->
ok;
dispatch0(NotificationList, [], TaskPids) ->
dispatch0(NotificationList, TaskPids, TaskPids);
dispatch0([#{<<"user_id">> := UserId, <<"title">> := Title, <<"body">> := Body}|Tail], [TaskPid|RestTaskPids], TaskPids) ->
case mnesia_device_token:get_token(UserId) of
error ->
ok;
{ok, DeviceToken} ->
dimension_task:submit(TaskPid, fun() ->
poolboy:transaction(apns_pool, fun(WorkerPid) -> dimension_apn_worker:push(WorkerPid, DeviceToken, Title, Body) end)
end)
end,
dispatch0(Tail, RestTaskPids, TaskPids).

View File

@ -29,7 +29,14 @@ init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
ChildSpecs = [ ChildSpecs = [
#{
id => 'dimension_apn_pusher',
start => {'dimension_apn_pusher', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['dimension_apn_pusher']
}
], ],
{ok, {SupFlags, pools() ++ ChildSpecs}}. {ok, {SupFlags, pools() ++ ChildSpecs}}.

View File

@ -0,0 +1,106 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 07. 4 2025 15:09
%%%-------------------------------------------------------------------
-module(dimension_task).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([submit/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
}).
%%%===================================================================
%%% API
%%%===================================================================
submit(Pid, Task) when is_pid(Pid) ->
gen_server:cast(Pid, {submit, Task}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link(?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, #state{}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({submit, Task}, State = #state{}) ->
%%
Task(),
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -12,7 +12,7 @@
{pools, [ {pools, [
%% 推送设置 %% 推送设置
{apns_pool, {apns_pool,
[{size, 1}, {max_overflow, 1}, {worker_module, dimension_apn_worker}], [{size, 10}, {max_overflow, 20}, {worker_module, dimension_apn_worker}],
[ [
{connection_opts, [ {connection_opts, [
{apple_host, "api.sandbox.push.apple.com"}, {apple_host, "api.sandbox.push.apple.com"},