add code
This commit is contained in:
parent
6f2a527b55
commit
5fd69f0347
@ -4,7 +4,9 @@
|
||||
{registered, []},
|
||||
{mod, {endpoint_app, []}},
|
||||
{applications,
|
||||
[kernel,
|
||||
[
|
||||
emqtt,
|
||||
kernel,
|
||||
stdlib
|
||||
]},
|
||||
{env,[]},
|
||||
|
||||
63
apps/endpoint/src/endpoint.erl
Normal file
63
apps/endpoint/src/endpoint.erl
Normal file
@ -0,0 +1,63 @@
|
||||
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 06. 7月 2023 12:02
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(endpoint).
|
||||
-include("endpoint.hrl").
|
||||
|
||||
%% API
|
||||
-export([start_link/1]).
|
||||
-export([get_name/1, get_pid/1, forward/4, reload/2, clean_up/1]).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec start_link(Endpoint :: #endpoint{}) -> {'ok', pid()} | 'ignore' | {'error', term()}.
|
||||
start_link(Endpoint = #endpoint{id = Id, config = #http_endpoint{}}) ->
|
||||
Name = get_name(Id),
|
||||
endpoint_http:start_link(Name, Endpoint);
|
||||
start_link(Endpoint = #endpoint{id = Id, config = #mqtt_endpoint{}}) ->
|
||||
Name = get_name(Id),
|
||||
endpoint_mqtt:start_link(Name, Endpoint);
|
||||
start_link(Endpoint = #endpoint{id = Id, config = #mysql_endpoint{}}) ->
|
||||
Name = get_name(Id),
|
||||
endpoint_mysql:start_link(Name, Endpoint).
|
||||
|
||||
-spec get_name(Id :: integer()) -> atom().
|
||||
get_name(Id) when is_integer(Id) ->
|
||||
list_to_atom("endpoint:" ++ integer_to_list(Id)).
|
||||
|
||||
-spec get_pid(Id :: integer()) -> undefined | pid().
|
||||
get_pid(Id) when is_integer(Id) ->
|
||||
whereis(get_name(Id)).
|
||||
|
||||
-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
|
||||
forward(undefined, _, _, _) ->
|
||||
ok;
|
||||
forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
|
||||
gen_server:cast(Pid, {forward, LocationCode, Fields, Timestamp}).
|
||||
|
||||
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
|
||||
gen_statem:cast(Pid, {reload, NEndpoint}).
|
||||
|
||||
-spec clean_up(Pid :: pid()) -> ok.
|
||||
clean_up(Pid) when is_pid(Pid) ->
|
||||
gen_server:call(Pid, clean_up, 5000).
|
||||
|
||||
-spec config_equals(any(), any()) -> boolean().
|
||||
config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) ->
|
||||
true;
|
||||
config_equals(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic},
|
||||
#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) ->
|
||||
true;
|
||||
config_equals(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos},
|
||||
#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) ->
|
||||
true;
|
||||
config_equals(_, _) ->
|
||||
false.
|
||||
136
apps/endpoint/src/endpoint_buffer.erl
Normal file
136
apps/endpoint/src/endpoint_buffer.erl
Normal file
@ -0,0 +1,136 @@
|
||||
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 06. 7月 2023 12:02
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(endpoint_buffer).
|
||||
|
||||
-include("endpoint.hrl").
|
||||
|
||||
%% 消息重发间隔
|
||||
-define(RETRY_INTERVAL, 5000).
|
||||
|
||||
-export([new/2, append/2, trigger_next/1, trigger_n/1, cleanup/1, ack/2, stat/1]).
|
||||
-export_type([buffer/0]).
|
||||
|
||||
-record(buffer, {
|
||||
endpoint :: #endpoint{},
|
||||
next_id = 1 :: integer(),
|
||||
%% 当前数据所在的游标
|
||||
cursor = 0 :: integer(),
|
||||
%% ets存储的引用
|
||||
tid :: ets:tid(),
|
||||
%% 定时器
|
||||
timer_pid :: pid(),
|
||||
%% 窗口大小,允许最大的未确认消息数
|
||||
window_size = 10,
|
||||
%% 未确认的消息数
|
||||
flight_num = 0,
|
||||
%% 记录成功处理的消息数
|
||||
acc_num = 0
|
||||
}).
|
||||
|
||||
-record(north_data, {
|
||||
id :: integer(),
|
||||
location_code,
|
||||
fields,
|
||||
timestamp :: integer()
|
||||
}).
|
||||
|
||||
-type buffer() :: #buffer{}.
|
||||
|
||||
-spec new(Endpoint :: #endpoint{}, WindowSize :: integer()) -> Buffer :: #buffer{}.
|
||||
new(Endpoint = #endpoint{id = Id}, WindowSize) when is_integer(WindowSize), WindowSize > 0 ->
|
||||
%% 初始化存储
|
||||
EtsName = list_to_atom("endpoint_buffer_ets:" ++ integer_to_list(Id)),
|
||||
Tid = ets:new(EtsName, [ordered_set, private]),
|
||||
%% 定义重发器
|
||||
{ok, TimerPid} = endpoint_timer:start_link(?RETRY_INTERVAL),
|
||||
|
||||
#buffer{cursor = 0, tid = Tid, timer_pid = TimerPid, endpoint = Endpoint, window_size = WindowSize}.
|
||||
|
||||
-spec append(tuple(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
|
||||
append({LocationCode, Fields, Timestamp}, Buffer = #buffer{tid = Tid, next_id = NextId, window_size = WindowSize, flight_num = FlightNum}) ->
|
||||
NorthData = #north_data{id = NextId, location_code = LocationCode, fields = Fields, timestamp = Timestamp},
|
||||
true = ets:insert(Tid, NorthData),
|
||||
NBuffer = Buffer#buffer{next_id = NextId + 1},
|
||||
case FlightNum < WindowSize of
|
||||
true ->
|
||||
trigger_next(NBuffer);
|
||||
false ->
|
||||
NBuffer
|
||||
end.
|
||||
|
||||
-spec trigger_n(Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
|
||||
trigger_n(Buffer = #buffer{window_size = WindowSize}) ->
|
||||
%% 最多允许window_size
|
||||
lists:foldl(fun(_, Buffer0) -> trigger_next(Buffer0) end, Buffer, lists:seq(1, WindowSize)).
|
||||
|
||||
%% 触发读取下一条数据
|
||||
-spec trigger_next(Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
|
||||
trigger_next(Buffer = #buffer{tid = Tid, cursor = Cursor, timer_pid = TimerPid, flight_num = FlightNum, endpoint = #endpoint{title = Title, mapper_fun = MapperFun}}) ->
|
||||
case ets:next(Tid, Cursor) of
|
||||
'$end_of_table' ->
|
||||
Buffer;
|
||||
NKey ->
|
||||
[NorthData = #north_data{id = Id, location_code = LocationCode}] = ets:lookup(Tid, NKey),
|
||||
%lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
|
||||
%% 重发机制, 在发送的过程中mapper可能会改变
|
||||
case safe_invoke_mapper(MapperFun, NorthData) of
|
||||
{ok, Body} ->
|
||||
ReceiverPid = self(),
|
||||
ReceiverPid ! {next_data, Id, LocationCode, Body},
|
||||
endpoint_timer:task(TimerPid, Id, fun() -> ReceiverPid ! {next_data, Id, LocationCode, Body} end),
|
||||
|
||||
Buffer#buffer{flight_num = FlightNum + 1};
|
||||
{error, Error} ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Title, Error]),
|
||||
ets:delete(Tid, Id),
|
||||
Buffer;
|
||||
ignore ->
|
||||
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Title]),
|
||||
ets:delete(Tid, Id),
|
||||
Buffer
|
||||
end
|
||||
end.
|
||||
|
||||
-spec ack(Id :: integer(), Buffer :: #buffer{}) -> NBuffer :: #buffer{}.
|
||||
ack(Id, Buffer = #buffer{tid = Tid, timer_pid = TimerPid, acc_num = AccNum, flight_num = FlightNum}) when is_integer(Id) ->
|
||||
true = ets:delete(Tid, Id),
|
||||
endpoint_timer:ack(TimerPid, Id),
|
||||
trigger_next(Buffer#buffer{acc_num = AccNum + 1, flight_num = FlightNum - 1}).
|
||||
|
||||
%% 获取当前统计信息
|
||||
-spec stat(Buffer :: #buffer{}) -> map().
|
||||
stat(#buffer{acc_num = AccNum, tid = Tid}) ->
|
||||
#{
|
||||
<<"acc_num">> => AccNum,
|
||||
<<"queue_num">> => ets:info(Tid, size)
|
||||
}.
|
||||
|
||||
-spec cleanup(Buffer :: #buffer{}) -> ok.
|
||||
cleanup(#buffer{timer_pid = TimerPid}) ->
|
||||
endpoint_timer:cleanup(TimerPid),
|
||||
ok.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
-spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) ->
|
||||
{ok, Body :: any()} | ignore | {error, Reason :: any()}.
|
||||
safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) ->
|
||||
try
|
||||
if
|
||||
is_function(MapperFun, 2) ->
|
||||
MapperFun(LocationCode, Fields);
|
||||
is_function(MapperFun, 3) ->
|
||||
MapperFun(LocationCode, Fields, Timestamp)
|
||||
end
|
||||
catch _:Error ->
|
||||
{error, Error}
|
||||
end.
|
||||
127
apps/endpoint/src/endpoint_http.erl
Normal file
127
apps/endpoint/src/endpoint_http.erl
Normal file
@ -0,0 +1,127 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2024, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 07. 5月 2024 11:17
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(endpoint_http).
|
||||
-author("anlicheng").
|
||||
-include("endpoint.hrl").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/2]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-record(state, {
|
||||
endpoint :: #endpoint{},
|
||||
buffer :: endpoint_buffer:buffer()
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link(Name :: atom(), Endpoint :: #endpoint{}) ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link(Name, Endpoint = #endpoint{config = #http_endpoint{}}) when is_atom(Name) ->
|
||||
gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([Endpoint]) ->
|
||||
Buffer = endpoint_buffer:new(Endpoint, 10),
|
||||
{ok, #state{endpoint = Endpoint, buffer = Buffer}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
|
||||
Stat = endpoint_buffer:stat(Buffer),
|
||||
{reply, {ok, Stat}, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = Buffer}) ->
|
||||
NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer),
|
||||
{noreply, State#state{buffer = NBuffer}};
|
||||
|
||||
handle_cast(cleanup, State = #state{buffer = Buffer}) ->
|
||||
endpoint_buffer:cleanup(Buffer),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({next_data, Id, _LocationCode, Body}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) ->
|
||||
Headers = [
|
||||
{<<"content-type">>, <<"application/json">>}
|
||||
],
|
||||
case hackney:request(post, Url, Headers, Body) of
|
||||
{ok, 200, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
hackney:close(ClientRef),
|
||||
lager:debug("[http_postman] url: ~p, response is: ~p", [Url, RespBody]),
|
||||
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
||||
|
||||
{noreply, State#state{buffer = NBuffer}};
|
||||
{ok, HttpCode, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
hackney:close(ClientRef),
|
||||
lager:debug("[http_postman] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]),
|
||||
{noreply, State};
|
||||
{error, Reason} ->
|
||||
lager:warning("[http_postman] url: ~p, get error: ~p", [Url, Reason]),
|
||||
{noreply, State}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
194
apps/endpoint/src/endpoint_mqtt.erl
Normal file
194
apps/endpoint/src/endpoint_mqtt.erl
Normal file
@ -0,0 +1,194 @@
|
||||
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 06. 7月 2023 12:02
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(endpoint_mqtt).
|
||||
|
||||
-include("endpoint.hrl").
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/2]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
%% 消息重发间隔
|
||||
-define(RETRY_INTERVAL, 5000).
|
||||
|
||||
-define(DISCONNECTED, disconnected).
|
||||
-define(CONNECTED, connected).
|
||||
|
||||
-record(state, {
|
||||
endpoint :: #endpoint{},
|
||||
buffer :: endpoint_buffer:buffer(),
|
||||
conn_pid :: undefined | pid(),
|
||||
%% 待确认的数据, #{PacketId :: integer() => Id :: integer()}
|
||||
inflight = #{},
|
||||
|
||||
status = disconnected
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) ->
|
||||
gen_server:start_link({local, Name}, ?MODULE, [Endpoint], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
init([Endpoint]) ->
|
||||
erlang:process_flag(trap_exit, true),
|
||||
%% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制
|
||||
erlang:start_timer(0, self(), create_postman),
|
||||
%% 初始化存储
|
||||
Buffer = endpoint_buffer:new(Endpoint, 10),
|
||||
|
||||
{ok, #state{endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
|
||||
Stat = endpoint_buffer:stat(Buffer),
|
||||
{reply, {ok, Stat}, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = Buffer}) ->
|
||||
NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer),
|
||||
{noreply, State#state{buffer = NBuffer}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED,
|
||||
endpoint = #endpoint{title = Title, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, client_id = ClientId}}}) ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]),
|
||||
Opts = [
|
||||
{owner, self()},
|
||||
{clientid, ClientId},
|
||||
{host, binary_to_list(Host)},
|
||||
{port, Port},
|
||||
{tcp_opts, []},
|
||||
{username, binary_to_list(Username)},
|
||||
{password, binary_to_list(Password)},
|
||||
{keepalive, 86400},
|
||||
{auto_ack, true},
|
||||
{connect_timeout, 5000},
|
||||
{proto_ver, v5},
|
||||
{retry_interval, 5000}
|
||||
],
|
||||
|
||||
{ok, ConnPid} = emqtt:start_link(Opts),
|
||||
lager:debug("[mqtt_postman] start connect, options: ~p", [Opts]),
|
||||
case emqtt:connect(ConnPid, 5000) of
|
||||
{ok, _} ->
|
||||
lager:debug("[mqtt_postman] connect success, pid: ~p", [ConnPid]),
|
||||
NBuffer = endpoint_buffer:trigger_n(Buffer),
|
||||
{noreply, State#state{conn_pid = ConnPid, buffer = NBuffer, status = ?CONNECTED}};
|
||||
{error, Reason} ->
|
||||
lager:warning("[mqtt_postman] connect get error: ~p", [Reason]),
|
||||
erlang:start_timer(5000, self(), create_postman),
|
||||
{noreply, State}
|
||||
end;
|
||||
|
||||
%% 离线时,忽略数据发送逻辑
|
||||
handle_info({next_data, _Id, _LocationCode, _Message}, State = #state{status = ?DISCONNECTED}) ->
|
||||
{keep_state, State};
|
||||
%% 发送数据到mqtt服务器
|
||||
handle_info({next_data, Id, LocationCode, Message}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) ->
|
||||
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
|
||||
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]),
|
||||
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
||||
ok ->
|
||||
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
||||
{noreply, State#state{buffer = NBuffer}};
|
||||
{ok, PacketId} ->
|
||||
{noreply, State#state{inflight = maps:put(PacketId, Id, InFlight)}};
|
||||
{error, Reason} ->
|
||||
lager:warning("[mqtt_postman] send message to topic: ~p, get error: ~p", [Topic, Reason]),
|
||||
{stop, Reason, State}
|
||||
end;
|
||||
|
||||
handle_info({disconnected, ReasonCode, Properties}, State = #state{status = ?CONNECTED}) ->
|
||||
lager:debug("[mqtt_postman] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
|
||||
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
|
||||
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}};
|
||||
|
||||
handle_info({publish, Message = #{packet_id := _PacketId, payload := Payload}}, State = #state{status = ?CONNECTED}) ->
|
||||
lager:debug("[mqtt_postman] Recv a publish packet: ~p, payload: ~p", [Message, Payload]),
|
||||
{noreply, State};
|
||||
|
||||
%% 收到确认的消息
|
||||
handle_info({puback, #{packet_id := PacketId}}, State = #state{status = ?CONNECTED, inflight = Inflight, buffer = Buffer}) ->
|
||||
case maps:take(PacketId, Inflight) of
|
||||
{Id, RestInflight} ->
|
||||
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
||||
{noreply, State#state{buffer = NBuffer, inflight = RestInflight}};
|
||||
error ->
|
||||
{noreply, State}
|
||||
end;
|
||||
|
||||
%% postman进程挂掉时,重新建立新的
|
||||
handle_info({'EXIT', ConnPid, Reason}, State = #state{endpoint = #endpoint{title = Title}, conn_pid = ConnPid}) ->
|
||||
lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]),
|
||||
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
|
||||
{noreply, State#state{conn_pid = undefined, status = ?DISCONNECTED}};
|
||||
|
||||
handle_info(Info, State = #state{status = Status}) ->
|
||||
lager:warning("[iot_endpoint] unknown message: ~p, status: ~p", [Info, Status]),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Title, Reason]),
|
||||
endpoint_buffer:cleanup(Buffer),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
186
apps/endpoint/src/endpoint_mysql.erl
Normal file
186
apps/endpoint/src/endpoint_mysql.erl
Normal file
@ -0,0 +1,186 @@
|
||||
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 06. 7月 2023 12:02
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(endpoint_mysql).
|
||||
|
||||
-include("endpoint.hrl").
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/2]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
%% 消息重发间隔
|
||||
-define(RETRY_INTERVAL, 5000).
|
||||
|
||||
-define(DISCONNECTED, disconnected).
|
||||
-define(CONNECTED, connected).
|
||||
|
||||
-record(state, {
|
||||
endpoint :: #endpoint{},
|
||||
buffer :: endpoint_buffer:buffer(),
|
||||
pool_pid :: undefined | pid(),
|
||||
|
||||
status = ?DISCONNECTED
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
start_link(Name, Endpoint = #endpoint{}) when is_atom(Name) ->
|
||||
gen_statem:start_link({local, Name}, ?MODULE, [Endpoint], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
init([Endpoint]) ->
|
||||
erlang:process_flag(trap_exit, true),
|
||||
%% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制
|
||||
erlang:start_timer(0, self(), create_postman),
|
||||
%% 初始化存储
|
||||
Buffer = endpoint_buffer:new(Endpoint, 10),
|
||||
|
||||
{ok, #state{endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
|
||||
Stat = endpoint_buffer:stat(Buffer),
|
||||
{reply, {ok, Stat}, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = Buffer}) ->
|
||||
NBuffer = endpoint_buffer:append({LocationCode, Fields, Timestamp}, Buffer),
|
||||
{noreply, State#state{buffer = NBuffer}};
|
||||
|
||||
handle_cast(cleanup, State = #state{buffer = Buffer}) ->
|
||||
endpoint_buffer:cleanup(Buffer),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({timeout, _, create_postman}, State = #state{status = ?DISCONNECTED, buffer = Buffer, endpoint = #endpoint{title = Title, config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}}) ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]),
|
||||
WorkerArgs = [
|
||||
{host, binary_to_list(Host)},
|
||||
{port, Port},
|
||||
{user, binary_to_list(Username)},
|
||||
{password, binary_to_list(Password)},
|
||||
{keep_alive, true},
|
||||
{database, binary_to_list(Database)},
|
||||
{queries, [<<"set names utf8">>]}
|
||||
],
|
||||
|
||||
%% 启动工作的线程池
|
||||
case poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, mysql}], WorkerArgs) of
|
||||
{ok, PoolPid} ->
|
||||
NBuffer = endpoint_buffer:trigger_n(Buffer),
|
||||
{noreply, State#state{pool_pid = PoolPid, buffer = NBuffer, status = ?CONNECTED}};
|
||||
ignore ->
|
||||
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
|
||||
{noreply, State};
|
||||
{error, Reason} ->
|
||||
lager:warning("[mqtt_postman] start connect pool, get error: ~p", [Reason]),
|
||||
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
|
||||
{noreply, State}
|
||||
end;
|
||||
|
||||
%% 离线时,忽略数据发送逻辑
|
||||
handle_info({next_data, _Id, _LocationCode, _Message}, State = #state{status = ?DISCONNECTED}) ->
|
||||
{noreply, State};
|
||||
%% 发送数据到mqtt服务器
|
||||
handle_info({next_data, Id, _LocationCode, Fields}, State = #state{status = ?CONNECTED, pool_pid = PoolPid, buffer = Buffer,
|
||||
endpoint = #endpoint{title = Title, config = #mysql_endpoint{table_name = Table}}}) ->
|
||||
|
||||
{ok, InsertSql, Values} = insert_sql(Table, Fields),
|
||||
case poolboy:transaction(PoolPid, fun(ConnPid) -> mysql:query(ConnPid, InsertSql, Values) end) of
|
||||
ok ->
|
||||
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
||||
{noreply, State#state{buffer = NBuffer}};
|
||||
Error ->
|
||||
lager:warning("[endpoint_mysql] endpoint: ~p, insert mysql get error: ~p", [Title, Error]),
|
||||
{noreply, State}
|
||||
end;
|
||||
|
||||
%% postman进程挂掉时,重新建立新的
|
||||
handle_info({'EXIT', PoolPid, Reason}, State = #state{endpoint = #endpoint{title = Title}, pool_pid = PoolPid}) ->
|
||||
lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]),
|
||||
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
|
||||
{noreply, disconnected, State#state{pool_pid = undefined, status = ?DISCONNECTED}};
|
||||
|
||||
handle_info(Info, State = #state{status = Status}) ->
|
||||
lager:warning("[iot_endpoint] unknown message: ~p, status: ~p", [Info, Status]),
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) ->
|
||||
lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Title, Reason]),
|
||||
endpoint_buffer:cleanup(Buffer),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
|
||||
-spec insert_sql(Table :: binary(), Fields :: list()) -> {ok, Sql :: binary(), Values :: list()}.
|
||||
insert_sql(Table, Fields) when is_binary(Table), is_list(Fields) ->
|
||||
{Keys, Values} = kvs(Fields),
|
||||
|
||||
FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)),
|
||||
Placeholders = lists:duplicate(length(Keys), <<"?">>),
|
||||
ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)),
|
||||
|
||||
{ok, <<"INSERT INTO ", Table/binary, "(", FieldSql/binary, ") VALUES(", ValuesPlaceholder/binary, ")">>, Values}.
|
||||
|
||||
-spec kvs(Fields :: list()) -> {Keys :: list(), Values :: list()}.
|
||||
kvs(Fields) when is_list(Fields) ->
|
||||
{Keys0, Values0} = lists:foldl(fun({K, V}, {Acc0, Acc1}) -> {[K|Acc0], [V|Acc1]} end, {[], []}, Fields),
|
||||
{lists:reverse(Keys0), lists:reverse(Values0)}.
|
||||
@ -6,8 +6,10 @@
|
||||
-module(endpoint_sup).
|
||||
|
||||
-behaviour(supervisor).
|
||||
-include("endpoint.hrl").
|
||||
|
||||
-export([start_link/0]).
|
||||
-export([ensured_endpoint_started/1, delete_endpoint/1]).
|
||||
|
||||
-export([init/1]).
|
||||
|
||||
@ -26,10 +28,33 @@ start_link() ->
|
||||
%% type => worker(), % optional
|
||||
%% modules => modules()} % optional
|
||||
init([]) ->
|
||||
SupFlags = #{strategy => one_for_all,
|
||||
intensity => 0,
|
||||
period => 1},
|
||||
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
|
||||
ChildSpecs = [],
|
||||
{ok, {SupFlags, ChildSpecs}}.
|
||||
|
||||
%% internal functions
|
||||
|
||||
-spec ensured_endpoint_started(Endpoint :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
|
||||
ensured_endpoint_started(Endpoint = #endpoint{}) ->
|
||||
case supervisor:start_child(?MODULE, child_spec(Endpoint)) of
|
||||
{ok, Pid} when is_pid(Pid) ->
|
||||
{ok, Pid};
|
||||
{error, {'already_started', Pid}} when is_pid(Pid) ->
|
||||
{ok, Pid};
|
||||
{error, Error} ->
|
||||
{error, Error}
|
||||
end.
|
||||
|
||||
delete_endpoint(Id) when is_integer(Id) ->
|
||||
Name = endpoint:get_name(Id),
|
||||
supervisor:terminate_child(?MODULE, Name),
|
||||
supervisor:delete_child(?MODULE, Name).
|
||||
|
||||
child_spec(Endpoint = #endpoint{id = Id}) ->
|
||||
Name = endpoint:get_name(Id),
|
||||
#{id => Name,
|
||||
start => {endpoint, start_link, [Name, Endpoint]},
|
||||
restart => permanent,
|
||||
shutdown => 2000,
|
||||
type => worker,
|
||||
modules => ['endpoint']}.
|
||||
128
apps/endpoint/src/endpoint_timer.erl
Normal file
128
apps/endpoint/src/endpoint_timer.erl
Normal file
@ -0,0 +1,128 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2024, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 07. 5月 2024 10:30
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(endpoint_timer).
|
||||
-author("anlicheng").
|
||||
-include("endpoint.hrl").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_link/1]).
|
||||
-export([task/3, ack/2, cleanup/1]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-record(state, {
|
||||
retry_interval = 0,
|
||||
%% 定时器
|
||||
timer_map = #{}
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
task(Pid, Id, Task) when is_pid(Pid), is_integer(Id), is_function(Task, 0) ->
|
||||
gen_server:cast(Pid, {task, Id, Task}).
|
||||
|
||||
ack(Pid, Id) when is_pid(Pid), is_integer(Id) ->
|
||||
gen_server:cast(Pid, {ack, Id}).
|
||||
|
||||
cleanup(Pid) when is_pid(Pid) ->
|
||||
gen_server:cast(Pid, cleanup).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link(RetryInterval :: integer()) ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link(RetryInterval) when is_integer(RetryInterval) ->
|
||||
gen_server:start_link(?MODULE, [RetryInterval], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([RetryInterval]) ->
|
||||
{ok, #state{retry_interval = RetryInterval}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({task, Id, Task}, State = #state{retry_interval = RetryInterval, timer_map = TimerMap}) ->
|
||||
TimerRef = erlang:start_timer(RetryInterval, self(), {repost_ticker, {Id, Task}}),
|
||||
{noreply, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}};
|
||||
|
||||
%% 取消
|
||||
handle_cast({ack, Id}, State = #state{timer_map = TimerMap}) ->
|
||||
case maps:take(Id, TimerMap) of
|
||||
error ->
|
||||
{noreply, State};
|
||||
{TimerRef, NTimerMap} ->
|
||||
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
|
||||
{noreply, State#state{timer_map = NTimerMap}}
|
||||
end;
|
||||
|
||||
handle_cast(cleanup, State = #state{timer_map = TimerMap}) ->
|
||||
lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)),
|
||||
{noreply, State#state{timer_map = #{}}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({timeout, _, {repost_ticker, {Id, Task}}}, State = #state{retry_interval = RetryInterval, timer_map = TimerMap}) ->
|
||||
Task(),
|
||||
TimerRef = erlang:start_timer(RetryInterval, self(), {repost_ticker, {Id, Task}}),
|
||||
|
||||
{noreply, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
Loading…
x
Reference in New Issue
Block a user