diff --git a/apps/iot/src/database/ai_event_logs_bo.erl b/apps/iot/src/database/ai_event_logs_bo.erl deleted file mode 100644 index b9a4349..0000000 --- a/apps/iot/src/database/ai_event_logs_bo.erl +++ /dev/null @@ -1,30 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 16. 5月 2023 12:48 -%%%------------------------------------------------------------------- --module(ai_event_logs_bo). --author("aresei"). --include("iot.hrl"). - --export([insert/6]). - -%% API - --spec insert(HostUUID :: binary(), DeviceUUID :: binary(), SceneId :: integer(), MicroId :: integer(), EventType :: integer(), Content :: binary()) -> - ok | {ok, InsertId :: integer()} | {error, Reason :: any()}. -insert(HostUUID, DeviceUUID, SceneId, MicroId, EventType, Content) - when is_integer(EventType), is_binary(HostUUID), is_binary(DeviceUUID), is_integer(SceneId), is_integer(MicroId), is_binary(Content) -> - - mysql_pool:insert(mysql_iot, <<"ai_event_logs">>, #{ - <<"event_type">> => EventType, - <<"host_uuid">> => HostUUID, - <<"device_uuid">> => DeviceUUID, - <<"scene_id">> => SceneId, - <<"micro_id">> => MicroId, - <<"content">> => Content, - <<"created_at">> => calendar:local_time() - }, true). \ No newline at end of file diff --git a/apps/iot/src/database/device_bo.erl b/apps/iot/src/database/device_bo.erl deleted file mode 100644 index b877a6c..0000000 --- a/apps/iot/src/database/device_bo.erl +++ /dev/null @@ -1,43 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 16. 5月 2023 12:48 -%%%------------------------------------------------------------------- --module(device_bo). --author("aresei"). --include("iot.hrl"). - -%% API --export([get_all_devices/0, get_host_devices/1, get_device_by_uuid/1, change_status/2]). - --spec get_all_devices() -> {ok, Devices :: [map()]} | {error, Reason :: any()}. -get_all_devices() -> - mysql_pool:get_all(mysql_iot, <<"SELECT * FROM device WHERE device_uuid != ''">>). - --spec get_host_devices(HostId :: integer()) -> {ok, Devices :: [map()]} | {error, Reason::any()}. -get_host_devices(HostId) when is_integer(HostId) -> - mysql_pool:get_all(mysql_iot, <<"SELECT device_uuid FROM device WHERE host_id = ? AND device_uuid != ''">>, [HostId]). - --spec get_device_by_uuid(DeviceUUID :: binary()) -> {ok, DeviceInfo :: map()} | undefined. -get_device_by_uuid(DeviceUUID) when is_binary(DeviceUUID) -> - mysql_pool:get_row(mysql_iot, <<"SELECT * FROM device WHERE device_uuid = ? LIMIT 1">>, [DeviceUUID]). - -%% 修改主机的状态 --spec change_status(DeviceUUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. -change_status(DeviceUUID, NStatus) when is_binary(DeviceUUID), is_integer(NStatus) -> - change_status0(DeviceUUID, NStatus). -change_status0(DeviceUUID, ?DEVICE_ONLINE) when is_binary(DeviceUUID) -> - Timestamp = calendar:local_time(), - case mysql_pool:get_row(mysql_iot, <<"SELECT status FROM device WHERE device_uuid = ? LIMIT 1">>, [DeviceUUID]) of - {ok, #{<<"status">> := -1}} -> - mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ?, access_at = ?, updated_at = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_ONLINE, Timestamp, Timestamp, DeviceUUID]); - {ok, _} -> - mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ?, updated_at = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_ONLINE, Timestamp, DeviceUUID]); - undefined -> - {error, <<"device not found">>} - end; -change_status0(DeviceUUID, ?DEVICE_OFFLINE) when is_binary(DeviceUUID) -> - mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_OFFLINE, DeviceUUID]). \ No newline at end of file diff --git a/apps/iot/src/database/endpoint_bo.erl b/apps/iot/src/database/endpoint_bo.erl deleted file mode 100644 index 2688916..0000000 --- a/apps/iot/src/database/endpoint_bo.erl +++ /dev/null @@ -1,97 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 16. 5月 2023 12:48 -%%%------------------------------------------------------------------- --module(endpoint_bo). --author("aresei"). --include("endpoint.hrl"). - -%% API --export([get_all_endpoints/0, get_endpoint/1]). --export([endpoint_record/1]). - --spec get_all_endpoints() -> [Endpoint :: map()]. -get_all_endpoints() -> - case mysql_pool:get_all(mysql_iot, <<"SELECT * FROM endpoint where status = 1">>) of - {ok, Endpoints} -> - Endpoints; - {error, _} -> - [] - end. - --spec get_endpoint(Id :: integer()) -> undefined | {ok, EndpointInfo :: map()}. -get_endpoint(Id) when is_integer(Id) -> - mysql_pool:get_row(mysql_iot, <<"SELECT * FROM endpoint WHERE id = ? and status = 1 LIMIT 1">>, [Id]). - --spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}. -endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson, - <<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) -> - try - Config = parse_config(Type, catch jiffy:decode(ConfigJson, [return_maps])), - {ok, #endpoint { - id = Id, - name = Name, - title = Title, - config = Config, - status = Status, - updated_at = UpdatedAt, - created_at = CreatedAt - }} - catch throw:_ -> - error - end. - -parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) -> - #mqtt_endpoint{ - host = Host, - port = Port, - client_id = ClientId, - username = Username, - password = Password, - topic = Topic, - qos = Qos - }; -parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) -> - #http_endpoint{ - url = Url, - pool_size = PoolSize - }; -parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0}, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> - Mechanism = case Mechanism0 of - <<"sha_256">> -> - scram_sha_256; - <<"sha_512">> -> - scram_sha_512; - <<"plain">> -> - plain; - _ -> - plain - end, - - #kafka_endpoint{ - sasl_config = {Mechanism, Username, Password}, - bootstrap_servers = parse_bootstrap_servers(BootstrapServers), - topic = Topic - }; -parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> - #kafka_endpoint{ - sasl_config = undefined, - bootstrap_servers = parse_bootstrap_servers(BootstrapServers), - topic = Topic - }; -parse_config(_, _) -> - throw(invalid_config). - -parse_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) -> - lists:filtermap(fun(S) -> - case binary:split(S, <<":">>) of - [Host0, Port0] -> - {true, {binary_to_list(Host0), binary_to_integer(Port0)}}; - _ -> - false - end - end, BootstrapServers). \ No newline at end of file diff --git a/apps/iot/src/database/event_logs_bo.erl b/apps/iot/src/database/event_logs_bo.erl deleted file mode 100644 index 87f98c8..0000000 --- a/apps/iot/src/database/event_logs_bo.erl +++ /dev/null @@ -1,25 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 16. 5月 2023 12:48 -%%%------------------------------------------------------------------- --module(event_logs_bo). --author("aresei"). --include("iot.hrl"). - --export([insert/3]). - -%% API - --spec insert(EventType :: integer(), AssocUUID :: binary(), Status :: integer()) -> - {ok, InsertId :: integer()} | {error, Reason :: any()}. -insert(EventType, AssocUUID, Status) when is_integer(EventType), is_binary(AssocUUID), is_integer(Status) -> - mysql_pool:insert(mysql_iot, <<"event_logs">>, #{ - <<"event_type">> => EventType, - <<"assoc_uuid">> => AssocUUID, - <<"status">> => Status, - <<"created_at">> => calendar:local_time() - }, true). \ No newline at end of file diff --git a/apps/iot/src/database/host_bo.erl b/apps/iot/src/database/host_bo.erl deleted file mode 100644 index f2e5087..0000000 --- a/apps/iot/src/database/host_bo.erl +++ /dev/null @@ -1,49 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 16. 5月 2023 12:48 -%%%------------------------------------------------------------------- --module(host_bo). --author("aresei"). --include("iot.hrl"). - -%% API --export([get_all_hosts/0, change_status/2, get_host_by_uuid/1, get_host_by_id/1]). - --spec get_all_hosts() -> UUIDList :: [binary()]. -get_all_hosts() -> - case mysql_pool:get_all(mysql_iot, <<"SELECT uuid FROM host where uuid != '' limit 10">>) of - {ok, Hosts} -> - lists:map(fun(#{<<"uuid">> := UUID}) -> UUID end, Hosts); - {error, _} -> - [] - end. - --spec get_host_by_uuid(UUID :: binary()) -> undefined | {ok, HostInfo :: map()}. -get_host_by_uuid(UUID) when is_binary(UUID) -> - mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE uuid = ? LIMIT 1">>, [UUID]). - --spec get_host_by_id(HostId :: integer()) -> undefined | {ok, HostInfo :: map()}. -get_host_by_id(HostId) when is_integer(HostId) -> - mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]). - -%% 修改主机的状态 --spec change_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. -change_status(UUID, NStatus) when is_binary(UUID), is_integer(NStatus) -> - change_status0(UUID, NStatus). -change_status0(UUID, ?HOST_ONLINE) when is_binary(UUID) -> - Timestamp = calendar:local_time(), - case mysql_pool:get_row(mysql_iot, <<"SELECT status FROM host WHERE uuid = ? LIMIT 1">>, [UUID]) of - %% 第一次更新激活 - {ok, #{<<"status">> := -1}} -> - mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ?, access_at = ?, updated_at = ? WHERE uuid = ? LIMIT 1">>, [?HOST_ONLINE, Timestamp, Timestamp, UUID]); - {ok, _} -> - mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ?, updated_at = ? WHERE uuid = ? LIMIT 1">>, [?HOST_ONLINE, Timestamp, UUID]); - undefined -> - {error, <<"host not found">>} - end; -change_status0(UUID, ?HOST_OFFLINE) when is_binary(UUID) -> - mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [?HOST_OFFLINE, UUID]). \ No newline at end of file diff --git a/apps/iot/src/database/micro_inform_log.erl b/apps/iot/src/database/micro_inform_log.erl deleted file mode 100644 index c5128fd..0000000 --- a/apps/iot/src/database/micro_inform_log.erl +++ /dev/null @@ -1,17 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 16. 5月 2023 12:48 -%%%------------------------------------------------------------------- --module(micro_inform_log). --author("aresei"). --include("iot.hrl"). - -%% API --export([insert/1]). - -insert(Fields) when is_map(Fields) -> - mysql_pool:insert(mysql_iot, <<"micro_inform_log">>, Fields, true). \ No newline at end of file diff --git a/apps/iot/src/database/micro_service_bo.erl b/apps/iot/src/database/micro_service_bo.erl deleted file mode 100644 index 03a2b86..0000000 --- a/apps/iot/src/database/micro_service_bo.erl +++ /dev/null @@ -1,23 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 16. 5月 2023 12:48 -%%%------------------------------------------------------------------- --module(micro_service_bo). --author("aresei"). --export([get_service_config/1]). - -%% API - -%% TODO --spec get_service_config(ServiceId :: binary()) -> {ok, ConfigJson :: binary()} | error. -get_service_config(ServiceId) when is_binary(ServiceId) -> - case mysql_pool:get_row(mysql_iot, <<"SELECT * FROM micro_service WHERE id = ? LIMIT 1">>, [ServiceId]) of - undefined -> - error; - {ok, #{<<"config">> := Config}} -> - {ok, Config} - end. \ No newline at end of file diff --git a/apps/iot/src/database/micro_set_bo.erl b/apps/iot/src/database/micro_set_bo.erl deleted file mode 100644 index 80e30ee..0000000 --- a/apps/iot/src/database/micro_set_bo.erl +++ /dev/null @@ -1,19 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 16. 5月 2023 12:48 -%%%------------------------------------------------------------------- --module(micro_set_bo). --author("aresei"). --include("iot.hrl"). - -%% API --export([change_status/4]). - -%% 修改主机的状态 --spec change_status(HostId :: integer(), SceneId :: integer(), MircoId :: integer(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. -change_status(HostId, SceneId, MircoId, Status) when is_integer(HostId), is_integer(SceneId), is_integer(MircoId), is_integer(Status) -> - mysql_pool:update_by(mysql_iot, <<"UPDATE micro_set SET status = ? WHERE host_id = ? AND scene_id = ? AND micro_id = ? LIMIT 1">>, [Status, HostId, SceneId, MircoId]). \ No newline at end of file diff --git a/apps/iot/src/database/task_logs_bo.erl b/apps/iot/src/database/task_logs_bo.erl deleted file mode 100644 index 4d2b25a..0000000 --- a/apps/iot/src/database/task_logs_bo.erl +++ /dev/null @@ -1,19 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 16. 5月 2023 12:48 -%%%------------------------------------------------------------------- --module(task_logs_bo). --author("aresei"). --include("iot.hrl"). - -%% API --export([change_status/2]). - -%% 修改主机的状态 --spec change_status(TaskId :: integer(), Status :: integer()) -> {ok, AffectedRow :: integer()} | {error, Reason :: any()}. -change_status(TaskId, Status) when is_integer(TaskId), is_integer(Status) -> - mysql_pool:update_by(mysql_iot, <<"UPDATE task_logs SET status = ? WHERE id = ? LIMIT 1">>, [Status, TaskId]). \ No newline at end of file diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl index a5fed49..b5a20e3 100644 --- a/apps/iot/src/iot_api.erl +++ b/apps/iot/src/iot_api.erl @@ -8,131 +8,255 @@ %%%------------------------------------------------------------------- -module(iot_api). -author("anlicheng"). - --behaviour(gen_server). +-include("endpoint.hrl"). %% API --export([start_link/0]). -export([ai_event/1]). -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - --define(SERVER, ?MODULE). -define(API_TOKEN, <<"wv6fGyBhl*7@AsD9">>). --record(state, { - -}). +-export([get_all_hosts/0, get_host_by_id/1, get_host_by_uuid/1, change_host_status/2]). +-export([get_host_devices/1, get_device_by_uuid/1, change_device_status/2]). +-export([get_all_endpoints/0, get_endpoint/1, endpoint_record/1]). %%%=================================================================== %%% API %%%=================================================================== -ai_event(Id) when is_integer(Id) -> - gen_server:cast(?MODULE, {ai_event, Id}). +-spec get_all_hosts() -> [HostUUID :: binary()]. +get_all_hosts() -> + case do_get("/get_all_hosts", []) of + {ok, Ids} -> + Ids; + _ -> + [] + end. -%% @doc Spawns the server and registers the local name (unique) --spec(start_link() -> - {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). -start_link() -> - gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). +-spec get_host_by_uuid(UUID :: binary()) -> undefined | {ok, HostInfo :: map()}. +get_host_by_uuid(UUID) when is_binary(UUID) -> + case do_get("/get_host_by_uuid", [{<<"uuid">>, UUID}]) of + {ok, HostInfo} -> + {ok, HostInfo}; + _ -> + undefined + end. -%%%=================================================================== -%%% gen_server callbacks -%%%=================================================================== +-spec get_host_by_id(HostId :: integer()) -> undefined | {ok, HostInfo :: map()}. +get_host_by_id(HostId) when is_integer(HostId) -> + case do_get("/get_host_by_id", [{<<"host_id">>, integer_to_binary(HostId)}]) of + {ok, HostInfo} -> + {ok, HostInfo}; + _ -> + undefined + end. -%% @private -%% @doc Initializes the server --spec(init(Args :: term()) -> - {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | - {stop, Reason :: term()} | ignore). -init([]) -> - {ok, #state{}}. +%% 修改主机的状态 +-spec change_host_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. +change_host_status(UUID, NStatus) when is_binary(UUID), is_integer(NStatus) -> + do_post("/change_host_status", [{<<"uuid">>, UUID}, {<<"new_status">>, integer_to_binary(NStatus)}]). -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Request, _From, State = #state{}) -> - {reply, ok, State}. +-spec get_host_devices(HostId :: integer()) -> {ok, Devices :: [map()]} | {error, Reason::any()}. +get_host_devices(HostId) when is_integer(HostId) -> + do_get("/get_host_devices", [{<<"host_id">>, integer_to_binary(HostId)}]). -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({ai_event, Id}, State = #state{}) -> - spawn_monitor(fun() -> - Token = iot_util:md5(<>), - {ok, Url} = application:get_env(iot, api_url), +-spec get_device_by_uuid(DeviceUUID :: binary()) -> {ok, DeviceInfo :: map()} | undefined. +get_device_by_uuid(DeviceUUID) when is_binary(DeviceUUID) -> + case do_get("/get_device_by_uuid", [{<<"device_uuid">>, DeviceUUID}]) of + {ok, DeviceInfo} -> + {ok, DeviceInfo}; + _ -> + undefined + end. - Headers = [ - {<<"content-type">>, <<"application/json">>} - ], - ReqData = #{ - <<"token">> => Token, - <<"id">> => Id - }, - Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])), - case hackney:request(post, Url, Headers, Body, [{pool, false}]) of - {ok, 200, _, ClientRef} -> - {ok, RespBody} = hackney:body(ClientRef), - lager:debug("[iot_api] send body: ~p, get error is: ~p", [Body, RespBody]), - hackney:close(ClientRef); - {ok, HttpCode, _, ClientRef} -> - {ok, RespBody} = hackney:body(ClientRef), - hackney:close(ClientRef), - lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, {HttpCode, RespBody}]); - {error, Reason} -> - lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, Reason]) +%% 修改主机的状态 +-spec change_device_status(DeviceUUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. +change_device_status(DeviceUUID, NStatus) when is_binary(DeviceUUID), is_integer(NStatus) -> + do_post("/change_device_status", [{<<"device_uuid">>, DeviceUUID}, {<<"new_status">>, integer_to_binary(NStatus)}]). + +%%%------------------------------------------------------------------- +%% endpoint相关的api +%%%------------------------------------------------------------------- +%% API + +-spec get_all_endpoints() -> [Endpoint :: map()]. +get_all_endpoints() -> + case do_get("/get_all_endpoints", []) of + {ok, Endpoints} -> + Endpoints; + _ -> + [] + end. + +-spec get_endpoint(Id :: integer()) -> undefined | {ok, EndpointInfo :: map()}. +get_endpoint(Id) when is_integer(Id) -> + case do_get(<<"/get_endpoint">>, [{<<"id">>, integer_to_binary(Id)}]) of + {ok, EndpointInfo} -> + {ok, EndpointInfo}; + _ -> + undefined + end. + +-spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}. +endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson, + <<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) -> + try + Config = parse_config(Type, catch jiffy:decode(ConfigJson, [return_maps])), + {ok, #endpoint { + id = Id, + name = Name, + title = Title, + config = Config, + status = Status, + updated_at = UpdatedAt, + created_at = CreatedAt + }} + catch throw:_ -> + error + end. + +parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) -> + #mqtt_endpoint{ + host = Host, + port = Port, + client_id = ClientId, + username = Username, + password = Password, + topic = Topic, + qos = Qos + }; +parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) -> + #http_endpoint{ + url = Url, + pool_size = PoolSize + }; +parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0}, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> + Mechanism = case Mechanism0 of + <<"sha_256">> -> + scram_sha_256; + <<"sha_512">> -> + scram_sha_512; + <<"plain">> -> + plain; + _ -> + plain + end, + + #kafka_endpoint{ + sasl_config = {Mechanism, Username, Password}, + bootstrap_servers = parse_bootstrap_servers(BootstrapServers), + topic = Topic + }; +parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> + #kafka_endpoint{ + sasl_config = undefined, + bootstrap_servers = parse_bootstrap_servers(BootstrapServers), + topic = Topic + }; +parse_config(_, _) -> + throw(invalid_config). + +parse_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) -> + lists:filtermap(fun(S) -> + case binary:split(S, <<":">>) of + [Host0, Port0] -> + {true, {binary_to_list(Host0), binary_to_integer(Port0)}}; + _ -> + false end - end), + end, BootstrapServers). - {noreply, State}. +ai_event(Id) when is_integer(Id) -> + Token = iot_util:md5(<>), + {ok, Url} = application:get_env(iot, api_url), -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -%% Task进程挂掉 -handle_info({'DOWN', _MRef, process, _Pid, normal}, State) -> - {noreply, State}; + Headers = [ + {<<"content-type">>, <<"application/json">>} + ], + ReqData = #{ + <<"token">> => Token, + <<"id">> => Id + }, + Body = iolist_to_binary(jiffy:encode(ReqData, [force_utf8])), + case hackney:request(post, Url, Headers, Body, [{pool, false}]) of + {ok, 200, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + lager:debug("[iot_api] send body: ~p, get error is: ~p", [Body, RespBody]), + hackney:close(ClientRef); + {ok, HttpCode, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), + lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, {HttpCode, RespBody}]); + {error, Reason} -> + lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, Reason]) + end. -handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) -> - lager:notice("[iot_api] task process down with reason: ~p", [Reason]), - {noreply, State}; +%%%------------------------------------------------------------------- +%% helper methods +%%%------------------------------------------------------------------- -handle_info(_Info, State = #state{}) -> - {noreply, State}. +-spec do_post(Path :: string(), Params :: map()) -> {ok, Resp :: binary()} | {error, Reason :: any()}. +do_post(Path, Params) when is_list(Path), is_map(Params) -> + {ok, BaseUrl} = application:get_env(iot, api_url), + Headers = [ + {<<"content-type">>, <<"application/json">>}, + {<<"Accept">>, <<"application/json">>} + ], + Url = BaseUrl ++ Path, + Body = iolist_to_binary(jiffy:encode(Params, [force_utf8])), + case hackney:request(post, Url, Headers, Body, [{pool, false}]) of + {ok, 200, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + lager:debug("[iot_api] request url: ~p, send body: ~p, get error is: ~p", [Url, Body, RespBody]), + hackney:close(ClientRef), + case catch jiffy:decode(RespBody) of + #{<<"result">> := Result} -> + {ok, Result}; + #{<<"error">> := #{<<"code">> := Code, <<"message">> := Message}} -> + {error, {Code, Message}}; + {error, Reason} -> + {error, Reason}; + Other -> + {error, Other} + end; + {ok, HttpCode, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), + lager:warning("[iot_api] request url: ~p, send body: ~p, get error is: ~p", [Url, Body, {HttpCode, RespBody}]), + {error, {HttpCode, RespBody}}; + {error, Reason} -> + lager:warning("[iot_api] request url: ~p, send body: ~p, get error is: ~p", [Url, Body, Reason]), + {error, Reason} + end. -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(_Reason, _State = #state{}) -> - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== +-spec do_get(Path :: string(), Params :: [{Key :: binary(), Val :: binary()}]) -> {ok, Resp :: binary()} | {error, Reason :: any()}. +do_get(Path, Params) when is_list(Path), is_list(Params) -> + {ok, BaseUrl} = application:get_env(iot, api_url), + Headers = [ + {<<"Accept">>, <<"application/json">>} + ], + QS = binary_to_list(uri_string:compose_query(Params)), + Url = BaseUrl ++ Path ++ "?" ++ QS, + case hackney:request(get, Url, Headers, <<>>, [{pool, false}]) of + {ok, 200, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + lager:debug("[iot_api] url: ~p, get error is: ~p", [Url, RespBody]), + hackney:close(ClientRef), + case catch jiffy:decode(RespBody) of + #{<<"result">> := Result} -> + {ok, Result}; + #{<<"error">> := #{<<"code">> := Code, <<"message">> := Message}} -> + {error, {Code, Message}}; + {error, Reason} -> + {error, Reason}; + Other -> + {error, Other} + end; + {ok, HttpCode, _, ClientRef} -> + {ok, RespBody} = hackney:body(ClientRef), + hackney:close(ClientRef), + lager:warning("[iot_api] request url: ~p, get error is: ~p", [Url, {HttpCode, RespBody}]), + {error, {HttpCode, RespBody}}; + {error, Reason} -> + lager:warning("[iot_api] request url: ~p, get error is: ~p", [Url, Reason]), + {error, Reason} + end. \ No newline at end of file