remove databse
This commit is contained in:
parent
854b657ec5
commit
cdcfbdd014
@ -1,30 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 16. 5月 2023 12:48
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(ai_event_logs_bo).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
-export([insert/6]).
|
||||
|
||||
%% API
|
||||
|
||||
-spec insert(HostUUID :: binary(), DeviceUUID :: binary(), SceneId :: integer(), MicroId :: integer(), EventType :: integer(), Content :: binary()) ->
|
||||
ok | {ok, InsertId :: integer()} | {error, Reason :: any()}.
|
||||
insert(HostUUID, DeviceUUID, SceneId, MicroId, EventType, Content)
|
||||
when is_integer(EventType), is_binary(HostUUID), is_binary(DeviceUUID), is_integer(SceneId), is_integer(MicroId), is_binary(Content) ->
|
||||
|
||||
mysql_pool:insert(mysql_iot, <<"ai_event_logs">>, #{
|
||||
<<"event_type">> => EventType,
|
||||
<<"host_uuid">> => HostUUID,
|
||||
<<"device_uuid">> => DeviceUUID,
|
||||
<<"scene_id">> => SceneId,
|
||||
<<"micro_id">> => MicroId,
|
||||
<<"content">> => Content,
|
||||
<<"created_at">> => calendar:local_time()
|
||||
}, true).
|
||||
@ -1,43 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 16. 5月 2023 12:48
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(device_bo).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([get_all_devices/0, get_host_devices/1, get_device_by_uuid/1, change_status/2]).
|
||||
|
||||
-spec get_all_devices() -> {ok, Devices :: [map()]} | {error, Reason :: any()}.
|
||||
get_all_devices() ->
|
||||
mysql_pool:get_all(mysql_iot, <<"SELECT * FROM device WHERE device_uuid != ''">>).
|
||||
|
||||
-spec get_host_devices(HostId :: integer()) -> {ok, Devices :: [map()]} | {error, Reason::any()}.
|
||||
get_host_devices(HostId) when is_integer(HostId) ->
|
||||
mysql_pool:get_all(mysql_iot, <<"SELECT device_uuid FROM device WHERE host_id = ? AND device_uuid != ''">>, [HostId]).
|
||||
|
||||
-spec get_device_by_uuid(DeviceUUID :: binary()) -> {ok, DeviceInfo :: map()} | undefined.
|
||||
get_device_by_uuid(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM device WHERE device_uuid = ? LIMIT 1">>, [DeviceUUID]).
|
||||
|
||||
%% 修改主机的状态
|
||||
-spec change_status(DeviceUUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
||||
change_status(DeviceUUID, NStatus) when is_binary(DeviceUUID), is_integer(NStatus) ->
|
||||
change_status0(DeviceUUID, NStatus).
|
||||
change_status0(DeviceUUID, ?DEVICE_ONLINE) when is_binary(DeviceUUID) ->
|
||||
Timestamp = calendar:local_time(),
|
||||
case mysql_pool:get_row(mysql_iot, <<"SELECT status FROM device WHERE device_uuid = ? LIMIT 1">>, [DeviceUUID]) of
|
||||
{ok, #{<<"status">> := -1}} ->
|
||||
mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ?, access_at = ?, updated_at = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_ONLINE, Timestamp, Timestamp, DeviceUUID]);
|
||||
{ok, _} ->
|
||||
mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ?, updated_at = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_ONLINE, Timestamp, DeviceUUID]);
|
||||
undefined ->
|
||||
{error, <<"device not found">>}
|
||||
end;
|
||||
change_status0(DeviceUUID, ?DEVICE_OFFLINE) when is_binary(DeviceUUID) ->
|
||||
mysql_pool:update_by(mysql_iot, <<"UPDATE device SET status = ? WHERE device_uuid = ? LIMIT 1">>, [?DEVICE_OFFLINE, DeviceUUID]).
|
||||
@ -1,97 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 16. 5月 2023 12:48
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(endpoint_bo).
|
||||
-author("aresei").
|
||||
-include("endpoint.hrl").
|
||||
|
||||
%% API
|
||||
-export([get_all_endpoints/0, get_endpoint/1]).
|
||||
-export([endpoint_record/1]).
|
||||
|
||||
-spec get_all_endpoints() -> [Endpoint :: map()].
|
||||
get_all_endpoints() ->
|
||||
case mysql_pool:get_all(mysql_iot, <<"SELECT * FROM endpoint where status = 1">>) of
|
||||
{ok, Endpoints} ->
|
||||
Endpoints;
|
||||
{error, _} ->
|
||||
[]
|
||||
end.
|
||||
|
||||
-spec get_endpoint(Id :: integer()) -> undefined | {ok, EndpointInfo :: map()}.
|
||||
get_endpoint(Id) when is_integer(Id) ->
|
||||
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM endpoint WHERE id = ? and status = 1 LIMIT 1">>, [Id]).
|
||||
|
||||
-spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}.
|
||||
endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson,
|
||||
<<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) ->
|
||||
try
|
||||
Config = parse_config(Type, catch jiffy:decode(ConfigJson, [return_maps])),
|
||||
{ok, #endpoint {
|
||||
id = Id,
|
||||
name = Name,
|
||||
title = Title,
|
||||
config = Config,
|
||||
status = Status,
|
||||
updated_at = UpdatedAt,
|
||||
created_at = CreatedAt
|
||||
}}
|
||||
catch throw:_ ->
|
||||
error
|
||||
end.
|
||||
|
||||
parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) ->
|
||||
#mqtt_endpoint{
|
||||
host = Host,
|
||||
port = Port,
|
||||
client_id = ClientId,
|
||||
username = Username,
|
||||
password = Password,
|
||||
topic = Topic,
|
||||
qos = Qos
|
||||
};
|
||||
parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) ->
|
||||
#http_endpoint{
|
||||
url = Url,
|
||||
pool_size = PoolSize
|
||||
};
|
||||
parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0}, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
|
||||
Mechanism = case Mechanism0 of
|
||||
<<"sha_256">> ->
|
||||
scram_sha_256;
|
||||
<<"sha_512">> ->
|
||||
scram_sha_512;
|
||||
<<"plain">> ->
|
||||
plain;
|
||||
_ ->
|
||||
plain
|
||||
end,
|
||||
|
||||
#kafka_endpoint{
|
||||
sasl_config = {Mechanism, Username, Password},
|
||||
bootstrap_servers = parse_bootstrap_servers(BootstrapServers),
|
||||
topic = Topic
|
||||
};
|
||||
parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
|
||||
#kafka_endpoint{
|
||||
sasl_config = undefined,
|
||||
bootstrap_servers = parse_bootstrap_servers(BootstrapServers),
|
||||
topic = Topic
|
||||
};
|
||||
parse_config(_, _) ->
|
||||
throw(invalid_config).
|
||||
|
||||
parse_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) ->
|
||||
lists:filtermap(fun(S) ->
|
||||
case binary:split(S, <<":">>) of
|
||||
[Host0, Port0] ->
|
||||
{true, {binary_to_list(Host0), binary_to_integer(Port0)}};
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end, BootstrapServers).
|
||||
@ -1,25 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 16. 5月 2023 12:48
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(event_logs_bo).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
-export([insert/3]).
|
||||
|
||||
%% API
|
||||
|
||||
-spec insert(EventType :: integer(), AssocUUID :: binary(), Status :: integer()) ->
|
||||
{ok, InsertId :: integer()} | {error, Reason :: any()}.
|
||||
insert(EventType, AssocUUID, Status) when is_integer(EventType), is_binary(AssocUUID), is_integer(Status) ->
|
||||
mysql_pool:insert(mysql_iot, <<"event_logs">>, #{
|
||||
<<"event_type">> => EventType,
|
||||
<<"assoc_uuid">> => AssocUUID,
|
||||
<<"status">> => Status,
|
||||
<<"created_at">> => calendar:local_time()
|
||||
}, true).
|
||||
@ -1,49 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 16. 5月 2023 12:48
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(host_bo).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([get_all_hosts/0, change_status/2, get_host_by_uuid/1, get_host_by_id/1]).
|
||||
|
||||
-spec get_all_hosts() -> UUIDList :: [binary()].
|
||||
get_all_hosts() ->
|
||||
case mysql_pool:get_all(mysql_iot, <<"SELECT uuid FROM host where uuid != '' limit 10">>) of
|
||||
{ok, Hosts} ->
|
||||
lists:map(fun(#{<<"uuid">> := UUID}) -> UUID end, Hosts);
|
||||
{error, _} ->
|
||||
[]
|
||||
end.
|
||||
|
||||
-spec get_host_by_uuid(UUID :: binary()) -> undefined | {ok, HostInfo :: map()}.
|
||||
get_host_by_uuid(UUID) when is_binary(UUID) ->
|
||||
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE uuid = ? LIMIT 1">>, [UUID]).
|
||||
|
||||
-spec get_host_by_id(HostId :: integer()) -> undefined | {ok, HostInfo :: map()}.
|
||||
get_host_by_id(HostId) when is_integer(HostId) ->
|
||||
mysql_pool:get_row(mysql_iot, <<"SELECT * FROM host WHERE id = ? LIMIT 1">>, [HostId]).
|
||||
|
||||
%% 修改主机的状态
|
||||
-spec change_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
||||
change_status(UUID, NStatus) when is_binary(UUID), is_integer(NStatus) ->
|
||||
change_status0(UUID, NStatus).
|
||||
change_status0(UUID, ?HOST_ONLINE) when is_binary(UUID) ->
|
||||
Timestamp = calendar:local_time(),
|
||||
case mysql_pool:get_row(mysql_iot, <<"SELECT status FROM host WHERE uuid = ? LIMIT 1">>, [UUID]) of
|
||||
%% 第一次更新激活
|
||||
{ok, #{<<"status">> := -1}} ->
|
||||
mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ?, access_at = ?, updated_at = ? WHERE uuid = ? LIMIT 1">>, [?HOST_ONLINE, Timestamp, Timestamp, UUID]);
|
||||
{ok, _} ->
|
||||
mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ?, updated_at = ? WHERE uuid = ? LIMIT 1">>, [?HOST_ONLINE, Timestamp, UUID]);
|
||||
undefined ->
|
||||
{error, <<"host not found">>}
|
||||
end;
|
||||
change_status0(UUID, ?HOST_OFFLINE) when is_binary(UUID) ->
|
||||
mysql_pool:update_by(mysql_iot, <<"UPDATE host SET status = ? WHERE uuid = ? LIMIT 1">>, [?HOST_OFFLINE, UUID]).
|
||||
@ -1,17 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 16. 5月 2023 12:48
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(micro_inform_log).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([insert/1]).
|
||||
|
||||
insert(Fields) when is_map(Fields) ->
|
||||
mysql_pool:insert(mysql_iot, <<"micro_inform_log">>, Fields, true).
|
||||
@ -1,23 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 16. 5月 2023 12:48
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(micro_service_bo).
|
||||
-author("aresei").
|
||||
-export([get_service_config/1]).
|
||||
|
||||
%% API
|
||||
|
||||
%% TODO
|
||||
-spec get_service_config(ServiceId :: binary()) -> {ok, ConfigJson :: binary()} | error.
|
||||
get_service_config(ServiceId) when is_binary(ServiceId) ->
|
||||
case mysql_pool:get_row(mysql_iot, <<"SELECT * FROM micro_service WHERE id = ? LIMIT 1">>, [ServiceId]) of
|
||||
undefined ->
|
||||
error;
|
||||
{ok, #{<<"config">> := Config}} ->
|
||||
{ok, Config}
|
||||
end.
|
||||
@ -1,19 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 16. 5月 2023 12:48
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(micro_set_bo).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([change_status/4]).
|
||||
|
||||
%% 修改主机的状态
|
||||
-spec change_status(HostId :: integer(), SceneId :: integer(), MircoId :: integer(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
||||
change_status(HostId, SceneId, MircoId, Status) when is_integer(HostId), is_integer(SceneId), is_integer(MircoId), is_integer(Status) ->
|
||||
mysql_pool:update_by(mysql_iot, <<"UPDATE micro_set SET status = ? WHERE host_id = ? AND scene_id = ? AND micro_id = ? LIMIT 1">>, [Status, HostId, SceneId, MircoId]).
|
||||
@ -1,19 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author aresei
|
||||
%%% @copyright (C) 2023, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 16. 5月 2023 12:48
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(task_logs_bo).
|
||||
-author("aresei").
|
||||
-include("iot.hrl").
|
||||
|
||||
%% API
|
||||
-export([change_status/2]).
|
||||
|
||||
%% 修改主机的状态
|
||||
-spec change_status(TaskId :: integer(), Status :: integer()) -> {ok, AffectedRow :: integer()} | {error, Reason :: any()}.
|
||||
change_status(TaskId, Status) when is_integer(TaskId), is_integer(Status) ->
|
||||
mysql_pool:update_by(mysql_iot, <<"UPDATE task_logs SET status = ? WHERE id = ? LIMIT 1">>, [Status, TaskId]).
|
||||
@ -8,69 +8,164 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_api).
|
||||
-author("anlicheng").
|
||||
|
||||
-behaviour(gen_server).
|
||||
-include("endpoint.hrl").
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([ai_event/1]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
-define(API_TOKEN, <<"wv6fGyBhl*7@AsD9">>).
|
||||
|
||||
-record(state, {
|
||||
|
||||
}).
|
||||
-export([get_all_hosts/0, get_host_by_id/1, get_host_by_uuid/1, change_host_status/2]).
|
||||
-export([get_host_devices/1, get_device_by_uuid/1, change_device_status/2]).
|
||||
-export([get_all_endpoints/0, get_endpoint/1, endpoint_record/1]).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec get_all_hosts() -> [HostUUID :: binary()].
|
||||
get_all_hosts() ->
|
||||
case do_get("/get_all_hosts", []) of
|
||||
{ok, Ids} ->
|
||||
Ids;
|
||||
_ ->
|
||||
[]
|
||||
end.
|
||||
|
||||
-spec get_host_by_uuid(UUID :: binary()) -> undefined | {ok, HostInfo :: map()}.
|
||||
get_host_by_uuid(UUID) when is_binary(UUID) ->
|
||||
case do_get("/get_host_by_uuid", [{<<"uuid">>, UUID}]) of
|
||||
{ok, HostInfo} ->
|
||||
{ok, HostInfo};
|
||||
_ ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
-spec get_host_by_id(HostId :: integer()) -> undefined | {ok, HostInfo :: map()}.
|
||||
get_host_by_id(HostId) when is_integer(HostId) ->
|
||||
case do_get("/get_host_by_id", [{<<"host_id">>, integer_to_binary(HostId)}]) of
|
||||
{ok, HostInfo} ->
|
||||
{ok, HostInfo};
|
||||
_ ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
%% 修改主机的状态
|
||||
-spec change_host_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
||||
change_host_status(UUID, NStatus) when is_binary(UUID), is_integer(NStatus) ->
|
||||
do_post("/change_host_status", [{<<"uuid">>, UUID}, {<<"new_status">>, integer_to_binary(NStatus)}]).
|
||||
|
||||
-spec get_host_devices(HostId :: integer()) -> {ok, Devices :: [map()]} | {error, Reason::any()}.
|
||||
get_host_devices(HostId) when is_integer(HostId) ->
|
||||
do_get("/get_host_devices", [{<<"host_id">>, integer_to_binary(HostId)}]).
|
||||
|
||||
-spec get_device_by_uuid(DeviceUUID :: binary()) -> {ok, DeviceInfo :: map()} | undefined.
|
||||
get_device_by_uuid(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
case do_get("/get_device_by_uuid", [{<<"device_uuid">>, DeviceUUID}]) of
|
||||
{ok, DeviceInfo} ->
|
||||
{ok, DeviceInfo};
|
||||
_ ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
%% 修改主机的状态
|
||||
-spec change_device_status(DeviceUUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}.
|
||||
change_device_status(DeviceUUID, NStatus) when is_binary(DeviceUUID), is_integer(NStatus) ->
|
||||
do_post("/change_device_status", [{<<"device_uuid">>, DeviceUUID}, {<<"new_status">>, integer_to_binary(NStatus)}]).
|
||||
|
||||
%%%-------------------------------------------------------------------
|
||||
%% endpoint相关的api
|
||||
%%%-------------------------------------------------------------------
|
||||
%% API
|
||||
|
||||
-spec get_all_endpoints() -> [Endpoint :: map()].
|
||||
get_all_endpoints() ->
|
||||
case do_get("/get_all_endpoints", []) of
|
||||
{ok, Endpoints} ->
|
||||
Endpoints;
|
||||
_ ->
|
||||
[]
|
||||
end.
|
||||
|
||||
-spec get_endpoint(Id :: integer()) -> undefined | {ok, EndpointInfo :: map()}.
|
||||
get_endpoint(Id) when is_integer(Id) ->
|
||||
case do_get(<<"/get_endpoint">>, [{<<"id">>, integer_to_binary(Id)}]) of
|
||||
{ok, EndpointInfo} ->
|
||||
{ok, EndpointInfo};
|
||||
_ ->
|
||||
undefined
|
||||
end.
|
||||
|
||||
-spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}.
|
||||
endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson,
|
||||
<<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) ->
|
||||
try
|
||||
Config = parse_config(Type, catch jiffy:decode(ConfigJson, [return_maps])),
|
||||
{ok, #endpoint {
|
||||
id = Id,
|
||||
name = Name,
|
||||
title = Title,
|
||||
config = Config,
|
||||
status = Status,
|
||||
updated_at = UpdatedAt,
|
||||
created_at = CreatedAt
|
||||
}}
|
||||
catch throw:_ ->
|
||||
error
|
||||
end.
|
||||
|
||||
parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) ->
|
||||
#mqtt_endpoint{
|
||||
host = Host,
|
||||
port = Port,
|
||||
client_id = ClientId,
|
||||
username = Username,
|
||||
password = Password,
|
||||
topic = Topic,
|
||||
qos = Qos
|
||||
};
|
||||
parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) ->
|
||||
#http_endpoint{
|
||||
url = Url,
|
||||
pool_size = PoolSize
|
||||
};
|
||||
parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0}, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
|
||||
Mechanism = case Mechanism0 of
|
||||
<<"sha_256">> ->
|
||||
scram_sha_256;
|
||||
<<"sha_512">> ->
|
||||
scram_sha_512;
|
||||
<<"plain">> ->
|
||||
plain;
|
||||
_ ->
|
||||
plain
|
||||
end,
|
||||
|
||||
#kafka_endpoint{
|
||||
sasl_config = {Mechanism, Username, Password},
|
||||
bootstrap_servers = parse_bootstrap_servers(BootstrapServers),
|
||||
topic = Topic
|
||||
};
|
||||
parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
|
||||
#kafka_endpoint{
|
||||
sasl_config = undefined,
|
||||
bootstrap_servers = parse_bootstrap_servers(BootstrapServers),
|
||||
topic = Topic
|
||||
};
|
||||
parse_config(_, _) ->
|
||||
throw(invalid_config).
|
||||
|
||||
parse_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) ->
|
||||
lists:filtermap(fun(S) ->
|
||||
case binary:split(S, <<":">>) of
|
||||
[Host0, Port0] ->
|
||||
{true, {binary_to_list(Host0), binary_to_integer(Port0)}};
|
||||
_ ->
|
||||
false
|
||||
end
|
||||
end, BootstrapServers).
|
||||
|
||||
ai_event(Id) when is_integer(Id) ->
|
||||
gen_server:cast(?MODULE, {ai_event, Id}).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_link() ->
|
||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||
start_link() ->
|
||||
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([]) ->
|
||||
{ok, #state{}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({ai_event, Id}, State = #state{}) ->
|
||||
spawn_monitor(fun() ->
|
||||
Token = iot_util:md5(<<?API_TOKEN/binary, (integer_to_binary(Id))/binary, ?API_TOKEN/binary>>),
|
||||
{ok, Url} = application:get_env(iot, api_url),
|
||||
|
||||
@ -93,46 +188,75 @@ handle_cast({ai_event, Id}, State = #state{}) ->
|
||||
lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, {HttpCode, RespBody}]);
|
||||
{error, Reason} ->
|
||||
lager:warning("[iot_api] send body: ~p, get error is: ~p", [Body, Reason])
|
||||
end
|
||||
end),
|
||||
end.
|
||||
|
||||
{noreply, State}.
|
||||
%%%-------------------------------------------------------------------
|
||||
%% helper methods
|
||||
%%%-------------------------------------------------------------------
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
%% Task进程挂掉
|
||||
handle_info({'DOWN', _MRef, process, _Pid, normal}, State) ->
|
||||
{noreply, State};
|
||||
-spec do_post(Path :: string(), Params :: map()) -> {ok, Resp :: binary()} | {error, Reason :: any()}.
|
||||
do_post(Path, Params) when is_list(Path), is_map(Params) ->
|
||||
{ok, BaseUrl} = application:get_env(iot, api_url),
|
||||
Headers = [
|
||||
{<<"content-type">>, <<"application/json">>},
|
||||
{<<"Accept">>, <<"application/json">>}
|
||||
],
|
||||
Url = BaseUrl ++ Path,
|
||||
Body = iolist_to_binary(jiffy:encode(Params, [force_utf8])),
|
||||
case hackney:request(post, Url, Headers, Body, [{pool, false}]) of
|
||||
{ok, 200, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
lager:debug("[iot_api] request url: ~p, send body: ~p, get error is: ~p", [Url, Body, RespBody]),
|
||||
hackney:close(ClientRef),
|
||||
case catch jiffy:decode(RespBody) of
|
||||
#{<<"result">> := Result} ->
|
||||
{ok, Result};
|
||||
#{<<"error">> := #{<<"code">> := Code, <<"message">> := Message}} ->
|
||||
{error, {Code, Message}};
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
Other ->
|
||||
{error, Other}
|
||||
end;
|
||||
{ok, HttpCode, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
hackney:close(ClientRef),
|
||||
lager:warning("[iot_api] request url: ~p, send body: ~p, get error is: ~p", [Url, Body, {HttpCode, RespBody}]),
|
||||
{error, {HttpCode, RespBody}};
|
||||
{error, Reason} ->
|
||||
lager:warning("[iot_api] request url: ~p, send body: ~p, get error is: ~p", [Url, Body, Reason]),
|
||||
{error, Reason}
|
||||
end.
|
||||
|
||||
handle_info({'DOWN', _MRef, process, _Pid, Reason}, State) ->
|
||||
lager:notice("[iot_api] task process down with reason: ~p", [Reason]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(_Info, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
-spec do_get(Path :: string(), Params :: [{Key :: binary(), Val :: binary()}]) -> {ok, Resp :: binary()} | {error, Reason :: any()}.
|
||||
do_get(Path, Params) when is_list(Path), is_list(Params) ->
|
||||
{ok, BaseUrl} = application:get_env(iot, api_url),
|
||||
Headers = [
|
||||
{<<"Accept">>, <<"application/json">>}
|
||||
],
|
||||
QS = binary_to_list(uri_string:compose_query(Params)),
|
||||
Url = BaseUrl ++ Path ++ "?" ++ QS,
|
||||
case hackney:request(get, Url, Headers, <<>>, [{pool, false}]) of
|
||||
{ok, 200, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
lager:debug("[iot_api] url: ~p, get error is: ~p", [Url, RespBody]),
|
||||
hackney:close(ClientRef),
|
||||
case catch jiffy:decode(RespBody) of
|
||||
#{<<"result">> := Result} ->
|
||||
{ok, Result};
|
||||
#{<<"error">> := #{<<"code">> := Code, <<"message">> := Message}} ->
|
||||
{error, {Code, Message}};
|
||||
{error, Reason} ->
|
||||
{error, Reason};
|
||||
Other ->
|
||||
{error, Other}
|
||||
end;
|
||||
{ok, HttpCode, _, ClientRef} ->
|
||||
{ok, RespBody} = hackney:body(ClientRef),
|
||||
hackney:close(ClientRef),
|
||||
lager:warning("[iot_api] request url: ~p, get error is: ~p", [Url, {HttpCode, RespBody}]),
|
||||
{error, {HttpCode, RespBody}};
|
||||
{error, Reason} ->
|
||||
lager:warning("[iot_api] request url: ~p, get error is: ~p", [Url, Reason]),
|
||||
{error, Reason}
|
||||
end.
|
||||
Loading…
x
Reference in New Issue
Block a user