support mysql endpoint

This commit is contained in:
anlicheng 2023-08-07 17:22:31 +08:00
parent d62eb192e2
commit 049623d2a3
26 changed files with 529 additions and 311 deletions

View File

@ -68,6 +68,16 @@
topic = <<>> :: binary()
}).
-record(mysql_endpoint, {
host = <<>> :: binary(),
port = 0 :: integer(),
username = <<>> :: binary(),
password = <<>> :: binary(),
database = <<>> :: binary(),
table_name = <<>> :: binary(),
pool_size = 10 :: integer()
}).
%%
-record(endpoint, {
%%
@ -77,10 +87,12 @@
%% ,
matcher = <<>> :: binary(),
mapper = <<>> :: binary(),
%% function
mapper_fun = fun(_, Data) -> Data end :: fun((binary(), any()) -> any()),
%%
%% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}])
%% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}], Timestamp :: integer())
mapper_fun = fun(_, Fields) -> Fields end :: fun(),
%% , : #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
config = #http_endpoint{} :: #http_endpoint{} | #ws_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{},
config = #http_endpoint{} :: #http_endpoint{} | #ws_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{} | #mysql_endpoint{},
%%
updated_at = 0 :: integer(),
%%
@ -97,5 +109,6 @@
-record(north_data, {
id = 0 :: integer(),
location_code :: binary(),
body :: binary()
%% endpoint list: [{K, V}, {K1, V1}]
body :: binary() | list()
}).

View File

@ -14,4 +14,4 @@
-export([get_device_by_uuid/1]).
get_device_by_uuid(UUID) when is_binary(UUID) ->
mysql_client:get_row(<<"SELECT * FROM device WHERE uuid = ? LIMIT 1">>, [UUID]).
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM device WHERE uuid = ? LIMIT 1">>, [UUID]).

View File

