This commit is contained in:
anlicheng 2025-04-16 16:12:55 +08:00
parent ed5c6bc3cf
commit 24019f4ceb
5 changed files with 88 additions and 313 deletions

View File

@ -96,7 +96,9 @@ handle_request("POST", "/api/device_token", _, #{<<"user_id">> := UserId, <<"tok
end; end;
%% %%
handle_request("POST", "/api/push", _, PushList) -> handle_request("POST", "/api/push", _, Notifications) ->
{ok, Pid} = dimension_apn_worker:start_link(),
dimension_apn_worker:push(Pid, Notifications),
{ok, 200, dimension_utils:json_data(<<"success">>)}; {ok, 200, dimension_utils:json_data(<<"success">>)};

View File

@ -1,155 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 07. 4 2025 14:53
%%%-------------------------------------------------------------------
-module(dimension_apn_pusher).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([push/1]).
-export([test/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
task_pids = []
}).
%%%===================================================================
%%% API
%%%===================================================================
test() ->
DeviceToken = <<"7fd3df4063670d945490cdaf511dedff8c56679c5c504af946806f78d002e8cf">>,
Title = <<"动物狂响曲"/utf8>>,
Body = <<"第7集(校服与被毛更深处),bilibili已更新"/utf8>>,
push_with_token(DeviceToken, Title, Body).
-spec push(NotificationList :: list()) -> no_return().
push(NotificationList) when is_list(NotificationList) ->
gen_server:cast(?SERVER, {push, NotificationList}).
push_with_token(DeviceToken, Title, Body) when is_binary(DeviceToken), is_binary(Title), is_binary(Body) ->
gen_server:cast(?SERVER, {push_with_token, DeviceToken, Title, Body}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
Size = get_pool_size(),
TaskPids = lists:map(fun(_) ->
{ok, Pid} = dimension_task:start_link(),
Pid
end, lists:seq(1, Size)),
lager:debug("[dimension_apn_pusher] pool_size: ~p, task_pids: ~p", [Size, TaskPids]),
{ok, #state{task_pids = TaskPids}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({push, NotificationList}, State = #state{task_pids = TaskPids}) ->
dispatch(NotificationList, TaskPids),
{noreply, State};
handle_cast({push_with_token, DeviceToken, Title, Body}, State = #state{task_pids = [TaskPid|_]}) ->
dimension_task:submit(TaskPid, fun() ->
poolboy:transaction(apns_pool, fun(WorkerPid) -> dimension_apn_worker:push(WorkerPid, DeviceToken, Title, Body) end)
end),
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec get_pool_size() -> Size :: integer().
get_pool_size() ->
{ok, Pools} = application:get_env(dimension_apn, pools),
case lists:keyfind(apns_pool, 1, Pools) of
{_, Props, _} ->
proplists:get_value(size, Props, 1);
false ->
1
end.
-spec dispatch(NotificationList :: list(), TaskPids :: list()) -> ok.
dispatch(NotificationList, TaskPids) when is_list(NotificationList), is_list(TaskPids) ->
dispatch0(NotificationList, TaskPids, TaskPids).
dispatch0([], _, _) ->
ok;
dispatch0(NotificationList, [], TaskPids) ->
dispatch0(NotificationList, TaskPids, TaskPids);
dispatch0([#{<<"user_id">> := UserId, <<"title">> := Title, <<"body">> := Body}|Tail], [TaskPid|RestTaskPids], TaskPids) ->
case mnesia_device_token:get_token(UserId) of
error ->
ok;
{ok, DeviceToken} ->
dimension_task:submit(TaskPid, fun() ->
poolboy:transaction(apns_pool, fun(WorkerPid) -> dimension_apn_worker:push(WorkerPid, DeviceToken, Title, Body) end)
end)
end,
dispatch0(Tail, RestTaskPids, TaskPids).

View File

@ -12,8 +12,9 @@
-behaviour(gen_server). -behaviour(gen_server).
%% API %% API
-export([start_link/1]). -export([start_link/0]).
-export([push/4]). -export([push/2]).
-export([test/0]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -29,15 +30,37 @@
%%% API %%% API
%%%=================================================================== %%%===================================================================
-spec push(Pid :: pid(), DeviceToken :: binary(), Title :: binary(), Body :: binary()) -> Response :: apns:response(). test() ->
push(Pid, DeviceToken, Title, Body) when is_pid(Pid), is_binary(DeviceToken), is_binary(Title), is_binary(Body) -> {ok, Pid} = start_link(),
gen_server:call(Pid, {push, DeviceToken, Title, Body}). UserId = <<"">>,
DeviceToken = <<"7fd3df4063670d945490cdaf511dedff8c56679c5c504af946806f78d002e8cf">>,
Title = <<"动物狂响曲"/utf8>>,
Body = <<"第7集(校服与被毛更深处),bilibili已更新"/utf8>>,
mnesia_device_token:insert(UserId, DeviceToken, dimension_utils:current_time()),
push(Pid, [
#{
<<"user_id">> => UserId,
<<"title">> => Title,
<<"body">> => Body,
<<"custom_data">> => #{
<<"target">> => <<"detail">>,
<<"params">> => #{
<<"drama_id">> => 1234
}
}
}
]).
-spec push(Pid :: pid(), Notifications :: [map()]) -> no_return().
push(Pid, Notifications) when is_pid(Pid), is_list(Notifications) ->
gen_server:cast(Pid, {push, Notifications}).
%% @doc Spawns the server and registers the local name (unique) %% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Opts :: list()) -> -spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}). {ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Props) when is_list(Props) -> start_link() ->
gen_server:start_link(?MODULE, [Props], []). gen_server:start_link(?MODULE, [], []).
%%%=================================================================== %%%===================================================================
%%% gen_server callbacks %%% gen_server callbacks
@ -48,7 +71,8 @@ start_link(Props) when is_list(Props) ->
-spec(init(Args :: term()) -> -spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore). {stop, Reason :: term()} | ignore).
init([Props]) -> init([]) ->
{ok, Props} = application:get_env(dimension_apn, apns),
ConnectionOpts0 = proplists:get_value(connection_opts, Props), ConnectionOpts0 = proplists:get_value(connection_opts, Props),
Headers0 = proplists:get_value(headers, Props), Headers0 = proplists:get_value(headers, Props),
@ -73,28 +97,8 @@ init([Props]) ->
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_call({push, DeviceToken, Title, Body}, _From, State = #state{apns_pid = ApnsPid, headers = Headers}) -> handle_call(_Request, _From, State) ->
Notification = #{ {reply, ok, State}.
aps => #{
alert => #{
title => Title,
% subtitle => <<"副标题"/utf8>>,
body => Body
},
'mutable-content' => 1,
%
sound => <<"default">>,
% App
badge => 1,
category => <<"HUB_MESSAGE">>
},
custom_data => #{
<<"drama_id">> => 305
}
},
PushResult = apns:push_notification(ApnsPid, DeviceToken, Notification, Headers),
lager:debug("[dimension_apn_pusher] push result is: ~p", [PushResult]),
{reply, PushResult, State}.
%% @private %% @private
%% @doc Handling cast messages %% @doc Handling cast messages
@ -102,6 +106,13 @@ handle_call({push, DeviceToken, Title, Body}, _From, State = #state{apns_pid = A
{noreply, NewState :: #state{}} | {noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} | {noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}). {stop, Reason :: term(), NewState :: #state{}}).
handle_cast({push, Notifications}, State = #state{apns_pid = ApnsPid, headers = Headers}) ->
lists:foreach(fun(#{<<"user_id">> := UserId, <<"title">> := Title, <<"body">> := Body, <<"custom_data">> := CustomData}) ->
PushResult = push_task(ApnsPid, UserId, Title, Body, CustomData, Headers),
lager:debug("[dimension_apn_pusher] push result is: ~p", [PushResult])
end, Notifications),
{noreply, State};
handle_cast(_Request, State) -> handle_cast(_Request, State) ->
{noreply, State}. {noreply, State}.
@ -136,6 +147,31 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
%%% Internal functions %%% Internal functions
%%%=================================================================== %%%===================================================================
push_task(ApnsPid, UserId, Title, Body, CustomData, Headers)
when is_pid(ApnsPid), is_binary(UserId), is_binary(Title), is_binary(Body), is_map(CustomData) ->
case mnesia_device_token:get_token(UserId) of
error ->
ok;
{ok, DeviceToken} ->
Notification = #{
aps => #{
alert => #{
title => Title,
body => Body
},
'mutable-content' => 1,
%
sound => <<"default">>,
% App
badge => 1
% category => <<"HUB_MESSAGE">>
},
custom_data => CustomData
},
apns:push_notification(ApnsPid, DeviceToken, Notification, Headers)
end.
-spec parse_headers(Headers :: list()) -> map(). -spec parse_headers(Headers :: list()) -> map().
parse_headers(Headers) -> parse_headers(Headers) ->
parse_headers(Headers, #{}). parse_headers(Headers, #{}).

View File

@ -1,106 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 07. 4 2025 15:09
%%%-------------------------------------------------------------------
-module(dimension_task).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([submit/2]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
}).
%%%===================================================================
%%% API
%%%===================================================================
submit(Pid, Task) when is_pid(Pid) ->
gen_server:cast(Pid, {submit, Task}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link(?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, #state{}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({submit, Task}, State = #state{}) ->
%%
Task(),
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -9,27 +9,25 @@
{backlog, 10240} {backlog, 10240}
]}, ]},
%% 推送配置
{apns, [
{connection_opts, [
{apple_host, "api.sandbox.push.apple.com"},
{apple_port, 443},
{certfile, "aps_development.pem"},
{keyfile, "priv_key.pem"},
{type, cert},
{timeout, 5000}
]},
{headers, [
{apns_expiration, 0},
{apns_priority, 10},
{apns_topic, "com.jihe.dimensionhub"},
{apns_push_type, "alert"}
]}
]},
{pools, [ {pools, [
%% 推送设置
{apns_pool,
[{size, 1}, {max_overflow, 2}, {worker_module, dimension_apn_worker}],
[
{connection_opts, [
{apple_host, "api.sandbox.push.apple.com"},
{apple_port, 443},
{certfile, "aps_development.pem"},
{keyfile, "priv_key.pem"},
{type, cert},
{timeout, 5000}
]},
{headers, [
{apns_expiration, 0},
{apns_priority, 10},
{apns_topic, "com.jihe.dimensionhub"},
{apns_push_type, "alert"}
]}
]
}
]} ]}
]}, ]},