From 049623d2a3bf8fd9633770b3704072f08d2ad2e8 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Mon, 7 Aug 2023 17:22:31 +0800 Subject: [PATCH] support mysql endpoint --- apps/iot/include/iot.hrl | 21 ++- apps/iot/src/database/device_bo.erl | 2 +- apps/iot/src/database/host_bo.erl | 12 +- apps/iot/src/database/micro_inform_log.erl | 2 +- apps/iot/src/database/micro_set_bo.erl | 2 +- apps/iot/src/database/scene_feedback.erl | 2 +- apps/iot/src/database/scene_feedback_step.erl | 2 +- apps/iot/src/database/task_logs_bo.erl | 2 +- .../iot/src/http_handler/endpoint_handler.erl | 31 ++-- apps/iot/src/http_handler/http_protocol.erl | 7 +- apps/iot/src/iot_endpoint.erl | 54 ++++-- apps/iot/src/iot_host.erl | 2 +- apps/iot/src/iot_router.erl | 18 +- apps/iot/src/iot_sup.erl | 5 +- apps/iot/src/iot_util.erl | 2 +- apps/iot/src/mnesia/mnesia_endpoint.erl | 4 +- apps/iot/src/mocker/iot_mock.erl | 77 +++++++-- apps/iot/src/mysql/mysql_pool.erl | 46 +++++ apps/iot/src/mysql/mysql_provider.erl | 143 ++++++++++++++++ apps/iot/src/mysql_client.erl | 160 ------------------ apps/iot/src/postman/broker_postman.erl | 109 ++++++++++++ apps/iot/src/postman/http_postman.erl | 72 ++++---- apps/iot/src/postman/mqtt_postman.erl | 4 +- ...p_postman_worker.erl => mysql_postman.erl} | 57 +++---- config/sys-dev.config | 2 +- config/sys-prod.config | 2 +- 26 files changed, 529 insertions(+), 311 deletions(-) create mode 100644 apps/iot/src/mysql/mysql_pool.erl create mode 100644 apps/iot/src/mysql/mysql_provider.erl delete mode 100644 apps/iot/src/mysql_client.erl create mode 100644 apps/iot/src/postman/broker_postman.erl rename apps/iot/src/postman/{http_postman_worker.erl => mysql_postman.erl} (69%) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 0aa5206..171e48c 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -68,6 +68,16 @@ topic = <<>> :: binary() }). +-record(mysql_endpoint, { + host = <<>> :: binary(), + port = 0 :: integer(), + username = <<>> :: binary(), + password = <<>> :: binary(), + database = <<>> :: binary(), + table_name = <<>> :: binary(), + pool_size = 10 :: integer() +}). + %% 对端配置 -record(endpoint, { %% 不同的对端名字要唯一 @@ -77,10 +87,12 @@ %% 匹配规则, 固定了满足点位信息的前缀匹配的数据的转发规则 matcher = <<>> :: binary(), mapper = <<>> :: binary(), - %% 数据转换规则,基于function - mapper_fun = fun(_, Data) -> Data end :: fun((binary(), any()) -> any()), + %% 数据转换规则,基于 + %% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}]) + %% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}], Timestamp :: integer()) + mapper_fun = fun(_, Fields) -> Fields end :: fun(), %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} - config = #http_endpoint{} :: #http_endpoint{} | #ws_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{}, + config = #http_endpoint{} :: #http_endpoint{} | #ws_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{} | #mysql_endpoint{}, %% 更新时间 updated_at = 0 :: integer(), %% 创建时间 @@ -97,5 +109,6 @@ -record(north_data, { id = 0 :: integer(), location_code :: binary(), - body :: binary() + %% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}] + body :: binary() | list() }). \ No newline at end of file diff --git a/apps/iot/src/database/device_bo.erl b/apps/iot/src/database/device_bo.erl index bca4139..36ed236 100644 --- a/apps/iot/src/database/device_bo.erl +++ b/apps/iot/src/database/device_bo.erl @@ -14,4 +14,4 @@ -export([get_device_by_uuid/1]). get_device_by_uuid(UUID) when is_binary(UUID) -> - mysql_client:get_row(<<"SELECT * FROM device WHERE uuid = ? LIMIT 1">>, [UUID]). \ No newline at end of file + mysql_pool:get_row(mysql_iot, <<"SELECT * FROM device WHERE uuid = ? LIMIT 1">>, [UUID]). \ No newline at end of file diff --git a/apps/iot/src/database/host_bo.erl b/apps/iot/src/database/host_bo.erl index cf26f09..b452536 100644 --- a/apps/iot/src/database/host_bo.erl +++ b/apps/iot/src/database/host_bo.erl @@ -16,7 +16,7 @@ -spec get_all_hosts() -> UUIDList :: [binary()]. get_all_hosts() -> - case mysql_client:get_all(<<"SELECT uuid FROM host where uuid != ''">>) of + case mysql_pool:get_all(mysql_iot, <<"SELECT uuid FROM host where uuid != ''">>) of {ok, Hosts} -> lists:map(fun(#{<<"uuid">> := UUID}) -> UUID end, Hosts); {error, _Reason} -> @@ -24,22 +24,22 @@ get_all_hosts() -> end. get_host(HostId) when is_integer(HostId) -> - mysql_client:get_row(<<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]). + mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]). get_host_by_uuid(UUID) when is_binary(UUID) -> - mysql_client:get_row(<<"SELECT * FROM host WHERE uuid = ? LIMIT 1">>, [UUID]). + mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE uuid = ? LIMIT 1">>, [UUID]). create_host(UUID) when is_binary(UUID) -> - mysql_client:insert(<<"host">>, #{<<"UUID">> => UUID, <<"status">> => ?HOST_STATUS_INACTIVE}, true). + mysql_pool:insert(mysql_iot, <<"host">>, #{<<"UUID">> => UUID, <<"status">> => ?HOST_STATUS_INACTIVE}, true). %% 修改主机的状态 -spec change_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. change_status(UUID, Status) when is_binary(UUID), is_integer(Status) -> - mysql_client:update_by(<<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [Status, UUID]). + mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [Status, UUID]). %% 判断主机是否已经授权 is_authorized(HostId) when is_integer(HostId) -> - case mysql_client:get_row(<<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]) of + case mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]) of {ok, #{<<"status">> := Status}} when Status =/= ?HOST_STATUS_INACTIVE -> true; _ -> diff --git a/apps/iot/src/database/micro_inform_log.erl b/apps/iot/src/database/micro_inform_log.erl index 989d9f5..c5128fd 100644 --- a/apps/iot/src/database/micro_inform_log.erl +++ b/apps/iot/src/database/micro_inform_log.erl @@ -14,4 +14,4 @@ -export([insert/1]). insert(Fields) when is_map(Fields) -> - mysql_client:insert(<<"micro_inform_log">>, Fields, true). \ No newline at end of file + mysql_pool:insert(mysql_iot, <<"micro_inform_log">>, Fields, true). \ No newline at end of file diff --git a/apps/iot/src/database/micro_set_bo.erl b/apps/iot/src/database/micro_set_bo.erl index 6786754..80e30ee 100644 --- a/apps/iot/src/database/micro_set_bo.erl +++ b/apps/iot/src/database/micro_set_bo.erl @@ -16,4 +16,4 @@ %% 修改主机的状态 -spec change_status(HostId :: integer(), SceneId :: integer(), MircoId :: integer(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. change_status(HostId, SceneId, MircoId, Status) when is_integer(HostId), is_integer(SceneId), is_integer(MircoId), is_integer(Status) -> - mysql_client:update_by(<<"UPDATE micro_set SET status = ? WHERE host_id = ? AND scene_id = ? AND micro_id = ? LIMIT 1">>, [Status, HostId, SceneId, MircoId]). \ No newline at end of file + mysql_pool:update_by(mysql_iot, <<"UPDATE micro_set SET status = ? WHERE host_id = ? AND scene_id = ? AND micro_id = ? LIMIT 1">>, [Status, HostId, SceneId, MircoId]). \ No newline at end of file diff --git a/apps/iot/src/database/scene_feedback.erl b/apps/iot/src/database/scene_feedback.erl index 60b4550..b7620ee 100644 --- a/apps/iot/src/database/scene_feedback.erl +++ b/apps/iot/src/database/scene_feedback.erl @@ -14,4 +14,4 @@ -export([insert/1]). insert(Fields) when is_map(Fields) -> - mysql_client:insert(<<"scene_feedback">>, Fields, true). \ No newline at end of file + mysql_pool:insert(mysql_iot, <<"scene_feedback">>, Fields, true). \ No newline at end of file diff --git a/apps/iot/src/database/scene_feedback_step.erl b/apps/iot/src/database/scene_feedback_step.erl index 548d551..c404896 100644 --- a/apps/iot/src/database/scene_feedback_step.erl +++ b/apps/iot/src/database/scene_feedback_step.erl @@ -14,4 +14,4 @@ -export([insert/1]). insert(Fields) when is_map(Fields) -> - mysql_client:insert(<<"scene_feedback_step">>, Fields, true). \ No newline at end of file + mysql_pool:insert(mysql_iot, <<"scene_feedback_step">>, Fields, true). \ No newline at end of file diff --git a/apps/iot/src/database/task_logs_bo.erl b/apps/iot/src/database/task_logs_bo.erl index ea60ac1..4d2b25a 100644 --- a/apps/iot/src/database/task_logs_bo.erl +++ b/apps/iot/src/database/task_logs_bo.erl @@ -16,4 +16,4 @@ %% 修改主机的状态 -spec change_status(TaskId :: integer(), Status :: integer()) -> {ok, AffectedRow :: integer()} | {error, Reason :: any()}. change_status(TaskId, Status) when is_integer(TaskId), is_integer(Status) -> - mysql_client:update_by(<<"UPDATE task_logs SET status = ? WHERE id = ? LIMIT 1">>, [Status, TaskId]). \ No newline at end of file + mysql_pool:update_by(mysql_iot, <<"UPDATE task_logs SET status = ? WHERE id = ? LIMIT 1">>, [Status, TaskId]). \ No newline at end of file diff --git a/apps/iot/src/http_handler/endpoint_handler.erl b/apps/iot/src/http_handler/endpoint_handler.erl index e774f96..aba9d22 100644 --- a/apps/iot/src/http_handler/endpoint_handler.erl +++ b/apps/iot/src/http_handler/endpoint_handler.erl @@ -33,7 +33,8 @@ handle_request("GET", "/endpoint/all", GetParams, _) -> %% 重新加载对应的主机信息 handle_request("POST", "/endpoint/create", _, Params = #{<<"name">> := Name}) -> - case lists:all(fun(Key) -> maps:is_key(Key, Params) end, [<<"name">>, <<"title">>, <<"matcher">>, <<"mapper">>, <<"config">>]) of + RequiredFields = [<<"name">>, <<"title">>, <<"matcher">>, <<"mapper">>, <<"config">>], + case lists:all(fun(Key) -> maps:is_key(Key, Params) end, RequiredFields) of true -> ok; false -> @@ -44,13 +45,8 @@ handle_request("POST", "/endpoint/create", _, Params = #{<<"name">> := Name}) -> undefined -> Endpoint0 = make_endpoint(maps:to_list(Params), #endpoint{}), Endpoint = Endpoint0#endpoint{created_at = iot_util:timestamp_of_seconds()}, - case mnesia_endpoint:insert(Endpoint) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:debug("[endpoint_handler] create router, get error is: ~p", [Reason]), - {ok, 200, iot_util:json_error(404, <<"error">>)} - end; + ok = mnesia_endpoint:insert(Endpoint), + {ok, 200, iot_util:json_data(<<"success">>)}; {ok, _} -> {ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)} end; @@ -65,14 +61,9 @@ handle_request("POST", "/endpoint/update", _, Params = #{<<"name">> := Name}) wh Params1 = maps:remove(<<"name">>, Params), NEndpoint = make_endpoint(maps:to_list(Params1), Endpoint), NEndpoint1 = NEndpoint#endpoint{updated_at = iot_util:timestamp_of_seconds()}, + ok = mnesia_endpoint:insert(NEndpoint1), - case mnesia_endpoint:insert(NEndpoint1) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:debug("[endpoint_handler] update endpoint, get error is: ~p", [Reason]), - {ok, 200, iot_util:json_error(404, <<"error">>)} - end + {ok, 200, iot_util:json_data(<<"success">>)} end; %% 删除对应的主机信息 @@ -158,6 +149,16 @@ make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mqtt">>, <<"args">> := #{<< andalso is_binary(Topic) andalso Topic /= <<>> -> make_endpoint(Params, Endpoint#endpoint{config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}); +%% mysql的支持 +make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mysql">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"databse">> := Database, <<"table_name">> := TableName}}} | Params], Endpoint) + when is_binary(Username) andalso Username /= <<>> + andalso is_binary(Password) andalso Password /= <<>> + andalso is_binary(Host) andalso Host /= <<>> + andalso is_integer(Port) andalso Port > 0 + andalso is_binary(Database) andalso Database /= <<>> + andalso is_binary(TableName) andalso TableName /= <<>> -> + make_endpoint(Params, Endpoint#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName}}); + make_endpoint([{<<"config">>, Config} | _], _) -> lager:warning("[endpoint_handler] unsupport config: ~p", [Config]), throw(<<"invalid config">>); diff --git a/apps/iot/src/http_handler/http_protocol.erl b/apps/iot/src/http_handler/http_protocol.erl index 18d3f18..fb3ed4b 100644 --- a/apps/iot/src/http_handler/http_protocol.erl +++ b/apps/iot/src/http_handler/http_protocol.erl @@ -13,8 +13,6 @@ -export([init/2]). init(Req0, Opts = [Mod|_]) -> - StartTs = erlang:monotonic_time(), - Method = binary_to_list(cowboy_req:method(Req0)), Path = binary_to_list(cowboy_req:path(Req0)), GetParams0 = cowboy_req:parse_qs(Req0), @@ -23,7 +21,7 @@ init(Req0, Opts = [Mod|_]) -> try Mod:handle_request(Method, Path, GetParams, PostParams) of {ok, StatusCode, Resp} -> - lager:debug("[http_protocol] get a request, path: ~p, get params: ~p, post params: ~p, response: ~ts", + lager:debug("[http_protocol] request path: ~p, get_params: ~p, post_params: ~p, response: ~ts", [Path, GetParams, PostParams, Resp]), AcceptEncoding = cowboy_req:header(<<"accept-encoding">>, Req1, <<>>), Req2 = case iolist_size(Resp) >= 1024 andalso supported_gzip(AcceptEncoding) of @@ -38,9 +36,6 @@ init(Req0, Opts = [Mod|_]) -> <<"Content-Type">> => <<"application/json;charset=utf-8">> }, Resp, Req1) end, - EndTs = erlang:monotonic_time(), - CostTs = erlang:convert_time_unit(EndTs - StartTs, native, microsecond), - lager:debug("cost_ts is: ~p", [CostTs]), {ok, Req2, Opts} catch throw:Error -> diff --git a/apps/iot/src/iot_endpoint.erl b/apps/iot/src/iot_endpoint.erl index ee67f98..66e54d2 100644 --- a/apps/iot/src/iot_endpoint.erl +++ b/apps/iot/src/iot_endpoint.erl @@ -14,7 +14,7 @@ %% API -export([start_link/2]). --export([get_name/1, get_pid/1, forward/3, get_stat/1, reload/2, clean_up/1]). +-export([get_name/1, get_pid/1, forward/4, get_stat/1, reload/2, clean_up/1, get_mapper_fun/1]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -53,11 +53,11 @@ get_name(EndpointName) when is_binary(EndpointName) -> get_pid(Name) when is_binary(Name) -> whereis(get_name(Name)). --spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list()) -> no_return(). -forward(undefined, _, _) -> +-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +forward(undefined, _, _, _) -> ok; -forward(Pid, LocationCode, Fields) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields) -> - gen_statem:cast(Pid, {forward, LocationCode, Fields}). +forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + gen_statem:cast(Pid, {forward, LocationCode, Fields, Timestamp}). reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> gen_statem:cast(Pid, {reload, NEndpoint}). @@ -70,6 +70,10 @@ get_stat(Pid) when is_pid(Pid) -> clean_up(Pid) when is_pid(Pid) -> gen_statem:call(Pid, clean_up, 5000). +-spec get_mapper_fun(Pid :: pid()) -> fun(). +get_mapper_fun(Pid) when is_pid(Pid) -> + gen_statem:call(Pid, get_mapper_fun). + %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. @@ -136,9 +140,14 @@ handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, Stat {next_state, disconnected, State#state{endpoint = NEndpoint, timer_map = maps:new(), postman_pid = undefined}} end; -handle_event(cast, {forward, LocationCode, Data}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) -> +handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) -> try - Body = MapperFun(LocationCode, Data), + Body = if + is_function(MapperFun, 2) -> + MapperFun(LocationCode, Fields); + is_function(MapperFun, 3) -> + MapperFun(LocationCode, Fields, Timestamp) + end, mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, body = Body}), %% 避免不必要的内部消息 Actions = case StateName =:= connected andalso FlightNum < WindowSize of @@ -162,7 +171,7 @@ handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cur {ok, NCursor, NorthData = #north_data{id = Id}} -> lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), - PostmanPid ! {post, NorthData}, + PostmanPid ! {post, self(), NorthData}, %% 重发机制 TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), @@ -188,7 +197,7 @@ handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endp %% 收到重发过期请求 handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{endpoint = #endpoint{name = Name}, postman_pid = PostmanPid, timer_map = TimerMap}) -> lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]), - PostmanPid ! {post, NorthData}, + PostmanPid ! {post, self(), NorthData}, %% 5秒后重发 TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), @@ -217,6 +226,9 @@ handle_event({call, From}, clean_up, _, State = #state{tab_name = TabName}) -> mnesia:delete_table(TabName), {keep_state, State, [{reply, From, ok}]}; +handle_event({call, From}, get_mapper_fun, _, State = #state{endpoint = #endpoint{mapper_fun = F}}) -> + {keep_state, State, [{reply, From, F}]}; + %% 获取当前统计信息 handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, tab_name = TabName}) -> Stat = #{ @@ -270,11 +282,11 @@ remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) -> end. %% 对http和https协议的支持 -create_postman(#endpoint{name = Name, config = #http_endpoint{url = Url, pool_size = PoolSize}}) -> - PoolName = binary_to_atom(<<"http_pool:", Name/binary>>), - http_postman:start_link(self(), Url, PoolName, PoolSize); +create_postman(#endpoint{config = #http_endpoint{url = Url, pool_size = PoolSize}}) -> + WorkerArgs = [{url, Url}], + broker_postman:start_link(self(), http_postman, WorkerArgs, PoolSize); -%% 对mqtt协议的支持 +%% 对mqtt协议的支持, 只需要建立单个链接 create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) -> Node = atom_to_binary(node()), ClientId = <<"mqtt-client-", Node/binary, "-", Name/binary>>, @@ -293,5 +305,21 @@ create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port ], mqtt_postman:start_link(self(), Opts, Topic, Qos); + +%% 对mysql协议的支持 +create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}) -> + WorkerArgs = [ + {mysql_opts, [ + {host, binary_to_list(Host)}, + {port, Port}, + {user, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {keep_alive, true}, + {database, binary_to_list(Database)}, + {queries, [<<"set names utf8">>]} + ]}, + {table, TableName} + ], + broker_postman:start_link(self(), mysql_postman, WorkerArgs, PoolSize); create_postman(#endpoint{}) -> throw(<<"not supported">>). \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 55e28a9..e337307 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -253,7 +253,7 @@ handle_event(cast, {handle, {data, Data}}, session, State = #state{uuid = UUID, {error, Reason} -> lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, get error: ~p", [UUID, RouterUUID, Reason]); {ok, LocationCode} -> - iot_router:route(LocationCode, FieldsList) + iot_router:route(LocationCode, FieldsList, Timestamp) end, %% 数据写入influxdb diff --git a/apps/iot/src/iot_router.erl b/apps/iot/src/iot_router.erl index ad99e0b..31edf2e 100644 --- a/apps/iot/src/iot_router.erl +++ b/apps/iot/src/iot_router.erl @@ -11,23 +11,23 @@ -include("iot.hrl"). %% API --export([route/2]). +-export([route/3]). --spec route(LocationCode :: binary(), Fields :: list()) -> ok. -route(LocationCode, Fields) when is_binary(LocationCode), is_list(Fields) -> +-spec route(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> ok. +route(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields), is_integer(Timestamp) -> Endpoints = iot_endpoint_monitor:get_all_endpoints(), - router0(Endpoints, LocationCode, Fields). -router0([], _, _) -> + router0(Endpoints, LocationCode, Fields, Timestamp). +router0([], _, _, _) -> ok; -router0([#endpoint{matcher = Regexp, name = Name} | Endpoints], LocationCode, Fields) -> +router0([#endpoint{matcher = Regexp, name = Name} | Endpoints], LocationCode, Fields, Timestamp) -> {ok, MP} = re:compile(Regexp), case re:run(LocationCode, MP, [{capture, all, list}]) of nomatch -> - router0(Endpoints, LocationCode, Fields); + router0(Endpoints, LocationCode, Fields, Timestamp); {match, _} -> lager:debug("[iot_router] match endpoint: ~p", [Name]), Pid = iot_endpoint:get_pid(Name), - iot_endpoint:forward(Pid, LocationCode, Fields), + iot_endpoint:forward(Pid, LocationCode, Fields, Timestamp), %% 继续匹配其他的Endpoint - router0(Endpoints, LocationCode, Fields) + router0(Endpoints, LocationCode, Fields, Timestamp) end. \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index b45a160..02f967d 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -27,7 +27,7 @@ start_link() -> %% modules => modules()} % optional init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - ChildSpecs = [ + Specs = [ #{ id => 'iot_endpoint_monitor', start => {'iot_endpoint_monitor', start_link, []}, @@ -55,7 +55,8 @@ init([]) -> modules => ['iot_host_sup'] } ], - {ok, {SupFlags, pools() ++ ChildSpecs}}. + + {ok, {SupFlags, pools() ++ Specs}}. %% internal functions diff --git a/apps/iot/src/iot_util.erl b/apps/iot/src/iot_util.erl index 4e8f378..519b4d6 100644 --- a/apps/iot/src/iot_util.erl +++ b/apps/iot/src/iot_util.erl @@ -109,7 +109,7 @@ parse_mapper(Mapper) when is_list(Mapper) -> {ok, Tokens, _} = erl_scan:string(Mapper), {ok, ExprList} = erl_parse:parse_exprs(Tokens), {value, F, _} = erl_eval:exprs(ExprList, []), - case is_function(F, 2) of + case is_function(F, 2) orelse is_function(F, 3) of true -> {ok, F}; false -> diff --git a/apps/iot/src/mnesia/mnesia_endpoint.erl b/apps/iot/src/mnesia/mnesia_endpoint.erl index 34828be..00613f9 100644 --- a/apps/iot/src/mnesia/mnesia_endpoint.erl +++ b/apps/iot/src/mnesia/mnesia_endpoint.erl @@ -88,4 +88,6 @@ config_map(#ws_endpoint{url = Url}) -> config_map(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) -> #{<<"protocol">> => <<"kafka">>, <<"args">> => #{<<"username">> => Username, <<"password">> => Password, <<"bootstrap_servers">> => BootstrapServers, <<"topic">> => Topic}}; config_map(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) -> - #{<<"protocol">> => <<"mqtt">>, <<"args">> => #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"topic">> => Topic, <<"qos">> => Qos}}. + #{<<"protocol">> => <<"mqtt">>, <<"args">> => #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"topic">> => Topic, <<"qos">> => Qos}}; +config_map(#mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName}) -> + #{<<"protocol">> => <<"mysql">>, <<"args">> => #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"database">> => Database, <<"table_name">> => TableName}}. diff --git a/apps/iot/src/mocker/iot_mock.erl b/apps/iot/src/mocker/iot_mock.erl index e642c50..77f2099 100644 --- a/apps/iot/src/mocker/iot_mock.erl +++ b/apps/iot/src/mocker/iot_mock.erl @@ -13,9 +13,9 @@ %% API -export([rsa_encode/1]). -export([insert_services/1]). --export([insert_endpoints/0, forward/0]). +-export([insert_endpoints/0, test_http/0, test_mysql/0, test_mqtt/0]). -forward() -> +test_http() -> Name = <<"zhongguodianli">>, Pid = iot_endpoint:get_pid(Name), @@ -28,19 +28,27 @@ forward() -> <<"unit">> => <<"cm">> } ], - iot_endpoint:forward(Pid, <<"location_code:", Id/binary>>, Fields) + iot_endpoint:forward(Pid, <<"location_code:", Id/binary>>, Fields, iot_util:timestamp_of_seconds()) end, lists:seq(1, 10000)). -insert_endpoints() -> - Mapper0 = "fun(LocationCode, Fields) -> - Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields), - Fields2 = maps:from_list(Fields1), - Bin = jiffy:encode(Fields2#{<<\"location_code\">> => LocationCode}, [force_utf8]), - iolist_to_binary(Bin) - end.", +test_mysql() -> + lists:foreach(fun(_) -> + iot_router:route(<<"mysql123">>, [ + #{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>}, + #{<<"key">> => <<"age">>, <<"value">> => 30}, + #{<<"key">> => <<"flow">>, <<"value">> => 30} + ], iot_util:timestamp_of_seconds()) + end, lists:seq(1, 10000)). - Mapper = list_to_binary(Mapper0), - {ok, F} = iot_util:parse_mapper(Mapper), +test_mqtt() -> + iot_router:route(<<"test123">>, [ + #{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>}, + #{<<"key">> => <<"age">>, <<"value">> => 30}, + #{<<"key">> => <<"flow">>, <<"value">> => 30} + ], iot_util:timestamp_of_seconds()). + +insert_endpoints() -> + {ok, Mapper, F} = simple_mapper(), mnesia_endpoint:insert(#endpoint{ name = <<"zhongguodianli">>, title = <<"中国电力"/utf8>>, @@ -68,11 +76,54 @@ insert_endpoints() -> created_at = iot_util:timestamp_of_seconds() }), + {ok, MysqlMapper, MysqlF} = mysql_mapper(), + mnesia_endpoint:insert(#endpoint{ + name = <<"mysql">>, + title = <<"测试mysql"/utf8>>, + matcher = <<"mysql*">>, + mapper = MysqlMapper, + mapper_fun = MysqlF, + config = #mysql_endpoint{ + host = <<"localhost">>, + port = 3306, + username = <<"php_an">>, + password = <<"123456">>, + database = <<"iot">>, + table_name = <<"north_data">> + }, + created_at = iot_util:timestamp_of_seconds() + }), + {Mapper, F}. +simple_mapper() -> + Mapper0 = "fun(LocationCode, Fields) -> + Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields), + Fields2 = maps:from_list(Fields1), + Bin = jiffy:encode(Fields2#{<<\"location_code\">> => LocationCode}, [force_utf8]), + iolist_to_binary(Bin) + end.", + + Mapper = list_to_binary(Mapper0), + {ok, F} = iot_util:parse_mapper(Mapper), + + {ok, Mapper, F}. + +mysql_mapper() -> + Mapper0 = "fun(LocationCode, Fields, Timestamp) -> + Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields), + Content = jiffy:encode(maps:from_list(Fields1), [force_utf8]), + [{<<\"location_code\">>, LocationCode}, {<<\"content\">>, Content}, {<<\"created_ts\">>, Timestamp}] + end.", + + Mapper = list_to_binary(Mapper0), + {ok, F} = iot_util:parse_mapper(Mapper), + + {ok, Mapper, F}. + insert_services(Num) -> lists:foreach(fun(Id) -> - Res = mysql_client:insert(<<"micro_service">>, + Res = mysql_pool:insert(mysql_iot, <<"micro_service">>, #{ <<"name">> => <<"微服务"/utf8, (integer_to_binary(Id))/binary>>, <<"code">> => <<"1223423423423423"/utf8>>, diff --git a/apps/iot/src/mysql/mysql_pool.erl b/apps/iot/src/mysql/mysql_pool.erl new file mode 100644 index 0000000..c141513 --- /dev/null +++ b/apps/iot/src/mysql/mysql_pool.erl @@ -0,0 +1,46 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2018, +%%% @doc +%%% +%%% @end +%%% Created : 29. 九月 2018 17:01 +%%%------------------------------------------------------------------- +-module(mysql_pool). +-author("aresei"). + +%% API +-export([get_row/2, get_row/3, get_all/2, get_all/3]). +-export([update/4, update_by/2, update_by/3, insert/4]). + +%% 从数据库中查找一行记录 +-spec get_row(Pool :: atom(), Sql::binary()) -> {ok, Record::map()} | undefined. +get_row(Pool, Sql) when is_atom(Pool), is_binary(Sql) -> + poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:get_row(ConnPid, Sql) end). + +-spec get_row(Pool :: atom(), Sql::binary(), Params::list()) -> {ok, Record::map()} | undefined. +get_row(Pool, Sql, Params) when is_atom(Pool), is_binary(Sql), is_list(Params) -> + poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:get_row(ConnPid, Sql, Params) end). + +-spec get_all(Pool :: atom(), Sql::binary()) -> {ok, Rows::list()} | {error, Reason :: any()}. +get_all(Pool, Sql) when is_atom(Pool), is_binary(Sql) -> + poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:get_all(ConnPid, Sql) end). + +-spec get_all(Pool :: atom(), Sql::binary(), Params::list()) -> {ok, Rows::list()} | {error, Reason::any()}. +get_all(Pool, Sql, Params) when is_atom(Pool), is_binary(Sql), is_list(Params) -> + poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:get_all(ConnPid, Sql, Params) end). + +-spec insert(Pool :: atom(), Table :: binary(), Fields :: map() | list(), boolean()) -> ok | {ok, InsertId :: integer()} | {error, Reason :: any()}. +insert(Pool, Table, Fields, FetchInsertId) when is_atom(Pool), is_binary(Table), is_list(Fields); is_map(Fields), is_boolean(FetchInsertId) -> + poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:insert(ConnPid, Table, Fields, FetchInsertId) end). + +update_by(Pool, UpdateSql) when is_atom(Pool), is_binary(UpdateSql) -> + poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:update_by(ConnPid, UpdateSql) end). + +-spec update_by(Pool :: atom(), UpdateSql :: binary(), Params :: list()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. +update_by(Pool, UpdateSql, Params) when is_atom(Pool), is_binary(UpdateSql) -> + poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:update_by(ConnPid, UpdateSql, Params) end). + +-spec update(Pool :: atom(), Table :: binary(), Fields :: map(), WhereFields :: map()) -> {ok, AffectedRows::integer()} | {error, Reason::any()}. +update(Pool, Table, Fields, WhereFields) when is_atom(Pool), is_binary(Table), is_map(Fields), is_map(WhereFields) -> + poolboy:transaction(Pool, fun(ConnPid) -> mysql_provider:update(ConnPid, Table, Fields, WhereFields) end). \ No newline at end of file diff --git a/apps/iot/src/mysql/mysql_provider.erl b/apps/iot/src/mysql/mysql_provider.erl new file mode 100644 index 0000000..3e7823c --- /dev/null +++ b/apps/iot/src/mysql/mysql_provider.erl @@ -0,0 +1,143 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2018, +%%% @doc +%%% +%%% @end +%%% Created : 29. 九月 2018 17:01 +%%%------------------------------------------------------------------- +-module(mysql_provider). +-author("aresei"). + +-define(POOL_NAME, mysql_pool). + +%% API +-export([get_row/2, get_row/3, get_all/2, get_all/3]). +-export([update/4, update_by/2, update_by/3, insert/4]). + +%% 从数据库中查找一行记录 +-spec get_row(ConnPid :: pid(), Sql::binary()) -> {ok, Record::map()} | undefined. +get_row(ConnPid, Sql) when is_pid(ConnPid), is_binary(Sql) -> + lager:debug("[mysql_client] the get_row sql is: ~p", [Sql]), + case mysql:query(ConnPid, Sql) of + {ok, Names, [Row | _]} -> + {ok, maps:from_list(lists:zip(Names, Row))}; + {ok, _, []} -> + undefined; + Error -> + lager:warning("[mysql_client] get error: ~p", [Error]), + undefined + end. + +-spec get_row(ConnPid :: pid(), Sql::binary(), Params::list()) -> {ok, Record::map()} | undefined. +get_row(ConnPid, Sql, Params) when is_pid(ConnPid), is_binary(Sql), is_list(Params) -> + lager:debug("[mysql_client] the get_row sql is: ~p, params: ~p", [Sql, Params]), + case mysql:query(ConnPid, Sql, Params) of + {ok, Names, [Row | _]} -> + {ok, maps:from_list(lists:zip(Names, Row))}; + {ok, _, []} -> + undefined; + Error -> + lager:warning("[mysql_client] get error: ~p", [Error]), + undefined + end. + +-spec get_all(ConnPid :: pid(), Sql::binary()) -> {ok, Rows::list()} | {error, Reason :: any()}. +get_all(ConnPid, Sql) when is_pid(ConnPid), is_binary(Sql) -> + lager:debug("[mysql_client] the get_all sql is: ~p", [Sql]), + case mysql:query(ConnPid, Sql) of + {ok, Names, Rows} -> + {ok, lists:map(fun(Row) -> maps:from_list(lists:zip(Names, Row)) end, Rows)}; + Error -> + lager:warning("[mysql_client] get error: ~p", [Error]), + Error + end. + +-spec get_all(ConnPid :: pid(), Sql::binary(), Params::list()) -> {ok, Rows::list()} | {error, Reason::any()}. +get_all(ConnPid, Sql, Params) when is_pid(ConnPid), is_binary(Sql), is_list(Params) -> + lager:debug("[mysql_client] the get_all sql is: ~p, params: ~p", [Sql, Params]), + case mysql:query(ConnPid, Sql, Params) of + {ok, Names, Rows} -> + {ok, lists:map(fun(Row) -> maps:from_list(lists:zip(Names, Row)) end, Rows)}; + Error -> + lager:warning("[mysql_client] get error: ~p", [Error]), + {ok, []} + end. + +-spec insert(ConnPid :: pid(), Table :: binary(), Fields :: map() | list(), boolean()) -> ok | {ok, InsertId :: integer()} | {error, Reason :: any()}. +insert(ConnPid, Table, Fields, FetchInsertId) when is_pid(ConnPid), is_binary(Table), is_map(Fields), is_boolean(FetchInsertId) -> + insert(ConnPid, Table, maps:to_list(Fields), FetchInsertId); +insert(ConnPid, Table, Fields, FetchInsertId) when is_pid(ConnPid), is_binary(Table), is_list(Fields), is_boolean(FetchInsertId) -> + {Keys, Values} = kvs(Fields), + + FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)), + Placeholders = lists:duplicate(length(Keys), <<"?">>), + ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)), + + Sql = <<"INSERT INTO ", Table/binary, "(", FieldSql/binary, ") VALUES(", ValuesPlaceholder/binary, ")">>, + lager:debug("[mysql_client] the insert sql is: ~p, params: ~p", [Sql, Values]), + case mysql:query(ConnPid, Sql, Values) of + ok -> + case FetchInsertId of + true -> + InsertId = mysql:insert_id(ConnPid), + {ok, InsertId}; + false -> + ok + end; + Error -> + Error + end. + +update_by(ConnPid, UpdateSql) when is_pid(ConnPid), is_binary(UpdateSql) -> + lager:debug("[mysql_client] updateBySql sql: ~p", [UpdateSql]), + case mysql:query(ConnPid, UpdateSql) of + ok -> + AffectedRows = mysql:affected_rows(ConnPid), + {ok, AffectedRows}; + Error -> + Error + end. + +-spec update_by(ConnPid :: pid(), UpdateSql :: binary(), Params :: list()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. +update_by(ConnPid, UpdateSql, Params) when is_pid(ConnPid), is_binary(UpdateSql) -> + lager:debug("[mysql_client] updateBySql sql: ~p, params: ~p", [UpdateSql, Params]), + case mysql:query(ConnPid, UpdateSql, Params) of + ok -> + AffectedRows = mysql:affected_rows(ConnPid), + {ok, AffectedRows}; + Error -> + Error + end. + +-spec update(ConnPid :: pid(), Sql :: binary(), Fields :: map(), WhereFields :: map()) -> {ok, AffectedRows::integer()} | {error, Reason::any()}. +update(ConnPid, Table, Fields, WhereFields) when is_pid(ConnPid), is_binary(Table), is_map(Fields), is_map(WhereFields) -> + %% 拼接set + {SetKeys, SetVals} = kvs(Fields), + SetKeys1 = lists:map(fun(K) when is_binary(K) -> <<"`", K/binary, "` = ?">> end, SetKeys), + SetSql = iolist_to_binary(lists:join(<<", ">>, SetKeys1)), + + %% 拼接where + {WhereKeys, WhereVals} = kvs(WhereFields), + WhereKeys1 = lists:map(fun(K) when is_binary(K) -> <<"`", K/binary, "` = ?">> end, WhereKeys), + WhereSql = iolist_to_binary(lists:join(<<" AND ">>, WhereKeys1)), + + Params = SetVals ++ WhereVals, + + Sql = <<"UPDATE ", Table/binary, " SET ", SetSql/binary, " WHERE ", WhereSql/binary>>, + lager:debug("[mysql_client] update sql is: ~p, params: ~p", [Sql, Params]), + case mysql:query(ConnPid, Sql, Params) of + ok -> + AffectedRows = mysql:affected_rows(ConnPid), + {ok, AffectedRows}; + Error -> + lager:error("[mysql_client] update sql: ~p, params: ~p, get a error: ~p", [Sql, Params, Error]), + Error + end. + +-spec kvs(Fields :: map() | list()) -> {Keys :: list(), Values :: list()}. +kvs(Fields) when is_map(Fields) -> + kvs(maps:to_list(Fields)); +kvs(Fields) when is_list(Fields) -> + {Keys0, Values0} = lists:foldl(fun({K, V}, {Acc0, Acc1}) -> {[K|Acc0], [V|Acc1]} end, {[], []}, Fields), + {lists:reverse(Keys0), lists:reverse(Values0)}. \ No newline at end of file diff --git a/apps/iot/src/mysql_client.erl b/apps/iot/src/mysql_client.erl deleted file mode 100644 index 8c111e8..0000000 --- a/apps/iot/src/mysql_client.erl +++ /dev/null @@ -1,160 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2018, -%%% @doc -%%% -%%% @end -%%% Created : 29. 九月 2018 17:01 -%%%------------------------------------------------------------------- --module(mysql_client). --author("aresei"). - --define(POOL_NAME, mysql_pool). - -%% API --export([get_row/1, get_row/2, get_all/1, get_all/2]). --export([update/3, update_by/1, update_by/2, insert/3]). - -%% 从数据库中查找一行记录 --spec get_row(Sql::binary()) -> {ok, Record::map()} | undefined. -get_row(Sql) when is_binary(Sql) -> - lager:debug("[mysql_client] the get_row sql is: ~p", [Sql]), - poolboy:transaction(?POOL_NAME, fun(ConnPid) -> - case mysql:query(ConnPid, Sql) of - {ok, Names, [Row | _]} -> - {ok, maps:from_list(lists:zip(Names, Row))}; - {ok, _, []} -> - undefined; - Error -> - lager:warning("[mysql_client] get error: ~p", [Error]), - undefined - end - end). - --spec get_row(Sql::binary(), Params::list()) -> {ok, Record::map()} | undefined. -get_row(Sql, Params) when is_binary(Sql), is_list(Params) -> - lager:debug("[mysql_client] the get_row sql is: ~p, params: ~p", [Sql, Params]), - poolboy:transaction(?POOL_NAME, fun(ConnPid) -> - case mysql:query(ConnPid, Sql, Params) of - {ok, Names, [Row | _]} -> - {ok, maps:from_list(lists:zip(Names, Row))}; - {ok, _, []} -> - undefined; - Error -> - lager:warning("[mysql_client] get error: ~p", [Error]), - undefined - end - end). - --spec get_all(Sql::binary()) -> {ok, Rows::list()} | {error, Reason :: any()}. -get_all(Sql) when is_binary(Sql) -> - lager:debug("[mysql_client] the get_all sql is: ~p", [Sql]), - poolboy:transaction(?POOL_NAME, fun(ConnPid) -> - case mysql:query(ConnPid, Sql) of - {ok, Names, Rows} -> - {ok, lists:map(fun(Row) -> maps:from_list(lists:zip(Names, Row)) end, Rows)}; - Error -> - lager:warning("[mysql_client] get error: ~p", [Error]), - Error - end - end). - --spec get_all(Sql::binary(), Params::list()) -> {ok, Rows::list()} | {error, Reason::any()}. -get_all(Sql, Params) when is_binary(Sql), is_list(Params) -> - lager:debug("[mysql_client] the get_all sql is: ~p, params: ~p", [Sql, Params]), - poolboy:transaction(?POOL_NAME, fun(ConnPid) -> - case mysql:query(ConnPid, Sql, Params) of - {ok, Names, Rows} -> - {ok, lists:map(fun(Row) -> maps:from_list(lists:zip(Names, Row)) end, Rows)}; - Error -> - lager:warning("[mysql_client] get error: ~p", [Error]), - {ok, []} - end - end). - --spec insert(Table :: binary(), Fields :: map() | list(), boolean()) -> ok | {ok, InsertId :: integer()} | {error, Reason :: any()}. -insert(Table, Fields, FetchInsertId) when is_binary(Table), is_map(Fields), is_boolean(FetchInsertId) -> - insert(Table, maps:to_list(Fields), FetchInsertId); -insert(Table, Fields, FetchInsertId) when is_binary(Table), is_list(Fields), is_boolean(FetchInsertId) -> - {Keys, Values} = kvs(Fields), - - FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)), - Placeholders = lists:duplicate(length(Keys), <<"?">>), - ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)), - - Sql = <<"INSERT INTO ", Table/binary, "(", FieldSql/binary, ") VALUES(", ValuesPlaceholder/binary, ")">>, - lager:debug("[mysql_client] the insert sql is: ~p, params: ~p", [Sql, Values]), - poolboy:transaction(?POOL_NAME, fun(ConnPid) -> - case mysql:query(ConnPid, Sql, Values) of - ok -> - case FetchInsertId of - true -> - InsertId = mysql:insert_id(ConnPid), - {ok, InsertId}; - false -> - ok - end; - Error -> - Error - end - end). - -update_by(UpdateSql) when is_binary(UpdateSql) -> - lager:debug("[mysql_client] updateBySql sql: ~p", [UpdateSql]), - poolboy:transaction(?POOL_NAME, fun(ConnPid) -> - case mysql:query(ConnPid, UpdateSql) of - ok -> - AffectedRows = mysql:affected_rows(ConnPid), - {ok, AffectedRows}; - Error -> - Error - end - end). - --spec update_by(UpdateSql :: binary(), Params :: list()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. -update_by(UpdateSql, Params) when is_binary(UpdateSql) -> - lager:debug("[mysql_client] updateBySql sql: ~p, params: ~p", [UpdateSql, Params]), - poolboy:transaction(?POOL_NAME, fun(ConnPid) -> - case mysql:query(ConnPid, UpdateSql, Params) of - ok -> - AffectedRows = mysql:affected_rows(ConnPid), - {ok, AffectedRows}; - Error -> - Error - end - end). - --spec update(Sql :: binary(), Fields :: map(), WhereFields :: map()) -> {ok, AffectedRows::integer()} | {error, Reason::any()}. -update(Table, Fields, WhereFields) when is_binary(Table), is_map(Fields), is_map(WhereFields) -> - %% 拼接set - {SetKeys, SetVals} = kvs(Fields), - SetKeys1 = lists:map(fun(K) when is_binary(K) -> <<"`", K/binary, "` = ?">> end, SetKeys), - SetSql = iolist_to_binary(lists:join(<<", ">>, SetKeys1)), - - %% 拼接where - {WhereKeys, WhereVals} = kvs(WhereFields), - WhereKeys1 = lists:map(fun(K) when is_binary(K) -> <<"`", K/binary, "` = ?">> end, WhereKeys), - WhereSql = iolist_to_binary(lists:join(<<" AND ">>, WhereKeys1)), - - Params = SetVals ++ WhereVals, - - Sql = <<"UPDATE ", Table/binary, " SET ", SetSql/binary, " WHERE ", WhereSql/binary>>, - lager:debug("[mysql_client] update sql is: ~p, params: ~p", [Sql, Params]), - - poolboy:transaction(?POOL_NAME, fun(ConnPid) -> - case mysql:query(ConnPid, Sql, Params) of - ok -> - AffectedRows = mysql:affected_rows(ConnPid), - {ok, AffectedRows}; - Error -> - lager:error("[mysql_client] update sql: ~p, params: ~p, get a error: ~p", [Sql, Params, Error]), - Error - end - end). - --spec kvs(Fields :: map() | list()) -> {Keys :: list(), Values :: list()}. -kvs(Fields) when is_map(Fields) -> - kvs(maps:to_list(Fields)); -kvs(Fields) when is_list(Fields) -> - {Keys0, Values0} = lists:foldl(fun({K, V}, {Acc0, Acc1}) -> {[K|Acc0], [V|Acc1]} end, {[], []}, Fields), - {lists:reverse(Keys0), lists:reverse(Values0)}. \ No newline at end of file diff --git a/apps/iot/src/postman/broker_postman.erl b/apps/iot/src/postman/broker_postman.erl new file mode 100644 index 0000000..82e180f --- /dev/null +++ b/apps/iot/src/postman/broker_postman.erl @@ -0,0 +1,109 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% 支持以多进程池的方式工作 +%%% @end +%%% Created : 07. 8月 2023 10:15 +%%%------------------------------------------------------------------- +-module(broker_postman). +-author("aresei"). +-include("iot.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/4]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-record(state, { + parent_pid :: pid(), + pool_pid :: pid() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link(ParentPid :: pid(), Mod :: atom(), WorkerArgs :: list(), PoolSize :: integer()) -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link(ParentPid, Mod, WorkerArgs, PoolSize) when is_pid(ParentPid), is_atom(Mod), is_list(WorkerArgs), is_integer(PoolSize) -> + gen_server:start_link(?MODULE, [ParentPid, Mod, WorkerArgs, PoolSize], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([ParentPid, Mod, WorkerArgs, PoolSize]) -> + %% 启动工作的线程池 + {ok, PoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, Mod}], WorkerArgs), + + {ok, #state{parent_pid = ParentPid, pool_pid = PoolPid}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({post, ReceiverPid, NorthData}, State = #state{pool_pid = PoolPid}) -> + poolboy:transaction(PoolPid, fun(Pid) -> Pid ! {post, ReceiverPid, NorthData} end), + {noreply, State}; + +handle_info(stop, State = #state{pool_pid = PoolPid}) -> + catch poolboy:stop(PoolPid), + + {stop, normal, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, _State = #state{}) -> + lager:debug("[broker_postman] terminate with reason: ~p", [Reason]), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== diff --git a/apps/iot/src/postman/http_postman.erl b/apps/iot/src/postman/http_postman.erl index b2e6af3..d3d9fef 100644 --- a/apps/iot/src/postman/http_postman.erl +++ b/apps/iot/src/postman/http_postman.erl @@ -4,7 +4,7 @@ %%% @doc %%% %%% @end -%%% Created : 04. 7月 2023 15:41 +%%% Created : 06. 7月 2023 16:23 %%%------------------------------------------------------------------- -module(http_postman). -author("aresei"). @@ -13,19 +13,13 @@ -behaviour(gen_server). %% API --export([start_link/4]). +-export([start_link/1]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, - code_change/3]). - --define(SERVER, ?MODULE). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -record(state, { - parent_pid :: pid(), - url :: binary(), - pool_name :: atom(), - worker_pool_pid :: pid() + url :: binary() }). %%%=================================================================== @@ -33,10 +27,10 @@ %%%=================================================================== %% @doc Spawns the server and registers the local name (unique) --spec(start_link(ParentPid :: pid(), Url :: binary(), PoolName :: atom(), PoolSize :: integer()) -> +-spec(start_link(Args :: proplists:proplist()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link(ParentPid, Url, PoolName, PoolSize) when is_pid(ParentPid), is_binary(Url), is_atom(PoolName), is_integer(PoolSize) -> - gen_server:start_link(?MODULE, [ParentPid, Url, PoolName, PoolSize], []). +start_link(Args) when is_list(Args) -> + gen_server:start_link(?MODULE, [Args], []). %%%=================================================================== %%% gen_server callbacks @@ -47,11 +41,9 @@ start_link(ParentPid, Url, PoolName, PoolSize) when is_pid(ParentPid), is_binary -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([ParentPid, Url, PoolName, PoolSize]) -> - ok = hackney_pool:start_pool(PoolName, [{timeout, 150000}, {max_connections, PoolSize}]), - %% 启动工作的线程池 - {ok, WorkerPoolPid} = poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, http_postman_worker}], []), - {ok, #state{parent_pid = ParentPid, url = Url, pool_name = PoolName, worker_pool_pid = WorkerPoolPid}}. +init([Args]) -> + Url = proplists:get_value(url, Args), + {ok, #state{url = Url}}. %% @private %% @doc Handling call messages @@ -64,7 +56,7 @@ init([ParentPid, Url, PoolName, PoolSize]) -> {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. + {noreply, State}. %% @private %% @doc Handling cast messages @@ -72,7 +64,7 @@ handle_call(_Request, _From, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast(_Info, State) -> +handle_cast(_Request, State = #state{}) -> {noreply, State}. %% @private @@ -81,22 +73,24 @@ handle_cast(_Info, State) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info(stop, State = #state{pool_name = PoolName, worker_pool_pid = WorkerPoolPid}) -> - hackney_pool:stop_pool(PoolName), - poolboy:stop(WorkerPoolPid), - - {stop, normal, State}; - -handle_info({post, #north_data{body = Body, id = Id}}, State = #state{parent_pid = ParentPid, url = Url, pool_name = PoolName, worker_pool_pid = WorkerPoolPid}) -> - poolboy:transaction(WorkerPoolPid, fun(Pid) -> - case http_postman_worker:post(Pid, Url, Body, PoolName) of - ok -> - ParentPid ! {ack, Id}; - {error, Reason} -> - lager:debug("[http_postman] post url: ~p, body: ~p, get error: ~p", [Url, Body, Reason]) - end - end), - {noreply, State}. +handle_info({post, ReceiverPid, #north_data{id = Id, body = Body}}, State = #state{url = Url}) -> + Headers = [ + {<<"content-type">>, <<"application/json">>} + ], + case hackney:request(post, Url, Headers, Body) of + {ok, 200, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + lager:debug("[http_postman] url: ~p, response is: ~p", [Url, RespBody]), + ReceiverPid ! {ack, Id}, + {noreply, State}; + {ok, HttpCode, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + lager:debug("[http_postman] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]), + {noreply, State}; + {error, Reason} -> + lager:warning("[http_postman] url: ~p, get error: ~p", [Url, Reason]), + {noreply, State} + end. %% @private %% @doc This function is called by a gen_server when it is about to @@ -105,10 +99,8 @@ handle_info({post, #north_data{body = Body, id = Id}}, State = #state{parent_pid %% with Reason. The return value is ignored. -spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), State :: #state{}) -> term()). -terminate(Reason, _State = #state{pool_name = PoolName, worker_pool_pid = WorkerPoolPid}) -> - lager:debug("[http_postman] terminate with reasson: ~p", [Reason]), - catch hackney_pool:stop_pool(PoolName), - catch poolboy:stop(WorkerPoolPid), +terminate(Reason, #state{url = Url}) -> + lager:debug("[http_postman] url: ~p, terminate with reason: ~p", [Url, Reason]), ok. %% @private diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index 0d2c8f1..4826341 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -100,12 +100,12 @@ handle_info({puback, Packet = #{packet_id := PacketId}}, State = #state{parent_p end; %% 转发信息 -handle_info({post, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{parent_pid = ParentPid, conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> +handle_info({post, ReceiverPid, #north_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{parent_pid = ParentPid, conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]), lager:debug("[mqtt_postman] will publish topic: ~p, message: ~p, qos: ~p", [Topic, Message, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of ok -> - ParentPid ! {ack, Id}, + ReceiverPid ! {ack, Id}, {noreply, State}; {ok, PacketId} -> lager:debug("[mqtt_postman] send success, packet_id: ~p", [PacketId]), diff --git a/apps/iot/src/postman/http_postman_worker.erl b/apps/iot/src/postman/mysql_postman.erl similarity index 69% rename from apps/iot/src/postman/http_postman_worker.erl rename to apps/iot/src/postman/mysql_postman.erl index 3fd0471..031a564 100644 --- a/apps/iot/src/postman/http_postman_worker.erl +++ b/apps/iot/src/postman/mysql_postman.erl @@ -4,16 +4,16 @@ %%% @doc %%% %%% @end -%%% Created : 06. 7月 2023 16:23 +%%% Created : 07. 8月 2023 10:15 %%%------------------------------------------------------------------- --module(http_postman_worker). +-module(mysql_postman). -author("aresei"). +-include("iot.hrl"). -behaviour(gen_server). %% API -export([start_link/1]). --export([post/4]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -21,19 +21,16 @@ -define(SERVER, ?MODULE). -record(state, { - + mysql_pid :: pid(), + table :: binary() }). %%%=================================================================== %%% API %%%=================================================================== --spec post(Pid :: pid(), Url :: binary(), Body :: binary(), PoolName :: atom()) -> ok | {error, Reason :: any()}. -post(Pid, Url, Body, PoolName) when is_pid(Pid), is_binary(Url), is_binary(Body), is_atom(PoolName) -> - gen_server:call(Pid, {post, Url, Body, PoolName}). - %% @doc Spawns the server and registers the local name (unique) --spec(start_link(Args :: proplists:proplist()) -> +-spec(start_link(Args :: list()) -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). start_link(Args) when is_list(Args) -> gen_server:start_link(?MODULE, [Args], []). @@ -47,8 +44,13 @@ start_link(Args) when is_list(Args) -> -spec(init(Args :: term()) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). -init([_]) -> - {ok, #state{}}. +init([Args]) -> + MysqlOpts = proplists:get_value(mysql_opts, Args), + Table = proplists:get_value(table, Args), + + {ok, ConnPid} = mysql:start_link(MysqlOpts), + + {ok, #state{mysql_pid = ConnPid, table = Table}}. %% @private %% @doc Handling call messages @@ -60,23 +62,8 @@ init([_]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call({post, Url, Body, PoolName}, _From, State = #state{}) -> - Headers = [ - {<<"content-type">>, <<"application/json">>} - ], - case hackney:request(post, Url, Headers, Body, [{pool, PoolName}]) of - {ok, 200, _, ClientRef} -> - {ok, RespBody} = hackney:body(ClientRef), - lager:debug("[iot_http_client] url: ~p, response is: ~p", [Url, RespBody]), - {reply, ok, State}; - {ok, HttpCode, _, ClientRef} -> - {ok, RespBody} = hackney:body(ClientRef), - lager:debug("[iot_http_client] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]), - {reply, {error, {http_code, HttpCode}}, State}; - {error, Reason} -> - lager:warning("[iot_http_client] url: ~p, get error: ~p", [Url, Reason]), - {reply, {error, Reason}} - end. +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. %% @private %% @doc Handling cast messages @@ -93,8 +80,18 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info(_Info, State = #state{}) -> - {noreply, State}. +handle_info({post, ReceiverPid, #north_data{id = Id, body = Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) -> + case catch mysql_provider:insert(ConnPid, Table, Fields, false) of + ok -> + ReceiverPid ! {ack, Id}; + Error -> + lager:debug("[mysql_postman] insert table: ~p, res is: ~p", [Table, Error]) + end, + {noreply, State}; + +handle_info(stop, State = #state{mysql_pid = ConnPid}) -> + mysql:stop(ConnPid), + {stop, normal, State}. %% @private %% @doc This function is called by a gen_server when it is about to diff --git a/config/sys-dev.config b/config/sys-dev.config index dcf9faf..1d26a19 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -32,7 +32,7 @@ {pools, [ %% mysql连接池配置 - {mysql_pool, + {mysql_iot, [{size, 10}, {max_overflow, 20}, {worker_module, mysql}], [ {host, {39, 98, 184, 67}}, diff --git a/config/sys-prod.config b/config/sys-prod.config index 9533af7..67cdc7d 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -32,7 +32,7 @@ {pools, [ %% mysql连接池配置 - {mysql_pool, + {mysql_iot, [{size, 10}, {max_overflow, 20}, {worker_module, mysql}], [ {host, {172, 19, 0, 2}},