@ -16,7 +16,7 @@
-spec get_all_hosts() -> UUIDList :: [binary()].
get_all_hosts() ->
case mysql_client:get_all(<<"SELECT uuid FROM host where uuid != ''">>) of
case mysql_pool:get_all(mysql_iot, <<"SELECT uuid FROM host where uuid != ''">>) of
{ok, Hosts} ->
lists:map(fun(#{<<"uuid">> := UUID}) -> UUID end, Hosts);
{error, _Reason} ->
@ -24,22 +24,22 @@ get_all_hosts() ->
end.
get_host(HostId) when is_integer(HostId) ->
mysql_client:get_row(<<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]).
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]).
get_host_by_uuid(UUID) when is_binary(UUID) ->
mysql_client:get_row(<<"SELECT * FROM host WHERE uuid = ? LIMIT 1">>, [UUID]).
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE uuid = ? LIMIT 1">>, [UUID]).
create_host(UUID) when is_binary(UUID) ->
mysql_client:insert(<<"host">>, #{<<"UUID">> => UUID, <<"status">> => ?HOST_STATUS_INACTIVE}, true).
mysql_pool:insert(mysql_iot, <<"host">>, #{<<"UUID">> => UUID, <<"status">> => ?HOST_STATUS_INACTIVE}, true).
%%
-spec change_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
change_status(UUID, Status) when is_binary(UUID), is_integer(Status) ->
mysql_client:update_by(<<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [Status, UUID]).
mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [Status, UUID]).
%%
is_authorized(HostId) when is_integer(HostId) ->
case mysql_client:get_row(<<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]) of
case mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]) of
{ok, #{<<"status">> := Status}} when Status =/= ?HOST_STATUS_INACTIVE ->
true;
_ ->

View File

@ -14,4 +14,4 @@
-export([insert/1]).
insert(Fields) when is_map(Fields) ->
mysql_client:insert(<<"micro_inform_log">>, Fields, true).
mysql_pool:insert(mysql_iot, <<"micro_inform_log">>, Fields, true).

View File

@ -16,4 +16,4 @@
%%
-spec change_status(HostId :: integer(), SceneId :: integer(), MircoId :: integer(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
change_status(HostId, SceneId, MircoId, Status) when is_integer(HostId), is_integer(SceneId), is_integer(MircoId), is_integer(Status) ->
mysql_client:update_by(<<"UPDATE micro_set SET status = ? WHERE host_id = ? AND scene_id = ? AND micro_id = ? LIMIT 1">>, [Status, HostId, SceneId, MircoId]).
mysql_pool:update_by(mysql_iot, <<"UPDATE micro_set SET status = ? WHERE host_id = ? AND scene_id = ? AND micro_id = ? LIMIT 1">>, [Status, HostId, SceneId, MircoId]).

View File

@ -14,4 +14,4 @@
-export([insert/1]).
insert(Fields) when is_map(Fields) ->
mysql_client:insert(<<"scene_feedback">>, Fields, true).
mysql_pool:insert(mysql_iot, <<"scene_feedback">>, Fields, true).

View File

@ -14,4 +14,4 @@
-export([insert/1]).
insert(Fields) when is_map(Fields) ->
mysql_client:insert(<<"scene_feedback_step">>, Fields, true).
mysql_pool:insert(mysql_iot, <<"scene_feedback_step">>, Fields, true).

View File

@ -16,4 +16,4 @@
%%
-spec change_status(TaskId :: integer(), Status :: integer()) -> {ok, AffectedRow :: integer()} | {error, Reason :: any()}.
change_status(TaskId, Status) when is_integer(TaskId), is_integer(Status) ->
mysql_client:update_by(<<"UPDATE task_logs SET status = ? WHERE id = ? LIMIT 1">>, [Status, TaskId]).
mysql_pool:update_by(mysql_iot, <<"UPDATE task_logs SET status = ? WHERE id = ? LIMIT 1">>, [Status, TaskId]).

View File

@ -33,7 +33,8 @@ handle_request("GET", "/endpoint/all", GetParams, _) ->
%%
handle_request("POST", "/endpoint/create", _, Params = #{<<"name">> := Name}) ->
case lists:all(fun(Key) -> maps:is_key(Key, Params) end, [<<"name">>, <<"title">>, <<"matcher">>, <<"mapper">>, <<"config">>]) of
RequiredFields = [<<"name">>, <<"title">>, <<"matcher">>, <<"mapper">>, <<"config">>],
case lists:all(fun(Key) -> maps:is_key(Key, Params) end, RequiredFields) of
true ->
ok;
false ->
@ -44,13 +45,8 @@ handle_request("POST", "/endpoint/create", _, Params = #{<<"name">> := Name}) ->
undefined ->
Endpoint0 = make_endpoint(maps:to_list(Params), #endpoint{}),
Endpoint = Endpoint0#endpoint{created_at = iot_util:timestamp_of_seconds()},
case mnesia_endpoint:insert(Endpoint) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[endpoint_handler] create router, get error is: ~p", [Reason]),
{ok, 200, iot_util:json_error(404, <<"error">>)}
end;
ok = mnesia_endpoint:insert(Endpoint),
{ok, 200, iot_util:json_data(<<"success">>)};
{ok, _} ->
{ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)}
end;
@ -65,14 +61,9 @@ handle_request("POST", "/endpoint/update", _, Params = #{<<"name">> := Name}) wh
Params1 = maps:remove(<<"name">>, Params),
NEndpoint = make_endpoint(maps:to_list(Params1), Endpoint),
NEndpoint1 = NEndpoint#endpoint{updated_at = iot_util:timestamp_of_seconds()},
ok = mnesia_endpoint:insert(NEndpoint1),
case mnesia_endpoint:insert(NEndpoint1) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[endpoint_handler] update endpoint, get error is: ~p", [Reason]),
{ok, 200, iot_util:json_error(404, <<"error">>)}
end
{ok, 200, iot_util:json_data(<<"success">>)}
end;
%%
@ -158,6 +149,16 @@ make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mqtt">>, <<"args">> := #{<<
andalso is_binary(Topic) andalso Topic /= <<>> ->
make_endpoint(Params, Endpoint#endpoint{config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}});
%% mysql的支持
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mysql">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"databse">> := Database, <<"table_name">> := TableName}}} | Params], Endpoint)
when is_binary(Username) andalso Username /= <<>>
andalso is_binary(Password) andalso Password /= <<>>
andalso is_binary(Host) andalso Host /= <<>>
andalso is_integer(Port) andalso Port > 0
andalso is_binary(Database) andalso Database /= <<>>
andalso is_binary(TableName) andalso TableName /= <<>> ->
make_endpoint(Params, Endpoint#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName}});
make_endpoint([{<<"config">>, Config} | _], _) ->
lager:warning("[endpoint_handler] unsupport config: ~p", [Config]),
throw(<<"invalid config">>);

View File

@ -13,8 +13,6 @@
-export([init/2]).
init(Req0, Opts = [Mod|_]) ->
StartTs = erlang:monotonic_time(),
Method = binary_to_list(cowboy_req:method(Req0)),
Path = binary_to_list(cowboy_req:path(Req0)),
GetParams0 = cowboy_req:parse_qs(Req0),
@ -23,7 +21,7 @@ init(Req0, Opts = [Mod|_]) ->
try Mod:handle_request(Method, Path, GetParams, PostParams) of
{ok, StatusCode, Resp} ->
lager:debug("[http_protocol] get a request, path: ~p, get params: ~p, post params: ~p, response: ~ts",
lager:debug("[http_protocol] request path: ~p, get_params: ~p, post_params: ~p, response: ~ts",
[Path, GetParams, PostParams, Resp]),
AcceptEncoding = cowboy_req:header(<<"accept-encoding">>, Req1, <<>>),
Req2 = case iolist_size(Resp) >= 1024 andalso supported_gzip(AcceptEncoding) of
@ -38,9 +36,6 @@ init(Req0, Opts = [Mod|_]) ->
<<"Content-Type">> => <<"application/json;charset=utf-8">>
}, Resp, Req1)
end,
EndTs = erlang:monotonic_time(),
CostTs = erlang:convert_time_unit(EndTs - StartTs, native, microsecond),
lager:debug("cost_ts is: ~p", [CostTs]),
{ok, Req2, Opts}
catch
throw:Error ->

View File

@ -14,7 +14,7 @@
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, forward/3, get_stat/1, reload/2, clean_up/1]).
-export([get_name/1, get_pid/1, forward/4, get_stat/1, reload/2, clean_up/1, get_mapper_fun/1]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
@ -53,11 +53,11 @@ get_name(EndpointName) when is_binary(EndpointName) ->
get_pid(Name) when is_binary(Name) ->
whereis(get_name(Name)).
-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list()) -> no_return().
forward(undefined, _, _) ->
-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
forward(undefined, _, _, _) ->
ok;
forward(Pid, LocationCode, Fields) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields) ->
gen_statem:cast(Pid, {forward, LocationCode, Fields}).
forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
gen_statem:cast(Pid, {forward, LocationCode, Fields, Timestamp}).
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
gen_statem:cast(Pid, {reload, NEndpoint}).
@ -70,6 +70,10 @@ get_stat(Pid) when is_pid(Pid) ->
clean_up(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, clean_up, 5000).
-spec get_mapper_fun(Pid :: pid()) -> fun().
get_mapper_fun(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_mapper_fun).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
@ -136,9 +140,14 @@ handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, Stat
{next_state, disconnected, State#state{endpoint = NEndpoint, timer_map = maps:new(), postman_pid = undefined}}
end;
handle_event(cast, {forward, LocationCode, Data}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) ->
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) ->
try
Body = MapperFun(LocationCode, Data),
Body = if
is_function(MapperFun, 2) ->
MapperFun(LocationCode, Fields);
is_function(MapperFun, 3) ->
MapperFun(LocationCode, Fields, Timestamp)
end,
mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, body = Body}),
%%
Actions = case StateName =:= connected andalso FlightNum < WindowSize of
@ -162,7 +171,7 @@ handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cur
{ok, NCursor, NorthData = #north_data{id = Id}} ->
lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
PostmanPid ! {post, NorthData},
PostmanPid ! {post, self(), NorthData},
%%
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
@ -188,7 +197,7 @@ handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endp
%%
handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{endpoint = #endpoint{name = Name}, postman_pid = PostmanPid, timer_map = TimerMap}) ->
lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]),
PostmanPid ! {post, NorthData},
PostmanPid ! {post, self(), NorthData},
%% 5
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
@ -217,6 +226,9 @@ handle_event({call, From}, clean_up, _, State = #state{tab_name = TabName}) ->
mnesia:delete_table(TabName),
{keep_state, State, [{reply, From, ok}]};
handle_event({call, From}, get_mapper_fun, _, State = #state{endpoint = #endpoint{mapper_fun = F}}) ->
{keep_state, State, [{reply, From, F}]};
%%
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, tab_name = TabName}) ->
Stat = #{
@ -270,11 +282,11 @@ remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) ->
end.
%% http和https协议的支持
create_postman(#endpoint{name = Name, config = #http_endpoint{url = Url, pool_size = PoolSize}}) ->
PoolName = binary_to_atom(<<"http_pool:", Name/binary>>),
http_postman:start_link(self(), Url, PoolName, PoolSize);
create_postman(#endpoint{config = #http_endpoint{url = Url, pool_size = PoolSize}}) ->
WorkerArgs = [{url, Url}],
broker_postman:start_link(self(), http_postman, WorkerArgs, PoolSize);
%% mqtt协议的支持
%% mqtt协议的支持,
create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) ->
Node = atom_to_binary(node()),
ClientId = <<"mqtt-client-", Node/binary, "-", Name/binary>>,
@ -293,5 +305,21 @@ create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port
],
mqtt_postman:start_link(self(), Opts, Topic, Qos);
%% mysql协议的支持
create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}) ->
WorkerArgs = [
{mysql_opts, [
{host, binary_to_list(Host)},
{port, Port},
{user, binary_to_list(Username)},
{password, binary_to_list(Password)},
{keep_alive, true},
{database, binary_to_list(Database)},
{queries, [<<"set names utf8">>]}
]},
{table, TableName}
],
broker_postman:start_link(self(), mysql_postman, WorkerArgs, PoolSize);
create_postman(#endpoint{}) ->
throw(<<"not supported">>).

View File

@ -253,7 +253,7 @@ handle_event(cast, {handle, {data, Data}}, session, State = #state{uuid = UUID,
{error, Reason} ->
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, get error: ~p", [UUID, RouterUUID, Reason]);
{ok, LocationCode} ->
iot_router:route(LocationCode, FieldsList)
iot_router:route(LocationCode, FieldsList, Timestamp)
end,
%% influxdb

View File

@ -11,23 +11,23 @@
-include("iot.hrl").
%% API
-export([route/2]).
-export([route/3]).
-spec route(LocationCode :: binary(), Fields :: list()) -> ok.
route(LocationCode, Fields) when is_binary(LocationCode), is_list(Fields) ->
-spec route(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> ok.
route(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields), is_integer(Timestamp) ->
Endpoints = iot_endpoint_monitor:get_all_endpoints(),
router0(Endpoints, LocationCode, Fields).
router0([], _, _) ->
router0(Endpoints, LocationCode, Fields, Timestamp).
router0([], _, _, _) ->
ok;
router0([#endpoint{matcher = Regexp, name = Name} | Endpoints], LocationCode, Fields) ->
router0([#endpoint{matcher = Regexp, name = Name} | Endpoints], LocationCode, Fields, Timestamp) ->
{ok, MP} = re:compile(Regexp),
case re:run(LocationCode, MP, [{capture, all, list}]) of
nomatch ->
router0(Endpoints, LocationCode, Fields);
router0(Endpoints, LocationCode, Fields, Timestamp);
{match, _} ->
lager:debug("[iot_router] match endpoint: ~p", [Name]),
Pid = iot_endpoint:get_pid(Name),
iot_endpoint:forward(Pid, LocationCode, Fields),
iot_endpoint:forward(Pid, LocationCode, Fields, Timestamp),
%% Endpoint
router0(Endpoints, LocationCode, Fields)
router0(Endpoints, LocationCode, Fields, Timestamp)
end.

View File

@ -27,7 +27,7 @@ start_link() ->
%% modules => modules()} % optional
init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
ChildSpecs = [
Specs = [
#{
id => 'iot_endpoint_monitor',
start => {'iot_endpoint_monitor', start_link, []},
@ -55,7 +55,8 @@ init([]) ->
modules => ['iot_host_sup']
}
],
{ok, {SupFlags, pools() ++ ChildSpecs}}.
{ok, {SupFlags, pools() ++ Specs}}.
%% internal functions

View File

@ -109,7 +109,7 @@ parse_mapper(Mapper) when is_list(Mapper) ->
{ok, Tokens, _} = erl_scan:string(Mapper),
{ok, ExprList} = erl_parse:parse_exprs(Tokens),
{value, F, _} = erl_eval:exprs(ExprList, []),
case is_function(F, 2) of
case is_function(F, 2) orelse is_function(F, 3) of
true ->
{ok, F};
false ->

View File

@ -88,4 +88,6 @@ config_map(#ws_endpoint{url = Url}) ->
config_map(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) ->
#{<<"protocol">> => <<"kafka">>, <<"args">> => #{<<"username">> => Username, <<"password">> => Password, <<"bootstrap_servers">> => BootstrapServers, <<"topic">> => Topic}};
config_map(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) ->
#{<<"protocol">> => <<"mqtt">>, <<"args">> => #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"topic">> => Topic, <<"qos">> => Qos}}.
#{<<"protocol">> => <<"mqtt">>, <<"args">> => #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"topic">> => Topic, <<"qos">> => Qos}};
config_map(#mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName}) ->
#{<<"protocol">> => <<"mysql">>, <<"args">> => #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"database">> => Database, <<"table_name">> => TableName}}.

View File

@ -13,9 +13,9 @@
%% API
-export([rsa_encode/1]).
-export([insert_services/1]).
-export([insert_endpoints/0, forward/0]).
-export([insert_endpoints/0, test_http/0, test_mysql/0, test_mqtt/0]).
forward() ->
test_http() ->
Name = <<"zhongguodianli">>,
Pid = iot_endpoint:get_pid(Name),
@ -28,19 +28,27 @@ forward() ->
<<"unit">> => <<"cm">>
}
],
iot_endpoint:forward(Pid, <<"location_code:", Id/binary>>, Fields)
iot_endpoint:forward(Pid, <<"location_code:", Id/binary>>, Fields, iot_util:timestamp_of_seconds())
end, lists:seq(1, 10000)).
insert_endpoints() ->
Mapper0 = "fun(LocationCode, Fields) ->
Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields),
Fields2 = maps:from_list(Fields1),
Bin = jiffy:encode(Fields2#{<<\"location_code\">> => LocationCode}, [force_utf8]),
iolist_to_binary(Bin)
end.",
test_mysql() ->
lists:foreach(fun(_) ->
iot_router:route(<<"mysql123">>, [
#{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>},
#{<<"key">> => <<"age">>, <<"value">> => 30},
#{<<"key">> => <<"flow">>, <<"value">> => 30}
], iot_util:timestamp_of_seconds())
end, lists:seq(1, 10000)).
Mapper = list_to_binary(Mapper0),
{ok, F} = iot_util:parse_mapper(Mapper),
test_mqtt() ->
iot_router:route(<<"test123">>, [
#{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>},
#{<<"key">> => <<"age">>, <<"value">> => 30},
#{<<"key">> => <<"flow">>, <<"value">> => 30}
], iot_util:timestamp_of_seconds()).
insert_endpoints() ->
{ok, Mapper, F} = simple_mapper(),
mnesia_endpoint:insert(#endpoint{
name = <<"zhongguodianli">>,
title = <<"中国电力"/utf8>>,
@ -68,11 +76,54 @@ insert_endpoints() ->
created_at = iot_util:timestamp_of_seconds()
}),
{ok, MysqlMapper, MysqlF} = mysql_mapper(),
mnesia_endpoint:insert(#endpoint{
name = <<"mysql">>,
title = <<"测试mysql"/utf8>>,
matcher = <<"mysql*">>,
mapper = MysqlMapper,
mapper_fun = MysqlF,
config = #mysql_endpoint{
host = <<"localhost">>,
port = 3306,
username = <<"php_an">>,
password = <<"123456">>,
database = <<"iot">>,
table_name = <<"north_data">>
},
created_at = iot_util:timestamp_of_seconds()
}),
{Mapper, F}.
simple_mapper() ->
Mapper0 = "fun(LocationCode, Fields) ->
Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields),
Fields2 = maps:from_list(Fields1),
Bin = jiffy:encode(Fields2#{<<\"location_code\">> => LocationCode}, [force_utf8]),
iolist_to_binary(Bin)
end.",
Mapper = list_to_binary(Mapper0),
{ok, F} = iot_util:parse_mapper(Mapper),
{ok, Mapper, F}.
mysql_mapper() ->
Mapper0 = "fun(LocationCode, Fields, Timestamp) ->
Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields),
Content = jiffy:encode(maps:from_list(Fields1), [force_utf8]),
[{<<\"location_code\">>, LocationCode}, {<<\"content\">>, Content}, {<<\"created_ts\">>, Timestamp}]
end.",
Mapper = list_to_binary(Mapper0),
{ok, F} = iot_util:parse_mapper(Mapper),
{ok, Mapper, F}.
insert_services(Num) ->
lists:foreach(fun(Id) ->
Res = mysql_client:insert(<<"micro_service">>,
Res = mysql_pool:insert(mysql_iot, <<"micro_service">>,
#{
<<"name">> => <<"微服务"/utf8, (integer_to_binary(Id))/binary>>,
<<"code">> => <<"1223423423423423"/utf8>>,

View File

@ -0,0 +1,46 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2018, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 29. 2018 17:01
%%%-------------------------------------------------------------------
-module(mysql_pool).
-author("aresei").
%% API
-export([get_row/2, get_row/3, get_all/2, get_all/3]).
-export([update/4, update_by/2, update_by/3, insert/4]).
%%
-spec get_row(Pool :: atom(), Sql::binary()) -> {ok, Record::map()} | undefined.
get_row(Pool, Sql) when is_atom(Pool), is_binary(Sql) ->
poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:get_row(ConnPid, Sql) end).
-spec get_row(Pool :: atom(), Sql::binary(), Params::list()) -> {ok, Record::map()} | undefined.
get_row(Pool, Sql, Params) when is_atom(Pool), is_binary(Sql), is_list(Params) ->
poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:get_row(ConnPid, Sql, Params) end).
-spec get_all(Pool :: atom(), Sql::binary()) -> {ok, Rows::list()} | {error, Reason :: any()}.
get_all(Pool, Sql) when is_atom(Pool), is_binary(Sql) ->
poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:get_all(ConnPid, Sql) end).
-spec get_all(Pool :: atom(), Sql::binary(), Params::list()) -> {ok, Rows::list()} | {error, Reason::any()}.
get_all(Pool, Sql, Params) when is_atom(Pool), is_binary(Sql), is_list(Params) ->
poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:get_all(ConnPid, Sql, Params) end).
-spec insert(Pool :: atom(), Table :: binary(), Fields :: map() | list(), boolean()) -> ok | {ok, InsertId :: integer()} | {error, Reason :: any()}.
insert(Pool, Table, Fields, FetchInsertId) when is_atom(Pool), is_binary(Table), is_list(Fields); is_map(Fields), is_boolean(FetchInsertId) ->
poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:insert(ConnPid, Table, Fields, FetchInsertId) end).
update_by(Pool, UpdateSql) when is_atom(Pool), is_binary(UpdateSql) ->
poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:update_by(ConnPid, UpdateSql) end).
-spec update_by(Pool :: atom(), UpdateSql :: binary(), Params :: list()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
update_by(Pool, UpdateSql, Params) when is_atom(Pool), is_binary(UpdateSql) ->
poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:update_by(ConnPid, UpdateSql, Params) end).
-spec update(Pool :: atom(), Table :: binary(), Fields :: map(), WhereFields :: map()) -> {ok, AffectedRows::integer()} | {error, Reason::any()}.
update(Pool, Table, Fields, WhereFields) when is_atom(Pool), is_binary(Table), is_map(Fields), is_map(WhereFields) ->
poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:update(ConnPid, Table, Fields, WhereFields) end).

View File

@ -0,0 +1,143 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2018, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 29. 2018 17:01
%%%-------------------------------------------------------------------
-module(mysql_provider).
-author("aresei").
-define(POOL_NAME, mysql_pool).
%% API
-export([get_row/2, get_row/3, get_all/2, get_all/3]).
-export([update/4, update_by/2, update_by/3, insert/4]).
%%
-spec get_row(ConnPid :: pid(), Sql::binary()) -> {ok, Record::map()} | undefined.
get_row(ConnPid, Sql) when is_pid(ConnPid), is_binary(Sql) ->
lager:debug("[mysql_client] the get_row sql is: ~p", [Sql]),
case mysql:query(ConnPid, Sql) of
{ok, Names, [Row | _]} ->
{ok, maps:from_list(lists:zip(Names, Row))};
{ok, _, []} ->
undefined;
Error ->
lager:warning("[mysql_client] get error: ~p", [Error]),
undefined
end.
-spec get_row(ConnPid :: pid(), Sql::binary(), Params::list()) -> {ok, Record::map()} | undefined.
get_row(ConnPid, Sql, Params) when is_pid(ConnPid), is_binary(Sql), is_list(Params) ->
lager:debug("[mysql_client] the get_row sql is: ~p, params: ~p", [Sql, Params]),
case mysql:query(ConnPid, Sql, Params) of
{ok, Names, [Row | _]} ->
{ok, maps:from_list(lists:zip(Names, Row))};
{ok, _, []} ->
undefined;
Error ->
lager:warning("[mysql_client] get error: ~p", [Error]),
undefined
end.
-spec get_all(ConnPid :: pid(), Sql::binary()) -> {ok, Rows::list()} | {error, Reason :: any()}.
get_all(ConnPid, Sql) when is_pid(ConnPid), is_binary(Sql) ->
lager:debug("[mysql_client] the get_all sql is: ~p", [Sql]),
case mysql:query(ConnPid, Sql) of
{ok, Names, Rows} ->
{ok, lists:map(fun(Row) -> maps:from_list(lists:zip(Names, Row)) end, Rows)};
Error ->
lager:warning("[mysql_client] get error: ~p", [Error]),
Error
end.
-spec get_all(ConnPid :: pid(), Sql::binary(), Params::list()) -> {ok, Rows::list()} | {error, Reason::any()}.
get_all(ConnPid, Sql, Params) when is_pid(ConnPid), is_binary(Sql), is_list(Params) ->
lager:debug("[mysql_client] the get_all sql is: ~p, params: ~p", [Sql, Params]),
case mysql:query(ConnPid, Sql, Params) of
{ok, Names, Rows} ->
{ok, lists:map(fun(Row) -> maps:from_list(lists:zip(Names, Row)) end, Rows)};
Error ->
lager:warning("[mysql_client] get error: ~p", [Error]),
{ok, []}
end.
-spec insert(ConnPid :: pid(), Table :: binary(), Fields :: map() | list(), boolean()) -> ok | {ok, InsertId :: integer()} | {error, Reason :: any()}.
insert(ConnPid, Table, Fields, FetchInsertId) when is_pid(ConnPid), is_binary(Table), is_map(Fields), is_boolean(FetchInsertId) ->
insert(ConnPid, Table, maps:to_list(Fields), FetchInsertId);
insert(ConnPid, Table, Fields, FetchInsertId) when is_pid(ConnPid), is_binary(Table), is_list(Fields), is_boolean(FetchInsertId) ->
{Keys, Values} = kvs(Fields),
FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)),
Placeholders = lists:duplicate(length(Keys), <<"?">>),
ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)),
Sql = <<"INSERT INTO ", Table/binary, "(", FieldSql/binary, ") VALUES(", ValuesPlaceholder/binary, ")">>,
lager:debug("[mysql_client] the insert sql is: ~p, params: ~p", [Sql, Values]),
case mysql:query(ConnPid, Sql, Values) of
ok ->
case FetchInsertId of
true ->
InsertId = mysql:insert_id(ConnPid),
{ok, InsertId};
false ->
ok
end;
Error ->
Error
end.
update_by(ConnPid, UpdateSql) when is_pid(ConnPid), is_binary(UpdateSql) ->
lager:debug("[mysql_client] updateBySql sql: ~p", [UpdateSql]),
case mysql:query(ConnPid, UpdateSql) of
ok ->
AffectedRows = mysql:affected_rows(ConnPid),
{ok, AffectedRows};
Error ->
Error
end.
-spec update_by(ConnPid :: pid(), UpdateSql :: binary(), Params :: list()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
update_by(ConnPid, UpdateSql, Params) when is_pid(ConnPid), is_binary(UpdateSql) ->
lager:debug("[mysql_client] updateBySql sql: ~p, params: ~p", [UpdateSql, Params]),
case mysql:query(ConnPid, UpdateSql, Params) of
ok ->
AffectedRows = mysql:affected_rows(ConnPid),
{ok, AffectedRows};
Error ->
Error
end.
-spec update(ConnPid :: pid(), Sql :: binary(), Fields :: map(), WhereFields :: map()) -> {ok, AffectedRows::integer()} | {error, Reason::any()}.
update(ConnPid, Table, Fields, WhereFields) when is_pid(ConnPid), is_binary(Table), is_map(Fields), is_map(WhereFields) ->
%% set
{SetKeys, SetVals} = kvs(Fields),
SetKeys1 = lists:map(fun(K) when is_binary(K) -> <<"`", K/binary, "` = ?">> end, SetKeys),
SetSql = iolist_to_binary(lists:join(<<", ">>, SetKeys1)),
%% where
{WhereKeys, WhereVals} = kvs(WhereFields),
WhereKeys1 = lists:map(fun(K) when is_binary(K) -> <<"`", K/binary, "` = ?">> end, WhereKeys),
WhereSql = iolist_to_binary(lists:join(<<" AND ">>, WhereKeys1)),
Params = SetVals ++ WhereVals,
Sql = <<"UPDATE ", Table/binary, " SET ", SetSql/binary, " WHERE ", WhereSql/binary>>,
lager:debug("[mysql_client] update sql is: ~p, params: ~p", [Sql, Params]),
case mysql:query(ConnPid, Sql, Params) of
ok ->
AffectedRows = mysql:affected_rows(ConnPid),
{ok, AffectedRows};
Error ->
lager:error("[mysql_client] update sql: ~p, params: ~p, get a error: ~p", [Sql, Params, Error]),
Error
end.
-spec kvs(Fields :: map() | list()) -> {Keys :: list(), Values :: list()}.
kvs(Fields) when is_map(Fields) ->
kvs(maps:to_list(Fields));
kvs(Fields) when is_list(Fields) ->
{Keys0, Values0} = lists:foldl(fun({K, V}, {Acc0, Acc1}) -> {[K|Acc0], [V|Acc1]} end, {[], []}, Fields),
{lists:reverse(Keys0), lists:reverse(Values0)}.

View File

@ -1,160 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2018, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 29. 2018 17:01
%%%-------------------------------------------------------------------
-module(mysql_client).
-author("aresei").
-define(POOL_NAME, mysql_pool).
%% API
-export([get_row/1, get_row/2, get_all/1, get_all/2]).
-export([update/3, update_by/1, update_by/2, insert/3]).
%%
-spec get_row(Sql::binary()) -> {ok, Record::map()} | undefined.
get_row(Sql) when is_binary(Sql) ->
lager:debug("[mysql_client] the get_row sql is: ~p", [Sql]),
poolboy:transaction(?POOL_NAME, fun(ConnPid) ->
case mysql:query(ConnPid, Sql) of
{ok, Names, [Row | _]} ->
{ok, maps:from_list(lists:zip(Names, Row))};
{ok, _, []} ->
undefined;
Error ->
lager:warning("[mysql_client] get error: ~p", [Error]),
undefined
end
end).
-spec get_row(Sql::binary(), Params::list()) -> {ok, Record::map()} | undefined.
get_row(Sql, Params) when is_binary(Sql), is_list(Params) ->
lager:debug("[mysql_client] the get_row sql is: ~p, params: ~p", [Sql, Params]),
poolboy:transaction(?POOL_NAME, fun(ConnPid) ->
case mysql:query(ConnPid, Sql, Params) of
{ok, Names, [Row | _]} ->
{ok, maps:from_list(lists:zip(Names, Row))};
{ok, _, []} ->
undefined;
Error ->
lager:warning("[mysql_client] get error: ~p", [Error]),
undefined
end
end).
-spec get_all(Sql::binary()) -> {ok, Rows::list()} | {error, Reason :: any()}.
get_all(Sql) when is_binary(Sql) ->
lager:debug("[mysql_client] the get_all sql is: ~p", [Sql]),
poolboy:transaction(?POOL_NAME, fun(ConnPid) ->
case mysql:query(ConnPid, Sql) of
{ok, Names, Rows} ->
{ok, lists:map(fun(Row) -> maps:from_list(lists:zip(Names, Row)) end, Rows)};
Error ->
lager:warning("[mysql_client] get error: ~p", [Error]),
Error
end
end).
-spec get_all(Sql::binary(), Params::list()) -> {ok, Rows::list()} | {error, Reason::any()}.
get_all(Sql, Params) when is_binary(Sql), is_list(Params) ->
lager:debug("[mysql_client] the get_all sql is: ~p, params: ~p", [Sql, Params]),
poolboy:transaction(?POOL_NAME, fun(ConnPid) ->
case mysql:query(ConnPid, Sql, Params) of
{ok, Names, Rows} ->
{ok, lists:map(fun(Row) -> maps:from_list(lists:zip(Names, Row)) end, Rows)};
Error ->
lager:warning("[mysql_client] get error: ~p", [Error]),
{ok, []}
end
end).
-spec insert(Table :: binary(), Fields :: map() | list(), boolean()) -> ok | {ok, InsertId :: integer()} | {error, Reason :: any()}.
insert(Table, Fields, FetchInsertId) when is_binary(Table), is_map(Fields), is_boolean(FetchInsertId) ->
insert(Table, maps:to_list(Fields), FetchInsertId);
insert(Table, Fields, FetchInsertId) when is_binary(Table), is_list(Fields), is_boolean(FetchInsertId) ->
{Keys, Values} = kvs(Fields),
FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)),
Placeholders = lists:duplicate(length(Keys), <<"?">>),
ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)),
Sql = <<"INSERT INTO ", Table/binary, "(", FieldSql/binary, ") VALUES(", ValuesPlaceholder/binary, ")">>,
lager:debug("[mysql_client] the insert sql is: ~p, params: ~p", [Sql, Values]),
poolboy:transaction(?POOL_NAME, fun(ConnPid) ->
case mysql:query(ConnPid, Sql, Values) of
ok ->
case FetchInsertId of
true ->
InsertId = mysql:insert_id(ConnPid),
{ok, InsertId};
false ->
ok
end;
Error ->
Error
end
end).
update_by(UpdateSql) when is_binary(UpdateSql) ->
lager:debug("[mysql_client] updateBySql sql: ~p", [UpdateSql]),
poolboy:transaction(?POOL_NAME, fun(ConnPid) ->
case mysql:query(ConnPid, UpdateSql) of
ok ->
AffectedRows = mysql:affected_rows(ConnPid),
{ok, AffectedRows};
Error ->
Error
end
end).
-spec update_by(UpdateSql :: binary(), Params :: list()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
update_by(UpdateSql, Params) when is_binary(UpdateSql) ->
lager:debug("[mysql_client] updateBySql sql: ~p, params: ~p", [UpdateSql, Params]),
poolboy:transaction(?POOL_NAME, fun(ConnPid) ->
case mysql:query(ConnPid, UpdateSql, Params) of
ok ->
AffectedRows = mysql:affected_rows(ConnPid),
{ok, AffectedRows};
Error ->
Error
end
end).
-spec update(Sql :: binary(), Fields :: map(), WhereFields :: map()) -> {ok, AffectedRows::integer()} | {error, Reason::any()}.
update(Table, Fields, WhereFields) when is_binary(Table), is_map(Fields), is_map(WhereFields) ->
%% set
{SetKeys, SetVals} = kvs(Fields),
SetKeys1 = lists:map(fun(K) when is_binary(K) -> <<"`", K/binary, "` = ?">> end, SetKeys),
SetSql = iolist_to_binary(lists:join(<<", ">>, SetKeys1)),
%% where
{WhereKeys, WhereVals} = kvs(WhereFields),
WhereKeys1 = lists:map(fun(K) when is_binary(K) -> <<"`", K/binary, "` = ?">> end, WhereKeys),
WhereSql = iolist_to_binary(lists:join(<<" AND ">>, WhereKeys1)),
Params = SetVals ++ WhereVals,
Sql = <<"UPDATE ", Table/binary, " SET ", SetSql/binary, " WHERE ", WhereSql/binary>>,
lager:debug("[mysql_client] update sql is: ~p, params: ~p", [Sql, Params]),
poolboy:transaction(?POOL_NAME, fun(ConnPid) ->
case mysql:query(ConnPid, Sql, Params) of
ok ->
AffectedRows = mysql:affected_rows(ConnPid),
{ok, AffectedRows};
Error ->
lager:error("[mysql_client] update sql: ~p, params: ~p, get a error: ~p", [Sql, Params, Error]),
Error
end
end).
-spec kvs(Fields :: map() | list()) -> {Keys :: list(), Values :: list()}.
kvs(Fields) when is_map(Fields) ->
kvs(maps:to_list(Fields));
kvs(Fields) when is_list(Fields) ->
{Keys0, Values0} = lists:foldl(fun({K, V}, {Acc0, Acc1}) -> {[K|Acc0], [V|Acc1]} end, {[], []}, Fields),
{lists:reverse(Keys0), lists:reverse(Values0)}.

View File

@ -0,0 +1,109 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 07. 8 2023 10:15
%%%-------------------------------------------------------------------
-module(broker_postman).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_server).
%% API
-export([start_link/4]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {
parent_pid :: pid(),
pool_pid :: pid()
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(ParentPid :: pid(), Mod :: atom(), WorkerArgs :: list(), PoolSize :: integer()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(ParentPid, Mod, WorkerArgs, PoolSize) when is_pid(ParentPid), is_atom(Mod), is_list(WorkerArgs), is_integer(PoolSize) ->
gen_server:start_link(?MODULE, [ParentPid, Mod, WorkerArgs, PoolSize], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ParentPid, Mod, WorkerArgs, PoolSize]) ->
%% 线
{ok, PoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, Mod}], WorkerArgs),
{ok, #state{parent_pid = ParentPid, pool_pid = PoolPid}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({post, ReceiverPid, NorthData}, State = #state{pool_pid = PoolPid}) ->
poolboy:transaction(PoolPid, fun(Pid) -> Pid ! {post, ReceiverPid, NorthData} end),
{noreply, State};
handle_info(stop, State = #state{pool_pid = PoolPid}) ->
catch poolboy:stop(PoolPid),
{stop, normal, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, _State = #state{}) ->
lager:debug("[broker_postman] terminate with reason: ~p", [Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -4,7 +4,7 @@
%%% @doc
%%%
%%% @end
%%% Created : 04. 7 2023 15:41
%%% Created : 06. 7 2023 16:23
%%%-------------------------------------------------------------------
-module(http_postman).
-author("aresei").
@ -13,19 +13,13 @@
-behaviour(gen_server).
%% API
-export([start_link/4]).
-export([start_link/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2,
code_change/3]).
-define(SERVER, ?MODULE).
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-record(state, {
parent_pid :: pid(),
url :: binary(),
pool_name :: atom(),
worker_pool_pid :: pid()
url :: binary()
}).
%%%===================================================================
@ -33,10 +27,10 @@
%%%===================================================================
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(ParentPid :: pid(), Url :: binary(), PoolName :: atom(), PoolSize :: integer()) ->
-spec(start_link(Args :: proplists:proplist()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(ParentPid, Url, PoolName, PoolSize) when is_pid(ParentPid), is_binary(Url), is_atom(PoolName), is_integer(PoolSize) ->
gen_server:start_link(?MODULE, [ParentPid, Url, PoolName, PoolSize], []).
start_link(Args) when is_list(Args) ->
gen_server:start_link(?MODULE, [Args], []).
%%%===================================================================
%%% gen_server callbacks
@ -47,11 +41,9 @@ start_link(ParentPid, Url, PoolName, PoolSize) when is_pid(ParentPid), is_binary
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ParentPid, Url, PoolName, PoolSize]) ->
ok = hackney_pool:start_pool(PoolName, [{timeout, 150000}, {max_connections, PoolSize}]),
%% 线
{ok, WorkerPoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, http_postman_worker}], []),
{ok, #state{parent_pid = ParentPid, url = Url, pool_name = PoolName, worker_pool_pid = WorkerPoolPid}}.
init([Args]) ->
Url = proplists:get_value(url, Args),
{ok, #state{url = Url}}.
%% @private
%% @doc Handling call messages
@ -64,7 +56,7 @@ init([ParentPid, Url, PoolName, PoolSize]) ->
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
{noreply, State}.
%% @private
%% @doc Handling cast messages
@ -72,7 +64,7 @@ handle_call(_Request, _From, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Info, State) ->
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
@ -81,22 +73,24 @@ handle_cast(_Info, State) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(stop, State = #state{pool_name = PoolName, worker_pool_pid = WorkerPoolPid}) ->
hackney_pool:stop_pool(PoolName),
poolboy:stop(WorkerPoolPid),
{stop, normal, State};
handle_info({post, #north_data{body = Body, id = Id}}, State = #state{parent_pid = ParentPid, url = Url, pool_name = PoolName, worker_pool_pid = WorkerPoolPid}) ->
poolboy:transaction(WorkerPoolPid, fun(Pid) ->
case http_postman_worker:post(Pid, Url, Body, PoolName) of
ok ->
ParentPid ! {ack, Id};
{error, Reason} ->
lager:debug("[http_postman] post url: ~p, body: ~p, get error: ~p", [Url, Body, Reason])
end
end),
{noreply, State}.
handle_info({post, ReceiverPid, #north_data{id = Id, body = Body}}, State = #state{url = Url}) ->
Headers = [
{<<"content-type">>, <<"application/json">>}
],
case hackney:request(post, Url, Headers, Body) of
{ok, 200, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
lager:debug("[http_postman] url: ~p, response is: ~p", [Url, RespBody]),
ReceiverPid ! {ack, Id},
{noreply, State};
{ok, HttpCode, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
lager:debug("[http_postman] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]),
{noreply, State};
{error, Reason} ->
lager:warning("[http_postman] url: ~p, get error: ~p", [Url, Reason]),
{noreply, State}
end.
%% @private
%% @doc This function is called by a gen_server when it is about to
@ -105,10 +99,8 @@ handle_info({post, #north_data{body = Body, id = Id}}, State = #state{parent_pid
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, _State = #state{pool_name = PoolName, worker_pool_pid = WorkerPoolPid}) ->
lager:debug("[http_postman] terminate with reasson: ~p", [Reason]),
catch hackney_pool:stop_pool(PoolName),
catch poolboy:stop(WorkerPoolPid),
terminate(Reason, #state{url = Url}) ->
lager:debug("[http_postman] url: ~p, terminate with reason: ~p", [Url, Reason]),
ok.
%% @private

View File

@ -100,12 +100,12 @@ handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_p
end;
%%
handle_info({post, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{parent_pid = ParentPid, conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
handle_info({post, ReceiverPid, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{parent_pid = ParentPid, conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]),
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
ok ->
ParentPid ! {ack, Id},
ReceiverPid ! {ack, Id},
{noreply, State};
{ok, PacketId} ->
lager:debug("[mqtt_postman] send success, packet_id: ~p", [PacketId]),

View File

@ -4,16 +4,16 @@
%%% @doc
%%%
%%% @end
%%% Created : 06. 7 2023 16:23
%%% Created : 07. 8 2023 10:15
%%%-------------------------------------------------------------------
-module(http_postman_worker).
-module(mysql_postman).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_server).
%% API
-export([start_link/1]).
-export([post/4]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -21,19 +21,16 @@
-define(SERVER, ?MODULE).
-record(state, {
mysql_pid :: pid(),
table :: binary()
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec post(Pid :: pid(), Url :: binary(), Body :: binary(), PoolName :: atom()) -> ok | {error, Reason :: any()}.
post(Pid, Url, Body, PoolName) when is_pid(Pid), is_binary(Url), is_binary(Body), is_atom(PoolName) ->
gen_server:call(Pid, {post, Url, Body, PoolName}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link(Args :: proplists:proplist()) ->
-spec(start_link(Args :: list()) ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link(Args) when is_list(Args) ->
gen_server:start_link(?MODULE, [Args], []).
@ -47,8 +44,13 @@ start_link(Args) when is_list(Args) ->
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([_]) ->
{ok, #state{}}.
init([Args]) ->
MysqlOpts = proplists:get_value(mysql_opts, Args),
Table = proplists:get_value(table, Args),
{ok, ConnPid} = mysql:start_link(MysqlOpts),
{ok, #state{mysql_pid = ConnPid, table = Table}}.
%% @private
%% @doc Handling call messages
@ -60,23 +62,8 @@ init([_]) ->
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({post, Url, Body, PoolName}, _From, State = #state{}) ->
Headers = [
{<<"content-type">>, <<"application/json">>}
],
case hackney:request(post, Url, Headers, Body, [{pool, PoolName}]) of
{ok, 200, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
lager:debug("[iot_http_client] url: ~p, response is: ~p", [Url, RespBody]),
{reply, ok, State};
{ok, HttpCode, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
lager:debug("[iot_http_client] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]),
{reply, {error, {http_code, HttpCode}}, State};
{error, Reason} ->
lager:warning("[iot_http_client] url: ~p, get error: ~p", [Url, Reason]),
{reply, {error, Reason}}
end.
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
@ -93,8 +80,18 @@ handle_cast(_Request, State = #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
handle_info({post, ReceiverPid, #north_data{id = Id, body = Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) ->
case catch mysql_provider:insert(ConnPid, Table, Fields, false) of
ok ->
ReceiverPid ! {ack, Id};
Error ->
lager:debug("[mysql_postman] insert table: ~p, res is: ~p", [Table, Error])
end,
{noreply, State};
handle_info(stop, State = #state{mysql_pid = ConnPid}) ->
mysql:stop(ConnPid),
{stop, normal, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to

View File

@ -32,7 +32,7 @@
{pools, [
%% mysql连接池配置
{mysql_pool,
{mysql_iot,
[{size, 10}, {max_overflow, 20}, {worker_module, mysql}],
[
{host, {39, 98, 184, 67}},

View File

@ -32,7 +32,7 @@
{pools, [
%% mysql连接池配置
{mysql_pool,
{mysql_iot,
[{size, 10}, {max_overflow, 20}, {worker_module, mysql}],
[
{host, {172, 19, 0, 2}},