简化endpoint概念,替换掉redis

This commit is contained in:
anlicheng 2023-09-20 18:47:01 +08:00
parent b9c3b0bdb2
commit 26c55cc21f
23 changed files with 361 additions and 2319 deletions

View File

@ -53,62 +53,6 @@
type :: atom() type :: atom()
}). }).
-record(http_endpoint, {
url = <<>> :: binary(),
pool_size = 10 :: integer()
}).
-record(ws_endpoint, {
url = <<>> :: binary()
}).
-record(mqtt_endpoint, {
host = <<>> :: binary(),
port = 0 :: integer(),
username = <<>> :: binary(),
password = <<>> :: binary(),
topic = <<>> :: binary(),
qos = 0 :: integer()
}).
-record(kafka_endpoint, {
username = <<>> :: binary(),
password = <<>> :: binary(),
bootstrap_servers = [] :: [binary()],
topic = <<>> :: binary()
}).
-record(mysql_endpoint, {
host = <<>> :: binary(),
port = 0 :: integer(),
username = <<>> :: binary(),
password = <<>> :: binary(),
database = <<>> :: binary(),
table_name = <<>> :: binary(),
pool_size = 10 :: integer()
}).
%%
-record(endpoint, {
%%
name = <<>> :: binary(),
%%
title = <<>> :: binary(),
%% ,
matcher = <<>> :: binary(),
mapper = <<>> :: binary(),
%%
%% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}])
%% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}], Timestamp :: integer())
mapper_fun = fun(_, Fields) -> Fields end :: fun(),
%% , : #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
config = #http_endpoint{} :: #http_endpoint{} | #ws_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{} | #mysql_endpoint{},
%%
updated_at = 0 :: integer(),
%%
created_at = 0 :: integer()
}).
%% id生成器 %% id生成器
-record(id_generator, { -record(id_generator, {
tab :: atom(), tab :: atom(),

View File

@ -904,7 +904,7 @@ handle_event({call, From}, stop, _StateName, _State) ->
{stop_and_reply, normal, [{reply, From, ok}]}; {stop_and_reply, normal, [{reply, From, ok}]};
handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl -> handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl ->
lager:debug("[emqtt] RECV Data: ~p, client_id: ~p", [Data, State#state.clientid]), %lager:debug("[emqtt] RECV Data: ~p, client_id: ~p", [Data, State#state.clientid]),
process_incoming(Data, [], run_sock(State)); process_incoming(Data, [], run_sock(State));
handle_event(info, {Error, _Sock, Reason}, _StateName, State) when Error =:= tcp_error; Error =:= ssl_error -> handle_event(info, {Error, _Sock, Reason}, _StateName, State) when Error =:= tcp_error; Error =:= ssl_error ->
@ -1214,7 +1214,6 @@ send(Msg, State) when is_record(Msg, mqtt_msg) ->
send(Packet, State = #state{socket = Sock, proto_ver = Ver}) send(Packet, State = #state{socket = Sock, proto_ver = Ver})
when is_record(Packet, mqtt_packet) -> when is_record(Packet, mqtt_packet) ->
Data = emqtt_frame:serialize(Packet, Ver), Data = emqtt_frame:serialize(Packet, Ver),
lager:debug("[emqtt] SEND Data: ~1000p, client_id: ~p", [Packet, State#state.clientid]),
case emqtt_sock:send(Sock, Data) of case emqtt_sock:send(Sock, Data) of
ok -> ok ->
{ok, bump_last_packet_id(State)}; {ok, bump_last_packet_id(State)};

View File

@ -1,184 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author licheng5
%%% @copyright (C) 2020, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 26. 4 2020 3:36
%%%-------------------------------------------------------------------
-module(endpoint_handler).
-author("licheng5").
-include("iot.hrl").
%% API
-export([handle_request/4]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% name进行过滤
handle_request("GET", "/endpoint/all", GetParams, _) ->
Endpoints0 = mnesia_endpoint:get_all_endpoints(),
Endpoints = case maps:is_key(<<"name">>, GetParams) of
true ->
Name = maps:get(<<"name">>, GetParams),
lists:filter(fun(#endpoint{name = Name0}) -> Name == Name0 end, Endpoints0);
false ->
Endpoints0
end,
EndpointInfos = lists:map(fun mnesia_endpoint:to_map/1, Endpoints),
{ok, 200, iot_util:json_data(EndpointInfos)};
%% name进行过滤
handle_request("GET", "/endpoint/info", #{<<"name">> := Name}, _) ->
case mnesia_endpoint:get_endpoint(Name) of
{ok, Endpoint} ->
EndpointInfo = mnesia_endpoint:to_map(Endpoint),
{ok, 200, iot_util:json_data(EndpointInfo)};
undefined ->
{ok, 200, iot_util:json_error(404, <<"not found">>)}
end;
%%
handle_request("POST", "/endpoint/create", _, Params = #{<<"name">> := Name}) ->
RequiredFields = [<<"name">>, <<"title">>, <<"matcher">>, <<"mapper">>, <<"config">>],
iot_util:assert(lists:all(fun(Key) -> maps:is_key(Key, Params) end, RequiredFields), <<"missed required param">>),
case mnesia_endpoint:get_endpoint(Name) of
undefined ->
Endpoint0 = make_endpoint(maps:to_list(Params), #endpoint{}),
Endpoint = Endpoint0#endpoint{created_at = iot_util:timestamp_of_seconds()},
ok = mnesia_endpoint:insert(Endpoint),
%% endpoint
{ok, _} = iot_endpoint_sup:ensured_endpoint_started(Endpoint),
{ok, 200, iot_util:json_data(<<"success">>)};
{ok, _} ->
{ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)}
end;
%%
handle_request("POST", "/endpoint/update", _, Params = #{<<"name">> := Name}) when is_binary(Name) ->
case mnesia_endpoint:get_endpoint(Name) of
undefined ->
lager:debug("[endpoint_handler] update endpoint, name: ~p not found", [Name]),
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
{ok, Endpoint} ->
Params1 = maps:remove(<<"name">>, Params),
NEndpoint = make_endpoint(maps:to_list(Params1), Endpoint),
NEndpoint1 = NEndpoint#endpoint{updated_at = iot_util:timestamp_of_seconds()},
ok = mnesia_endpoint:insert(NEndpoint1),
case iot_endpoint:get_pid(Name) of
undefined ->
{ok, _} = iot_endpoint_sup:ensured_endpoint_started(Endpoint);
Pid when is_pid(Pid) ->
iot_endpoint:reload(Pid, Endpoint)
end,
{ok, 200, iot_util:json_data(<<"success">>)}
end;
%%
handle_request("POST", "/endpoint/delete", _, #{<<"name">> := Name}) when is_binary(Name) ->
case mnesia_endpoint:delete(Name) of
ok ->
case iot_endpoint:get_pid(Name) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"endpoint not found">>)};
Pid ->
iot_endpoint:clean_up(Pid),
iot_endpoint_sup:delete_endpoint(Name),
{ok, 200, iot_util:json_data(<<"success">>)}
end;
{error, Reason} ->
lager:debug("[endpoint_handler] delete endpoint id: ~p, get error is: ~p", [Name, Reason]),
{ok, 200, iot_util:json_error(404, <<"error">>)}
end;
handle_request(_, Path, _, _) ->
Path1 = list_to_binary(Path),
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec make_endpoint(Params :: list(), #endpoint{}) -> #endpoint{}.
make_endpoint([], Endpoint) ->
Endpoint;
make_endpoint([{<<"name">>, Name} | Params], Endpoint) when is_binary(Name) andalso Name /= <<>> ->
make_endpoint(Params, Endpoint#endpoint{name = Name});
make_endpoint([{<<"name">>, _} | _], _) ->
throw(<<"invalid name">>);
make_endpoint([{<<"title">>, Title} | Params], Endpoint) when is_binary(Title) andalso Title /= <<>> ->
make_endpoint(Params, Endpoint#endpoint{title = Title});
make_endpoint([{<<"title">>, _} | _], _) ->
throw(<<"invalid title">>);
make_endpoint([{<<"matcher">>, Matcher} | Params], Endpoint) when is_binary(Matcher) andalso Matcher /= <<>> ->
%% matcher是否是合法的正则表达式
case re:compile(Matcher) of
{ok, _} ->
make_endpoint(Params, Endpoint#endpoint{matcher = Matcher});
{error, _} ->
throw(<<"invalid regexp">>)
end;
make_endpoint([{<<"matcher">>, _} | _], _) ->
throw(<<"invalid matcher">>);
make_endpoint([{<<"mapper">>, Mapper} | Params], Endpoint) when is_binary(Mapper) andalso Mapper /= <<>> ->
%% mapper是否是合理的erlang表达式
case catch iot_util:parse_mapper(Mapper) of
{ok, MapperFun} ->
make_endpoint(Params, Endpoint#endpoint{mapper = Mapper, mapper_fun = MapperFun});
error ->
throw(<<"invalid mapper">>);
Error ->
lager:debug("[endpoint_handler] parse_mapper get error: ~p", [Error]),
throw(<<"invalid mapper">>)
end;
make_endpoint([{<<"mapper">>, _} | _], _) ->
throw(<<"invalid mapper">>);
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"http">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> ->
make_endpoint(Params, Endpoint#endpoint{config = #http_endpoint{url = Url}});
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"https">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> ->
make_endpoint(Params, Endpoint#endpoint{config = #http_endpoint{url = Url}});
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"ws">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> ->
make_endpoint(Params, Endpoint#endpoint{config = #ws_endpoint{url = Url}});
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"kafka">>, <<"args">> := #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}}} | Params], Endpoint) ->
iot_util:assert(is_binary(Username) andalso Username /= <<>>, <<"username is invalid">>),
iot_util:assert(is_binary(Password) andalso Password /= <<>>, <<"password is invalid">>),
iot_util:assert(is_list(BootstrapServers) andalso length(BootstrapServers) > 0, <<"bootstrap_servers is invalid">>),
iot_util:assert(is_binary(Topic) andalso Topic /= <<>>, <<"topic is invalid">>),
make_endpoint(Params, Endpoint#endpoint{config = #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}});
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mqtt">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}} | Params], Endpoint) ->
iot_util:assert(is_binary(Username) andalso Username /= <<>>, <<"username is invalid">>),
iot_util:assert(is_binary(Password) andalso Password /= <<>>, <<"password is invalid">>),
iot_util:assert(is_binary(Topic) andalso Topic /= <<>>, <<"topic is invalid">>),
iot_util:assert(is_binary(Host) andalso Host /= <<>>, <<"host is invalid">>),
iot_util:assert(is_integer(Port) andalso Port > 0, <<"port is invalid">>),
iot_util:assert(Qos == 0 orelse Qos == 1 orelse Qos == 2, <<"qos is invalid">>),
make_endpoint(Params, Endpoint#endpoint{config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}});
%% mysql的支持
make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mysql">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}}} | Params], Endpoint) ->
iot_util:assert(is_binary(Username) andalso Username /= <<>>, <<"username is invalid">>),
iot_util:assert(is_binary(Password) andalso Password /= <<>>, <<"password is invalid">>),
iot_util:assert(is_binary(Host) andalso Host /= <<>>, <<"host is invalid">>),
iot_util:assert(is_integer(Port) andalso Port > 0, <<"port is invalid">>),
iot_util:assert(is_binary(Database) andalso Database /= <<>>, <<"database is invalid">>),
iot_util:assert(is_binary(TableName) andalso TableName /= <<>>, <<"table_name is invalid">>),
make_endpoint(Params, Endpoint#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName}});
make_endpoint([{<<"config">>, Config} | _], _) ->
lager:warning("[endpoint_handler] unsupport config: ~p", [Config]),
throw(<<"invalid config">>);
make_endpoint([{Key, _} | _], _) ->
throw(<<"unsupport param: ", Key/binary>>).

View File

@ -6,6 +6,7 @@
{applications, {applications,
[ [
sync, sync,
eredis,
ranch, ranch,
cowboy, cowboy,
lager, lager,

View File

@ -23,8 +23,6 @@ start(_StartType, _StartArgs) ->
start_udp_server(), start_udp_server(),
%% http服务 %% http服务
start_http_server(), start_http_server(),
%% redis服务器
start_redis_server(),
%% %%
ok = hackney_pool:start_pool(influx_pool, [{timeout, 150000}, {max_connections, 100}]), ok = hackney_pool:start_pool(influx_pool, [{timeout, 150000}, {max_connections, 100}]),
@ -48,7 +46,6 @@ start_http_server() ->
{'_', [ {'_', [
{"/host/[...]", http_protocol, [host_handler]}, {"/host/[...]", http_protocol, [host_handler]},
{"/device/[...]", http_protocol, [device_handler]}, {"/device/[...]", http_protocol, [device_handler]},
{"/endpoint/[...]", http_protocol, [endpoint_handler]},
{"/test/[...]", http_protocol, [test_handler]}, {"/test/[...]", http_protocol, [test_handler]},
{"/ws", ws_channel, []} {"/ws", ws_channel, []}
]} ]}
@ -65,29 +62,6 @@ start_http_server() ->
lager:debug("[iot_app] the http server start at: ~p, pid is: ~p", [Port, Pid]). lager:debug("[iot_app] the http server start at: ~p, pid is: ~p", [Port, Pid]).
start_redis_server() ->
{ok, Props} = application:get_env(iot, redis_server),
Acceptors = proplists:get_value(acceptors, Props, 50),
MaxConnections = proplists:get_value(max_connections, Props, 10240),
Backlog = proplists:get_value(backlog, Props, 1024),
Port = proplists:get_value(port, Props),
TransOpts = [
{tcp_options, [
binary,
{reuseaddr, true},
{active, false},
{packet, raw},
{nodelay, false},
{backlog, Backlog}
]},
{acceptors, Acceptors},
{max_connections, MaxConnections}
],
{ok, _} = esockd:open(redis_server, Port, TransOpts, {redis_protocol, start_link, []}),
lager:debug("[iot_app] the redis server start at: ~p", [Port]).
start_udp_server() -> start_udp_server() ->
{ok, Props} = application:get_env(iot, udp_server), {ok, Props} = application:get_env(iot, udp_server),
Port = proplists:get_value(port, Props), Port = proplists:get_value(port, Props),
@ -97,10 +71,11 @@ start_udp_server() ->
%% %%
start_mnesia() -> start_mnesia() ->
QueueTab = 'queue_data:zhongdian',
%% %%
ok = mnesia:start(), ok = mnesia:start(),
Tables = mnesia:system_info(tables), Tables = mnesia:system_info(tables),
LoadTables = [id_generator, kv, endpoint], LoadTables = [id_generator, QueueTab],
case lists:all(fun(Tab) -> lists:member(Tab, Tables) end, LoadTables) of case lists:all(fun(Tab) -> lists:member(Tab, Tables) end, LoadTables) of
true -> true ->
%% %%
@ -117,7 +92,7 @@ start_mnesia() ->
%% %%
%% %% id生成器
mnesia:create_table(id_generator, [ mnesia:create_table(id_generator, [
{attributes, record_info(fields, id_generator)}, {attributes, record_info(fields, id_generator)},
{record_name, id_generator}, {record_name, id_generator},
@ -125,18 +100,10 @@ start_mnesia() ->
{type, ordered_set} {type, ordered_set}
]), ]),
%% %%
mnesia:create_table(kv, [ mnesia:create_table(QueueTab, [
{attributes, record_info(fields, kv)}, {attributes, record_info(fields, north_data)},
{record_name, kv}, {record_name, north_data},
{disc_copies, [node()]},
{type, ordered_set}
]),
%%
mnesia:create_table(endpoint, [
{attributes, record_info(fields, endpoint)},
{record_name, endpoint},
{disc_copies, [node()]}, {disc_copies, [node()]},
{type, ordered_set} {type, ordered_set}
]) ])

View File

@ -67,9 +67,7 @@ auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
%% initialize. To ensure a synchronized start-up procedure, this %% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned. %% function does not return until Module:init/1 has returned.
start_link(Name, DeviceUUID) when is_atom(Name), is_binary(DeviceUUID) -> start_link(Name, DeviceUUID) when is_atom(Name), is_binary(DeviceUUID) ->
gen_statem:start_link({local, Name}, ?MODULE, [DeviceUUID], []); gen_statem:start_link({local, Name}, ?MODULE, [DeviceUUID], []).
start_link(Name, DeviceInfo) when is_atom(Name), is_map(DeviceInfo) ->
gen_statem:start_link({local, Name}, ?MODULE, [DeviceInfo], []).
%%%=================================================================== %%%===================================================================
%%% gen_statem callbacks %%% gen_statem callbacks
@ -80,26 +78,8 @@ start_link(Name, DeviceInfo) when is_atom(Name), is_map(DeviceInfo) ->
%% gen_statem:start_link/[3,4], this function is called by the new %% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize. %% process to initialize.
init([DeviceUUID]) when is_binary(DeviceUUID) -> init([DeviceUUID]) when is_binary(DeviceUUID) ->
case device_bo:get_device_by_uuid(DeviceUUID) of gen_server:cast(self(), reload),
{ok, DeviceInfo} -> {ok, ?STATE_DENIED, #state{device_uuid = DeviceUUID}}.
init0(DeviceInfo);
undefined ->
ignore
end;
init([DeviceInfo]) when is_map(DeviceInfo) ->
init0(DeviceInfo).
init0(#{<<"device_uuid">> := DeviceUUID, <<"status">> := Status, <<"authorize_status">> := AuthorizeStatus}) ->
case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of
true ->
lager:debug("[iot_device] started device: ~p, state_name: ~p, status: ~p", [DeviceUUID, ?STATE_ACTIVATED, Status]),
{ok, ?STATE_ACTIVATED, #state{device_uuid = DeviceUUID, status = Status}};
false ->
{ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE),
report_event(DeviceUUID, ?DEVICE_OFFLINE),
lager:debug("[iot_device] started device: ~p, state_name: ~p, status: ~p", [DeviceUUID, ?STATE_DENIED, ?DEVICE_OFFLINE]),
{ok, ?STATE_DENIED, #state{device_uuid = DeviceUUID, status = ?DEVICE_OFFLINE}}
end.
%% @private %% @private
%% @doc This function is called by a gen_statem when it needs to find out %% @doc This function is called by a gen_statem when it needs to find out
@ -193,5 +173,5 @@ report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewSt
<<"name">> => <<"设备状态"/utf8>>, <<"name">> => <<"设备状态"/utf8>>,
<<"timestamp">> => Timestamp <<"timestamp">> => Timestamp
}], }],
iot_router:route(DeviceUUID, FieldsList, Timestamp), iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp),
lager:debug("[iot_host] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]). lager:debug("[iot_host] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]).

View File

@ -1,358 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 7 2023 12:02
%%%-------------------------------------------------------------------
-module(iot_endpoint).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_statem).
%% API
-export([start_link/2]).
-export([get_name/1, get_pid/1, forward/4, get_stat/1, reload/2, clean_up/1, get_mapper_fun/1]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
%%
-define(RETRY_INTERVAL, 5000).
-record(state, {
endpoint :: #endpoint{},
mp,
postman_pid :: undefined | pid(),
%% ,
tab_name :: atom(),
%% #north_data的id
cursor = 0 :: integer(),
%%
timer_map = #{},
%%
window_size = 10,
%%
flight_num = 0,
%%
acc_num = 0
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_name(Name :: binary() | #endpoint{}) -> atom().
get_name(#endpoint{name = Name}) when is_binary(Name) ->
get_name(Name);
get_name(EndpointName) when is_binary(EndpointName) ->
binary_to_atom(<<"iot_endpoint:", EndpointName/binary>>).
-spec get_pid(Name :: binary()) -> undefined | pid().
get_pid(Name) when is_binary(Name) ->
whereis(get_name(Name)).
-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
forward(undefined, _, _, _) ->
ok;
forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
gen_statem:cast(Pid, {forward, LocationCode, Fields, Timestamp}).
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
gen_statem:cast(Pid, {reload, NEndpoint}).
-spec get_stat(Pid :: pid()) -> {ok, Stat :: #{}}.
get_stat(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_stat, 5000).
-spec clean_up(Pid :: pid()) -> ok.
clean_up(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, clean_up, 5000).
-spec get_mapper_fun(Pid :: pid()) -> fun().
get_mapper_fun(Pid) when is_pid(Pid) ->
gen_statem:call(Pid, get_mapper_fun).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(Name, Endpoint = #endpoint{}) ->
gen_statem:start_link({local, Name}, ?MODULE, [Endpoint], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([Endpoint = #endpoint{name = Name, matcher = Regexp}]) ->
erlang:process_flag(trap_exit, true),
%%
{ok, MP} = re:compile(Regexp),
%% ,
erlang:start_timer(0, self(), create_postman),
try
%%
TabName = binary_to_atom(<<"queue_data:", Name/binary>>),
mnesia_queue:ensure_queue(TabName),
{ok, disconnected, #state{endpoint = Endpoint, mp = MP, tab_name = TabName, postman_pid = undefined}}
catch _:Error ->
lager:warning("[iot_endpoint] endpoint: ~p, init get error: ~p, ignore", [Name, Error]),
ignore
end.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
handle_event_function.
%% @private
%% @doc There should be one instance of this function for each possible
%% state name. If callback_mode is state_functions, one of these
%% functions is called when gen_statem receives and event from
%% call/2, cast/2, or as a normal process message.
%%
handle_event(cast, {reload, NEndpoint}, disconnected, State = #state{endpoint = Endpoint}) ->
lager:warning("[iot_endpoint] state_name: disconnected, reload endpoint, old: ~p, new: ~p", [Endpoint, NEndpoint]),
{keep_state, State#state{endpoint = NEndpoint}};
handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, State = #state{endpoint = Endpoint, timer_map = TimerMap, postman_pid = PostmanPid}) ->
lager:debug("[iot_endpoint] state_name: connected, reload endpoint, old: ~p, new: ~p", [Endpoint, NEndpoint]),
case mnesia_endpoint:config_equals(NEndpoint#endpoint.config, Endpoint#endpoint.config) of
true ->
lager:debug("[iot_endpoint] reload endpoint: ~p, config equals", [Name]),
{keep_state, State#state{endpoint = NEndpoint}};
false ->
%% postman的link关系
unlink(PostmanPid),
%% postman进程
catch PostmanPid ! stop,
%%
lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)),
%% postman
erlang:start_timer(0, self(), create_postman),
{next_state, disconnected, State#state{endpoint = NEndpoint, timer_map = maps:new(), postman_pid = undefined}}
end;
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{mp = MP, tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name}}) ->
case re:run(LocationCode, MP, [{capture, all, list}]) of
nomatch ->
{keep_state, State};
{match, _} ->
lager:debug("[iot_endpoint] name: ~p, match location_code: ~p", [Name, LocationCode]),
mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}),
%%
Actions = case StateName =:= connected andalso FlightNum < WindowSize of
true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State, Actions}
end;
%%
handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint{name = Name}}) ->
lager:debug("[iot_endpoint] fetch_next endpoint: ~p, postman offline, data in queue", [Name]),
{keep_state, State};
handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize ->
{keep_state, State};
handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name}, timer_map = TimerMap, flight_num = FlightNum}) ->
case mnesia_queue:dirty_fetch_next(TabName, Cursor) of
{ok, NCursor, NorthData = #north_data{id = Id}} ->
lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
case do_post(NorthData, State) of
error ->
{keep_state, State};
{ok, TimerRef} ->
{keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}}
end;
'$end_of_table' ->
{keep_state, State}
end;
%%
handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name}, timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) ->
ok = mnesia_queue:delete(TabName, Id),
lager:debug("[iot_endpoint] endpoint: ~p, get ack: ~p, delete from mnesia", [Name, Id]),
Actions = case StateName =:= connected of
true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions};
%%
handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap}) ->
lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]),
case do_post(NorthData, State) of
error ->
{keep_state, State};
{ok, TimerRef} ->
{keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}}
end;
%% 线
handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) ->
{keep_state, State};
handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{endpoint = Endpoint = #endpoint{name = Name, config = Config}, window_size = WindowSize}) ->
lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Name]),
try
{ok, PostmanPid} = create_postman(Endpoint),
%% window_size
Actions = lists:map(fun(_) -> {next_event, info, fetch_next} end, lists:seq(1, WindowSize)),
{next_state, connected, State#state{endpoint = Endpoint, postman_pid = PostmanPid, timer_map = maps:new(), flight_num = 0}, Actions}
catch _:Error ->
lager:warning("[iot_endpoint] endpoint: ~p, config: ~p, create postman get error: ~p", [Name, Config, Error]),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{keep_state, State#state{endpoint = Endpoint, postman_pid = undefined}}
end;
%%
handle_event({call, From}, clean_up, _, State = #state{tab_name = TabName}) ->
mnesia:delete_table(TabName),
{keep_state, State, [{reply, From, ok}]};
handle_event({call, From}, get_mapper_fun, _, State = #state{endpoint = #endpoint{mapper_fun = F}}) ->
{keep_state, State, [{reply, From, F}]};
%%
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, tab_name = TabName}) ->
Stat = #{
<<"acc_num">> => AccNum,
<<"queue_num">> => mnesia_queue:table_size(TabName),
<<"state_name">> => atom_to_binary(StateName)
},
{keep_state, State, [{reply, From, Stat}]};
%% postman进程挂掉时
handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap, postman_pid = PostmanPid}) ->
lager:warning("[iot_endpoint] endpoint: ~p, postman exited with reason: ~p", [Name, Reason]),
lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{next_state, disconnected, State#state{timer_map = maps:new(), postman_pid = undefined}};
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event(EventType, Event, StateName, State) ->
lager:warning("[iot_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
{keep_state, State}.
%% @private
%% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, #state{endpoint = #endpoint{name = Name}}) ->
lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Name, Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
{ok, StateName, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec remove_timer(Id :: integer(), TimerMap :: #{}) -> NTimerMap :: #{}.
remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) ->
case maps:take(Id, TimerMap) of
error ->
TimerMap;
{TimerRef, NTimerMap} ->
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
NTimerMap
end.
%% http和https协议的支持
create_postman(#endpoint{config = #http_endpoint{url = Url, pool_size = PoolSize}}) ->
WorkerArgs = [{url, Url}],
broker_postman:start_link(http_postman, WorkerArgs, PoolSize);
%% mqtt协议的支持,
create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) ->
Node = atom_to_binary(node()),
ClientId = <<"mqtt-client-", Node/binary, "-", Name/binary>>,
Opts = [
{clientid, ClientId},
{host, binary_to_list(Host)},
{port, Port},
{tcp_opts, []},
{username, binary_to_list(Username)},
{password, binary_to_list(Password)},
{keepalive, 86400},
{auto_ack, true},
{connect_timeout, 5000},
{proto_ver, v5},
{retry_interval, 5000}
],
mqtt_postman:start_link(Opts, Topic, Qos);
%% mysql协议的支持
create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}) ->
WorkerArgs = [
{mysql_opts, [
{host, binary_to_list(Host)},
{port, Port},
{user, binary_to_list(Username)},
{password, binary_to_list(Password)},
{keep_alive, true},
{database, binary_to_list(Database)},
{queries, [<<"set names utf8">>]}
]},
{table, TableName}
],
broker_postman:start_link(mysql_postman, WorkerArgs, PoolSize);
create_postman(#endpoint{}) ->
throw(<<"not supported">>).
-spec do_post(NorthData :: #north_data{}, State :: #state{}) -> error | {ok, TimerRef :: reference()}.
do_post(NorthData = #north_data{id = Id}, #state{postman_pid = PostmanPid, tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) ->
lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]),
case safe_invoke_mapper(MapperFun, NorthData) of
{ok, Body} ->
PostmanPid ! {post, self(), make_post_data(NorthData, Body)},
%% , mapper可能会改变
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
{ok, TimerRef};
{error, Error} ->
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Name, Error]),
mnesia_queue:delete(TabName, Id),
error;
ignore ->
lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Name]),
mnesia_queue:delete(TabName, Id),
error
end.
-spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) ->
{ok, Body :: any()} | ignore | {error, Reason :: any()}.
safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) ->
try
if
is_function(MapperFun, 2) ->
MapperFun(LocationCode, Fields);
is_function(MapperFun, 3) ->
MapperFun(LocationCode, Fields, Timestamp)
end
catch _:Error ->
{error, Error}
end.
-spec make_post_data(NorthData :: #north_data{}, Body :: any()) -> PostData :: #post_data{}.
make_post_data(#north_data{id = Id, location_code = LocationCode}, Body) ->
#post_data{id = Id, location_code = LocationCode, body = Body}.

View File

@ -1,53 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%% @end
%%%-------------------------------------------------------------------
-module(iot_endpoint_sup).
-include("iot.hrl").
-behaviour(supervisor).
-export([start_link/0, init/1, delete_endpoint/1, ensured_endpoint_started/1, stat/0]).
start_link() ->
supervisor:start_link({local, ?MODULE}, ?MODULE, []).
init([]) ->
mnesia_endpoint:import_static_endpoints(),
Specs = lists:map(fun child_spec/1, mnesia_endpoint:get_all_endpoints()),
{ok, {#{strategy => one_for_one, intensity => 1000, period => 3600}, Specs}}.
-spec ensured_endpoint_started(Name :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}.
ensured_endpoint_started(Endpoint = #endpoint{}) ->
case supervisor:start_child(?MODULE, child_spec(Endpoint)) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
{error, {'already_started', Pid}} when is_pid(Pid) ->
{ok, Pid};
{error, Error} ->
{error, Error}
end.
stat() ->
Children = supervisor:which_children(?MODULE),
lists:foreach(fun({Id, Pid, _, _}) ->
Stat = catch iot_endpoint:get_stat(Pid),
lager:debug("[iot_endpoint] id: ~p, stat: ~p", [Id, Stat])
end, Children).
delete_endpoint(Name) when is_binary(Name) ->
Id = iot_endpoint:get_name(Name),
supervisor:terminate_child(?MODULE, Id),
supervisor:delete_child(?MODULE, Id).
child_spec(Endpoint) ->
Id = iot_endpoint:get_name(Endpoint),
#{id => Id,
start => {iot_endpoint, start_link, [Id, Endpoint]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['iot_endpoint']}.

View File

@ -1,5 +1,5 @@
%%%------------------------------------------------------------------- %%%-------------------------------------------------------------------
%%% @author aresei %%% @author
%%% @copyright (C) 2023, <COMPANY> %%% @copyright (C) 2023, <COMPANY>
%%% @doc %%% @doc
%%% %%%
@ -29,16 +29,15 @@
host_id :: integer(), host_id :: integer(),
%% %%
uuid :: binary(), uuid :: binary(),
%%
last_status :: integer(),
%% aes的key, %% aes的key,
aes = <<>> :: binary(), aes = <<>> :: binary(),
has_session = false :: boolean(), has_session = false :: boolean(),
%% %%
heartbeat_counter = 0 :: integer(), heartbeat_counter = 0 :: integer(),
%% websocket相关 %% websocket相关
channel_pid :: undefined | pid(), channel_pid :: undefined | pid(),
%% %%
metrics = #{} :: map() metrics = #{} :: map()
}). }).
@ -145,7 +144,7 @@ start_link(Name, UUID) when is_atom(Name), is_binary(UUID) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore). {stop, Reason :: term()} | ignore).
init([UUID]) -> init([UUID]) ->
{ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), host_bo:change_status(UUID, ?HOST_OFFLINE),
report_event(UUID, ?HOST_OFFLINE), report_event(UUID, ?HOST_OFFLINE),
case host_bo:get_host_by_uuid(UUID) of case host_bo:get_host_by_uuid(UUID) of
@ -162,7 +161,7 @@ init([UUID]) ->
%% %%
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
{ok, #state{host_id = HostId, uuid = UUID, aes = Aes, has_session = false}}; {ok, #state{host_id = HostId, uuid = UUID, last_status = ?HOST_OFFLINE, aes = Aes, has_session = false}};
undefined -> undefined ->
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
ignore ignore
@ -185,7 +184,7 @@ handle_call(get_aes, _From, State = #state{aes = Aes}) ->
{reply, {ok, Aes}, State}; {reply, {ok, Aes}, State};
%% %%
handle_call(get_status, _From, State = #state{host_id = HostId, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) -> handle_call(get_status, _From, State = #state{host_id = HostId, last_status = LastStatus, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) ->
%% devices %% devices
{ok, Devices} = device_bo:get_host_devices(HostId), {ok, Devices} = device_bo:get_host_devices(HostId),
DeviceInfos = lists:map(fun(DeviceUUID) -> DeviceInfos = lists:map(fun(DeviceUUID) ->
@ -200,6 +199,7 @@ handle_call(get_status, _From, State = #state{host_id = HostId, channel_pid = Ch
HasChannel = (ChannelPid /= undefined), HasChannel = (ChannelPid /= undefined),
Reply = #{ Reply = #{
<<"last_status">> => LastStatus,
<<"has_channel">> => HasChannel, <<"has_channel">> => HasChannel,
<<"has_session">> => HasSession, <<"has_session">> => HasSession,
<<"heartbeat_counter">> => HeartbeatCounter, <<"heartbeat_counter">> => HeartbeatCounter,
@ -255,7 +255,7 @@ handle_call({activate, false}, _From, State = #state{uuid = UUID, host_id = Host
report_event(UUID, ?HOST_OFFLINE), report_event(UUID, ?HOST_OFFLINE),
change_devices_status(HostId, ?DEVICE_UNKNOWN), change_devices_status(HostId, ?DEVICE_UNKNOWN),
{reply, ok, State#state{channel_pid = undefined, has_session = false}}; {reply, ok, State#state{last_status = ?HOST_OFFLINE, channel_pid = undefined, has_session = false}};
%% channel %% channel
handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, channel_pid = OldChannelPid}) -> handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, channel_pid = OldChannelPid}) ->
@ -281,7 +281,7 @@ handle_call({create_session, PubKey}, _From, State = #state{uuid = UUID, aes = A
lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]), lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]),
{reply, {ok, <<10:8, EncReply/binary>>}, State#state{has_session = true}}; {reply, {ok, <<10:8, EncReply/binary>>}, State#state{last_status = ?HOST_ONLINE, has_session = true}};
false -> false ->
lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]), lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]),
Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, Reply = #{<<"a">> => false, <<"aes">> => <<"">>},
@ -441,7 +441,7 @@ handle_info({timeout, _, heartbeat_ticker}, State = #state{uuid = UUID, host_id
report_event(UUID, ?HOST_OFFLINE), report_event(UUID, ?HOST_OFFLINE),
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
{noreply, State#state{heartbeat_counter = 0}}; {noreply, State#state{last_status = ?HOST_OFFLINE, heartbeat_counter = 0}};
%% %%
handle_info({timeout, _, heartbeat_ticker}, State = #state{}) -> handle_info({timeout, _, heartbeat_ticker}, State = #state{}) ->
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
@ -466,8 +466,8 @@ handle_info(Info, State = #state{has_session = HasSession}) ->
%% terminate. It should be the opposite of Module:init/1 and do any %% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with %% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored. %% Reason. The return value is ignored.
terminate(Reason, _State = #state{host_id = HostId, uuid = UUID, has_session = HasSession}) -> terminate(Reason, #state{host_id = HostId, uuid = UUID, has_session = HasSession}) ->
lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p", [UUID, Reason, HasSession]), lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p, status: offline", [UUID, Reason, HasSession]),
host_bo:change_status(UUID, ?HOST_OFFLINE), host_bo:change_status(UUID, ?HOST_OFFLINE),
report_event(UUID, ?HOST_OFFLINE), report_event(UUID, ?HOST_OFFLINE),
@ -537,7 +537,7 @@ report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) ->
<<"name">> => <<"主机状态"/utf8>>, <<"name">> => <<"主机状态"/utf8>>,
<<"timestamp">> => Timestamp <<"timestamp">> => Timestamp
}], }],
iot_router:route(UUID, FieldsList, Timestamp), iot_router:route_uuid(UUID, FieldsList, Timestamp),
lager:debug("[iot_host] host_uuid: ~p, route fields: ~p", [UUID, FieldsList]). lager:debug("[iot_host] host_uuid: ~p, route fields: ~p", [UUID, FieldsList]).
%% state转换成map %% state转换成map

View File

@ -11,25 +11,16 @@
-include("iot.hrl"). -include("iot.hrl").
%% API %% API
-export([route/3, route_uuid/3]). -export([route_uuid/3]).
-spec route_uuid(RouterUUID :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). -spec route_uuid(RouterUUID :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
route_uuid(RouterUUID, Fields, Timestamp) when is_binary(RouterUUID), is_list(Fields), is_integer(Timestamp) -> route_uuid(RouterUUID, Fields, Timestamp) when is_binary(RouterUUID), is_list(Fields), is_integer(Timestamp) ->
%% %%
case mnesia_kv:hget(RouterUUID, <<"location_code">>) of case redis_client:hget(RouterUUID, <<"location_code">>) of
none -> {ok, undefined} ->
lager:debug("[iot_host] the north_data hget location_code, uuid: ~p, not found", [RouterUUID]); lager:debug("[iot_host] the north_data hget location_code, uuid: ~p, not found", [RouterUUID]);
{ok, LocationCode} when is_binary(LocationCode) ->
iot_zd_endpoint:forward(LocationCode, Fields, Timestamp);
{error, Reason} -> {error, Reason} ->
lager:debug("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason]); lager:debug("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
{ok, LocationCode} -> end.
route(LocationCode, Fields, Timestamp)
end.
-spec route(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> ok.
route(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields), is_integer(Timestamp) ->
[begin
Pid = iot_endpoint:get_pid(EndpointName),
iot_endpoint:forward(Pid, LocationCode, Fields, Timestamp)
end || EndpointName <- mnesia_endpoint:get_keys()],
ok.

View File

@ -26,6 +26,8 @@ start_link() ->
%% type => worker(), % optional %% type => worker(), % optional
%% modules => modules()} % optional %% modules => modules()} % optional
init([]) -> init([]) ->
{ok, MqttOpts} = application:get_env(iot, zhongdian),
SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600},
Specs = [ Specs = [
#{ #{
@ -37,15 +39,6 @@ init([]) ->
modules => ['iot_logger'] modules => ['iot_logger']
}, },
#{
id => 'iot_endpoint_sup',
start => {'iot_endpoint_sup', start_link, []},
restart => permanent,
shutdown => 2000,
type => supervisor,
modules => ['iot_endpoint_sup']
},
#{ #{
id => 'iot_device_sup', id => 'iot_device_sup',
start => {'iot_device_sup', start_link, []}, start => {'iot_device_sup', start_link, []},
@ -62,8 +55,16 @@ init([]) ->
shutdown => 2000, shutdown => 2000,
type => supervisor, type => supervisor,
modules => ['iot_host_sup'] modules => ['iot_host_sup']
} },
#{
id => 'iot_zd_endpoint',
start => {'iot_zd_endpoint', start_link, [MqttOpts]},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['iot_zd_endpoint']
}
], ],
{ok, {SupFlags, pools() ++ Specs}}. {ok, {SupFlags, pools() ++ Specs}}.

View File

@ -0,0 +1,232 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 06. 7 2023 12:02
%%%-------------------------------------------------------------------
-module(iot_zd_endpoint).
-author("aresei").
-include("iot.hrl").
-behaviour(gen_statem).
%% API
-export([start_link/1]).
-export([get_pid/0, forward/3, get_stat/0]).
%% gen_statem callbacks
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
%%
-define(RETRY_INTERVAL, 5000).
-record(state, {
mqtt_opts = [],
postman_pid :: undefined | pid(),
%% #north_data的id
cursor = 0 :: integer(),
%%
timer_ref :: undefined | reference(),
%%
is_busy = false :: boolean(),
%%
acc_num = 0
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec get_pid() -> undefined | pid().
get_pid() ->
whereis(?MODULE).
-spec forward(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
forward(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
gen_statem:cast(?MODULE, {forward, LocationCode, Fields, Timestamp}).
-spec get_stat() -> {ok, Stat :: #{}}.
get_stat() ->
gen_statem:call(?MODULE, get_stat, 5000).
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this
%% function does not return until Module:init/1 has returned.
start_link(Opts) when is_list(Opts) ->
gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []).
%%%===================================================================
%%% gen_statem callbacks
%%%===================================================================
%% @private
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
%% gen_statem:start_link/[3,4], this function is called by the new
%% process to initialize.
init([Opts]) ->
erlang:process_flag(trap_exit, true),
%% ,
erlang:start_timer(0, self(), create_postman),
{ok, disconnected, #state{mqtt_opts = Opts, postman_pid = undefined}}.
%% @private
%% @doc This function is called by a gen_statem when it needs to find out
%% the callback mode of the callback module.
callback_mode() ->
handle_event_function.
%% @private
%% @doc There should be one instance of this function for each possible
%% state name. If callback_mode is state_functions, one of these
%% functions is called when gen_statem receives and event from
%% call/2, cast/2, or as a normal process message.
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) ->
mnesia_queue:insert(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}),
%%
Actions = case StateName =:= connected andalso not IsBusy of
true -> [{next_event, info, fetch_next}];
false -> []
end,
{keep_state, State, Actions};
%%
handle_event(info, fetch_next, disconnected, State) ->
lager:debug("[iot_zd_endpoint] fetch_next postman offline, data in queue"),
{keep_state, State};
handle_event(info, fetch_next, connected, State = #state{is_busy = true}) ->
{keep_state, State};
handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, cursor = Cursor}) ->
case mnesia_queue:dirty_fetch_next(Cursor) of
{ok, NCursor, NorthData} ->
lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [NorthData]),
do_post(PostmanPid, NorthData),
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
{keep_state, State#state{cursor = NCursor, timer_ref = TimerRef, is_busy = true}};
'$end_of_table' ->
{keep_state, State}
end;
%%
handle_event(info, {ack, Id}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum}) ->
ok = mnesia_queue:delete(Id),
lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]),
Actions = case StateName =:= connected of
true -> [{next_event, info, fetch_next}];
false -> []
end,
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
{keep_state, State#state{timer_ref = undefined, acc_num = AccNum + 1, is_busy = false}, Actions};
%%
handle_event(info, {timeout, _, {repost_ticker, NorthData}}, connected, State = #state{postman_pid = PostmanPid}) ->
lager:debug("[iot_zd_endpoint] repost data: ~p", [NorthData]),
do_post(PostmanPid, NorthData),
TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}),
{keep_state, State#state{timer_ref = TimerRef}};
%% 线
handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) ->
{keep_state, State};
handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mqtt_opts = Opts}) ->
lager:debug("[iot_zd_endpoint] create postman"),
try
{ok, PostmanPid} = create_postman(Opts),
{next_state, connected, State#state{postman_pid = PostmanPid, timer_ref = undefined, is_busy = false}, [{next_event, info, fetch_next}]}
catch _:Error:Stack ->
lager:warning("[iot_zd_endpoint] config: ~p, create postman get error: ~p, stack: ~p", [Opts, Error, Stack]),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{keep_state, State#state{postman_pid = undefined}}
end;
%%
handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) ->
Stat = #{
<<"acc_num">> => AccNum,
<<"queue_num">> => mnesia_queue:table_size(),
<<"state_name">> => atom_to_binary(StateName)
},
{keep_state, State, [{reply, From, Stat}]};
%% postman进程挂掉时
handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{timer_ref = TimerRef, postman_pid = PostmanPid}) ->
lager:warning("[iot_zd_endpoint] postman exited with reason: ~p", [Reason]),
is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{next_state, disconnected, State#state{timer_ref = undefined, postman_pid = undefined}};
%% @private
%% @doc If callback_mode is handle_event_function, then whenever a
%% gen_statem receives an event from call/2, cast/2, or as a normal
%% process message, this function is called.
handle_event(EventType, Event, StateName, State) ->
lager:warning("[iot_zd_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]),
{keep_state, State}.
%% @private
%% @doc This function is called by a gen_statem when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_statem terminates with
%% Reason. The return value is ignored.
terminate(Reason, _StateName, #state{}) ->
lager:debug("[iot_zd_endpoint] terminate with reason: ~p", [Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
{ok, StateName, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
%% mqtt协议的支持,
create_postman(Opts) ->
Host = proplists:get_value(host, Opts),
Port = proplists:get_value(port, Opts),
Username = proplists:get_value(username, Opts),
Password = proplists:get_value(password, Opts),
Topic = proplists:get_value(topic, Opts),
Qos = proplists:get_value(qos, Opts),
Node = atom_to_binary(node()),
ClientId = <<"mqtt-client-", Node/binary, "-zhongdian_mqtt">>,
PostmanOpts = [
{clientid, ClientId},
{host, Host},
{port, Port},
{tcp_opts, []},
{username, Username},
{password, Password},
{keepalive, 86400},
{auto_ack, true},
{connect_timeout, 5000},
{proto_ver, v5},
{retry_interval, 5000}
],
mqtt_postman:start_link(PostmanOpts, Topic, Qos).
-spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> no_return().
do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) ->
Data = #{
<<"version">> => <<"1.0">>,
<<"location_code">> => LocationCode,
<<"ts">> => Timestamp,
<<"properties">> => Fields
},
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}},
ok.

View File

@ -1,157 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 04. 7 2023 11:31
%%%-------------------------------------------------------------------
-module(mnesia_endpoint).
-author("aresei").
-include_lib("stdlib/include/qlc.hrl").
-include("iot.hrl").
%% API
-export([get_keys/0, get_all_endpoints/0, get_endpoint/1, insert/1, delete/1]).
-export([to_map/1, config_equals/2]).
-export([import_static_endpoints/0]).
%%
-spec import_static_endpoints() -> no_return().
import_static_endpoints() ->
zhongdian_mqtt_endpoint().
zhongdian_mqtt_endpoint() ->
Mapper0 = "fun(LocationCode, Fields, Timestamp) ->
Data = #{
<<\"version\">> => <<\"1.0\">>,
<<\"location_code\">> => LocationCode,
<<\"ts\">> => Timestamp,
<<\"properties\">> => Fields
},
{ok, iolist_to_binary(jiffy:encode(Data, [force_utf8]))}
end.",
Mapper = list_to_binary(Mapper0),
{ok, MapperFun} = iot_util:parse_mapper(Mapper),
mnesia_endpoint:insert(#endpoint{
name = <<"zhongdian_mqtt">>,
title = <<"中电mqtt北向数据"/utf8>>,
matcher = <<".*">>,
mapper = Mapper,
mapper_fun = MapperFun,
config = #mqtt_endpoint{
host = <<"172.30.6.161">>,
port = 1883,
username = <<"admin">>,
password = <<"123456">>,
topic = <<"CET/NX/upload">>,
qos = 2
},
created_at = iot_util:timestamp_of_seconds()
}).
-spec get_keys() -> [Name :: binary()].
get_keys() ->
mnesia:dirty_all_keys(endpoint).
-spec get_all_endpoints() -> [#endpoint{}].
get_all_endpoints() ->
Fun = fun() ->
Q = qlc:q([E || E <- mnesia:table(endpoint)]),
qlc:e(Q)
end,
case mnesia:transaction(Fun) of
{atomic, Endpoints} ->
Endpoints;
{aborted, _} ->
[]
end.
-spec get_endpoint(Name :: binary()) -> undefined | {ok, #endpoint{}}.
get_endpoint(Name) when is_binary(Name) ->
case mnesia:dirty_read(endpoint, Name) of
[Endpoint | _] ->
{ok, Endpoint};
[] ->
undefined
end.
-spec insert(Endpoint :: #endpoint{}) -> ok | {error, Reason :: any()}.
insert(Endpoint = #endpoint{}) ->
case mnesia:transaction(fun() -> mnesia:write(endpoint, Endpoint, write) end) of
{atomic, ok} ->
ok;
{aborted, Reason} ->
{error, Reason}
end.
-spec delete(Name :: binary()) -> ok | {error, Reason :: any()}.
delete(Name) when is_binary(Name) ->
case mnesia:transaction(fun() -> mnesia:delete(endpoint, Name, write) end) of
{atomic, ok} ->
ok;
{aborted, Reason} ->
{error, Reason}
end.
%% 2endpoint的配置项是否相同
-spec config_equals(any(), any()) -> boolean().
config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) ->
true;
config_equals(#ws_endpoint{url = Url}, #ws_endpoint{url = Url}) ->
true;
config_equals(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic},
#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) ->
true;
config_equals(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos},
#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) ->
true;
config_equals(_, _) ->
false.
to_map(#endpoint{name = Name, title = Title, matcher = Matcher, mapper = Mapper, config = Config, updated_at = UpdatedAt, created_at = CreatedAt}) ->
#{
<<"name">> => Name,
<<"title">> => Title,
<<"matcher">> => Matcher,
<<"mapper">> => Mapper,
<<"config">> => config_map(Config),
<<"updated_at">> => UpdatedAt,
<<"created_at">> => CreatedAt
}.
config_map(#http_endpoint{url = Url}) ->
#{<<"protocol">> => <<"http">>, <<"args">> => #{
<<"url">> => Url
}};
config_map(#ws_endpoint{url = Url}) ->
#{<<"protocol">> => <<"ws">>, <<"args">> => #{
<<"url">> => Url
}};
config_map(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) ->
#{<<"protocol">> => <<"kafka">>, <<"args">> => #{
<<"username">> => Username,
<<"password">> => Password,
<<"bootstrap_servers">> => BootstrapServers,
<<"topic">> => Topic
}};
config_map(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) ->
#{<<"protocol">> => <<"mqtt">>, <<"args">> => #{
<<"host">> => Host,
<<"port">> => Port,
<<"username">> => Username,
<<"password">> => Password,
<<"topic">> => Topic,
<<"qos">> => Qos
}};
config_map(#mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName}) ->
#{<<"protocol">> => <<"mysql">>, <<"args">> => #{
<<"host">> => Host,
<<"port">> => Port,
<<"username">> => Username,
<<"password">> => Password,
<<"database">> => Database,
<<"table_name">> => TableName
}}.

View File

@ -1,949 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author licheng5
%%% @copyright (C) 2021, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 21. 1 2021 2:17
%%%-------------------------------------------------------------------
-module(mnesia_kv).
-author("licheng5").
-include_lib("stdlib/include/qlc.hrl").
-include("iot.hrl").
%%
-define(WRONG_KIND, <<"Operation against a key holding the wrong kind of value">>).
-type(wrong_kind() :: binary()).
-type(redis_nil() :: none).
%% API
-export([all_expireable_keys/0, all_expired_keys/1, clean_expired_keys/0]).
-export([del/1, exists/1, expire/2, keys/1, persist/1, ttl/1, type/1]).
-export([get/1, set/2, setnx/2]).
-export([hexists/2, hdel/2, hkeys/1, hget/2, hmget/2, hset/3, hmset/2, hgetall/1, hlen/1]).
-export([sadd/2, scard/1, sdiff/2, sismember/2, smembers/1, sinter/2, sunion/2, spop/1, srandmember/2, srem/2]).
-export([lindex/2, linsert/4, llen/1, lpop/1, lpush/2, lpushx/2, lrange/3, lrem/3, lset/3, ltrim/3, rpop/1, rpush/2, rpushx/2]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% keys
-spec clean_expired_keys() -> ok | {error, Reason :: any()}.
clean_expired_keys() ->
%NowSecond = iot_util:timestamp_of_seconds(),
%case redis_mnesia_kv:all_expired_keys(NowSecond) of
% {ok, []} ->
% ok;
% {ok, Keys} ->
% lists:foreach(fun(Key) -> mnesia:transaction(fun() -> mnesia:delete(kv, Key, write) end) end, Keys),
% ok
%end.
ok.
%% keys
-spec all_expireable_keys() -> {ok, [{Key :: binary(), ExpireAt :: integer()}]} | {error, Reason :: any()}.
all_expireable_keys() ->
Fun = fun() ->
Q = qlc:q([{E#kv.key, E#kv.expire_at} || E <- mnesia:table(kv), E#kv.expire_at > 0]),
qlc:e(Q)
end,
case mnesia:transaction(Fun) of
{atomic, Items} ->
{ok, Items};
{aborted, Reason} ->
{error, Reason}
end.
-spec all_expired_keys(ExpireAt :: integer()) -> {ok, [Key :: binary()]} | {error, Reason :: any()}.
all_expired_keys(ExpireAt) when is_integer(ExpireAt) ->
Fun = fun() ->
Q = qlc:q([E#kv.key || E <- mnesia:table(kv),
E#kv.expire_at > 0, E#kv.expire_at =< ExpireAt]),
qlc:e(Q)
end,
case mnesia:transaction(Fun) of
{atomic, Items} ->
{ok, Items};
{aborted, Reason} ->
{error, Reason}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Key管理
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% key
-spec del(Key :: binary()) -> 0 | 1.
del(Key) when is_binary(Key) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
0;
[#kv{expire_at = ExpireAt}] ->
ok = mnesia:delete(kv, Key, write),
{1, ExpireAt}
end
end,
case mnesia:transaction(Fun) of
{atomic, {N, _ExpireAt}} ->
N;
{atomic, N} when is_integer(N) ->
N;
{aborted, _Reason} ->
0
end.
%% key 1 0
-spec exists(Key :: binary()) -> 0 | 1.
exists(Key) when is_binary(Key) ->
case mnesia:dirty_read(kv, Key) of
[] ->
0;
[#kv{}] ->
1
end.
% 1 key key ( 2.1.3 Redis key ) 0
-spec expire(Key :: binary(), Second :: integer()) -> 0 | 1.
expire(Key, Second) when is_binary(Key), is_integer(Second) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
0;
[KV] ->
NExpireAt = iot_util:timestamp_of_seconds() + Second,
ok = mnesia:write(kv, KV#kv{expire_at = NExpireAt}, write),
1
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, _Reason} ->
0
end.
%%
-spec keys(Pattern :: binary()) -> Keys :: list().
keys(Pattern) when is_binary(Pattern) ->
Keys = mnesia:dirty_all_keys(kv),
case Pattern of
<<"*">> ->
Keys;
_ ->
case binary:split(Pattern, <<"*">>) of
[<<>> | _] ->
[];
[Prefix | _] ->
Len = byte_size(Prefix),
lists:filter(fun(Key) ->
case Key of
<<Prefix:Len/binary, _/binary>> ->
true;
_ ->
false
end
end, Keys)
end
end.
%% 1 key key 0
-spec persist(Key :: binary()) -> 0 | 1.
persist(Key) when is_binary(Key) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
0;
[#kv{expire_at = 0}] ->
0;
[KV] ->
ok = mnesia:write(kv, KV#kv{expire_at = 0}, write),
1
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, _} ->
0
end.
%% key -2 key -1 key
-spec ttl(Key :: binary()) -> TTL :: integer().
ttl(Key) when is_binary(Key) ->
case mnesia:dirty_read(kv, Key) of
[] ->
-2;
[#kv{expire_at = 0}] ->
-1;
[#kv{expire_at = ExpireAt}] ->
NowSeconds = iot_util:timestamp_of_seconds(),
ExpireAt - NowSeconds
end.
%% key的类型
%%-spec type(Key :: binary()) -> None :: redis_nil() | Type :: atom().
type(Key) when is_binary(Key) ->
case mnesia:dirty_read(kv, Key) of
[] ->
none;
[#kv{type = Type}] ->
Type
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%-spec get(Key :: binary()) -> redis_nil() | Val :: binary() | {error, Reason :: binary()} .
get(Key) when is_binary(Key) ->
case mnesia:dirty_read(kv, Key) of
[] ->
none;
[#kv{val = Val, type = string}] ->
Val;
_ ->
{error, ?WRONG_KIND}
end.
-spec set(Key :: binary(), Val :: binary()) -> boolean().
set(Key, Val) when is_binary(Key), is_binary(Val) ->
KV = #kv{key = Key, val = Val, type = string},
case mnesia:transaction(fun() -> mnesia:write(kv, KV, write) end) of
{atomic, ok} ->
true;
{aborted, _} ->
false
end.
-spec setnx(Key :: binary(), Val :: binary()) -> boolean().
setnx(Key, Val) when is_binary(Key), is_binary(Val) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
KV = #kv{key = Key, val = Val, type = string},
ok = mnesia:write(kv, KV, write),
1;
[#kv{}] ->
0
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, Reason} ->
{error, Reason}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% HashTable处理
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% 1 0
-spec hset(Key :: binary(), Field :: binary(), Val :: binary()) -> N :: integer() | {error, Reason :: binary()}.
hset(Key, Field, Val) when is_binary(Key), is_binary(Field), is_binary(Val) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
KV = #kv{key = Key, val = #{Field => Val}, type = hash},
ok = mnesia:write(kv, KV, write),
1;
[KV = #kv{val = Map0, type = hash}] ->
IsKey = maps:is_key(Field, Map0),
Map = maps:put(Field, Val, Map0),
ok = mnesia:write(kv, KV#kv{key = Key, val = Map}, write),
case IsKey of
true -> 0;
false -> 1
end;
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, Reason} ->
{error, Reason}
end.
-spec hmset(Key :: binary(), Map :: map()) -> ok | {error, Reason :: binary()}.
hmset(Key, Map) when is_binary(Key), is_map(Map) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
KV = #kv{key = Key, val = Map, type = hash},
mnesia:write(kv, KV, write);
[KV = #kv{val = Map0, type = hash}] ->
Map1 = maps:merge(Map, Map0),
mnesia:write(kv, KV#kv{key = Key, val = Map1}, write);
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, ok} ->
<<"OK">>;
{aborted, Reason} ->
{error, Reason}
end.
%% key nil
-spec hget(Key :: binary(), Field :: binary()) -> redis_nil() | {ok, Val :: any()} | {error, Reason :: binary()}.
hget(Key, Field) when is_binary(Key), is_binary(Field) ->
case mnesia:dirty_read(kv, Key) of
[] ->
none;
[#kv{val = #{Field := Val}, type = hash}] ->
{ok, Val};
_ ->
{error, ?WRONG_KIND}
end.
-spec hmget(Key :: binary(), [binary()]) -> list() | {error, Reason :: binary()}.
hmget(Key, Fields) when is_binary(Key), is_list(Fields) ->
case mnesia:dirty_read(kv, Key) of
[] ->
[none || _ <- Fields];
[#kv{val = Map0, type = hash}] ->
[maps:get(Field, Map0, none) || Field <- Fields];
_ ->
{error, ?WRONG_KIND}
end.
-spec hdel(Key :: binary(), [Field :: binary()]) -> Num :: integer() | {error, Reason :: binary()}.
hdel(Key, Fields) when is_binary(Key), is_list(Fields) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
0;
[KV = #kv{val = Map, type = hash}] ->
Map1 = lists:foldl(fun(Field, Map0) -> maps:remove(Field, Map0) end, Map, Fields),
ok = mnesia:write(kv, KV#kv{key = Key, val = Map1}, write),
map_size(Map) - map_size(Map1);
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, Reason} ->
{error, Reason}
end.
%% 1 key 0
-spec hexists(Key :: binary(), Field :: binary()) -> 0 | 1 | {error, Reason :: binary()}.
hexists(Key, Field) when is_binary(Key), is_binary(Field) ->
case mnesia:dirty_read(kv, Key) of
[] ->
0;
[#kv{val = Map0, type = hash}] ->
case maps:is_key(Field, Map0) of
true -> 1;
false -> 0
end;
_ ->
{error, ?WRONG_KIND}
end.
%% key
-spec hgetall(Key :: binary()) -> Map :: map() | {error, Reason :: binary()}.
hgetall(Key) when is_binary(Key) ->
case mnesia:dirty_read(kv, Key) of
[] ->
[];
[#kv{val = Map, type = hash}] ->
lists:foldl(fun({Field, Val}, Acc) -> [Field, Val | Acc] end, [], maps:to_list(Map));
_ ->
{error, ?WRONG_KIND}
end.
%% field key
-spec hkeys(Key :: binary()) -> Keys :: list() | {error, Reason :: binary()}.
hkeys(Key) when is_binary(Key) ->
case mnesia:dirty_read(kv, Key) of
[] ->
[];
[#kv{val = Map, type = hash}] ->
maps:keys(Map);
_ ->
{error, ?WRONG_KIND}
end.
%% key 0
-spec hlen(Key :: binary()) -> 0 | 1 | {error, Reason :: binary()}.
hlen(Key) when is_binary(Key) ->
case mnesia:dirty_read(kv, Key) of
[] ->
0;
[#kv{val = Map, type = hash}] ->
map_size(Map);
_ ->
{error, ?WRONG_KIND}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% set处理
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
-spec sadd(Key :: binary(), Members :: list()) -> Num :: integer() | {error, Reason :: binary()}.
sadd(Key, Members) when is_binary(Key), is_list(Members) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
S = sets:from_list(Members),
KV = #kv{key = Key, val = S, type = set},
ok = mnesia:write(kv, KV, write),
sets:size(S);
[KV = #kv{val = Set0, type = set}] ->
Set1 = lists:foldl(fun(E, S0) -> sets:add_element(E, S0) end, Set0, Members),
ok = mnesia:write(kv, KV#kv{key = Key, val = Set1}, write),
sets:size(Set1) - sets:size(Set0);
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, Reason} ->
{error, Reason}
end.
%% key 0
-spec scard(Key :: binary()) -> Num :: integer() | {error, Reason :: wrong_kind()}.
scard(Key) when is_binary(Key) ->
case mnesia:dirty_read(kv, Key) of
[] ->
0;
[#kv{val = Set0, type = set}] ->
sets:size(Set0);
_ ->
{error, ?WRONG_KIND}
end.
%% 1 key 0
-spec sismember(Key :: binary(), Member :: binary()) -> boolean() | {error, wrong_kind()}.
sismember(Key, Member) when is_binary(Key) ->
case mnesia:dirty_read(kv, Key) of
[] ->
0;
[#kv{val = S, type = set}] ->
case sets:is_element(Member, S) of
true -> 1;
false -> 0
end;
_ ->
{error, ?WRONG_KIND}
end.
-spec sdiff(Key1 :: binary(), Key2 :: binary()) -> list() | {error, wrong_kind()}.
sdiff(Key1, Key2) when is_binary(Key1), is_binary(Key2) ->
case {mnesia:dirty_read(kv, Key1), mnesia:dirty_read(kv, Key2)} of
{[#kv{val = S1, type = set}], [#kv{val = S2, type = set}]} ->
sets:to_list(S1) -- sets:to_list(S2);
{[#kv{val = S1, type = set}], []} ->
sets:to_list(S1);
{[], [#kv{type = set}]} ->
[];
{[], []} ->
[];
_ ->
{error, ?WRONG_KIND}
end.
-spec sinter(Key1 :: binary(), Key2 :: binary()) -> list() | {error, wrong_kind()}.
sinter(Key1, Key2) when is_binary(Key1), is_binary(Key2) ->
case {mnesia:dirty_read(kv, Key1), mnesia:dirty_read(kv, Key2)} of
{[#kv{val = S1, type = set}], [#kv{val = S2, type = set}]} ->
sets:to_list(sets:intersection(S1, S2));
{[#kv{type = set}], []} ->
[];
{[], [#kv{type = set}]} ->
[];
{[], []} ->
[];
_ ->
{error, ?WRONG_KIND}
end.
%% key
-spec smembers(Key :: binary()) -> list() | {error, wrong_kind()}.
smembers(Key) when is_binary(Key) ->
case mnesia:dirty_read(kv, Key) of
[#kv{val = S, type = set}] ->
sets:to_list(S);
[] ->
[];
_ ->
{error, ?WRONG_KIND}
end.
%% nil
%%-spec spop(Key :: binary()) -> redis_nil() | Member :: binary() | {error, wrong_kind()}.
spop(Key) when is_binary(Key) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
none;
[KV = #kv{val = S0, type = set}] ->
case sets:size(S0) of
0 ->
none;
Size ->
E = lists:nth(rand:uniform(Size), sets:to_list(S0)),
S1 = sets:del_element(E, S0),
ok = mnesia:write(kv, KV#kv{val = S1}, write),
E
end;
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, E} ->
E;
{aborted, Reason} ->
{error, Reason}
end.
%% key nil count
-spec srandmember(Key :: binary(), Count :: integer()) -> list() | {error, wrong_kind()}.
srandmember(Key, Count) when is_binary(Key), is_integer(Count), Count > 0 ->
case mnesia:dirty_read(kv, Key) of
[] ->
[];
[#kv{val = S, type = set}] ->
Size = sets:size(S),
L = sets:to_list(S),
case Size =< Count of
true ->
L;
false ->
lists:sublist(L, rand:uniform(Size - Count + 1), Count)
end;
_ ->
{error, ?WRONG_KIND}
end.
%%
-spec srem(Key :: binary(), Members :: list()) -> Num :: integer() | {error, wrong_kind()}.
srem(Key, Members) when is_binary(Key), is_list(Members) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
0;
[KV = #kv{val = S, type = set}] ->
Size = sets:size(S),
S1 = lists:foldl(fun(E, S0) -> sets:del_element(E, S0) end, S, Members),
ok = mnesia:write(kv, KV#kv{val = S1}, write),
Size - sets:size(S1);
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, Reason} ->
{error, Reason}
end.
%% key
-spec sunion(Key :: binary(), Key2 :: binary()) -> Members :: list() | {error, wrong_kind()}.
sunion(Key1, Key2) when is_binary(Key1), is_binary(Key2) ->
case {mnesia:dirty_read(kv, Key1), mnesia:dirty_read(kv, Key2)} of
{[#kv{val = S1, type = set}], [#kv{val = S2, type = set}]} ->
sets:to_list(sets:union(S1, S2));
{[#kv{val = S1, type = set}], []} ->
sets:to_list(S1);
{[], [#kv{val = S2, type = set}]} ->
sets:to_list(S2);
{[], []} ->
[];
{_, _} ->
{error, ?WRONG_KIND}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% List
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% -1 -2 nil
%%-spec lindex(Key :: binary(), Idx :: integer()) -> redis_nil() | Member :: binary() | {error, wrong_kind()}.
lindex(Key, Idx) when is_binary(Key), is_integer(Idx) ->
case mnesia:dirty_read(kv, Key) of
[] ->
none;
[#kv{val = L, type = list}] ->
Idx1 = fix_pos(Idx, length(L)),
case Idx1 >= 1 andalso Idx1 =< length(L) of
true ->
lists:nth(fix_pos(Idx, length(L)), L);
false ->
none
end;
_ ->
{error, ?WRONG_KIND}
end.
-spec llen(Key :: binary()) -> Size :: integer() | {error, wrong_kind()}.
llen(Key) when is_binary(Key) ->
case mnesia:dirty_read(kv, Key) of
[] ->
0;
[#kv{val = L, type = list}] ->
length(L);
_ ->
{error, ?WRONG_KIND}
end.
%% key nil
%%-spec lpop(Key :: binary()) -> redis_nil() | E :: binary() | {error, wrong_kind()}.
lpop(Key) when is_binary(Key) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
none;
[#kv{val = [], type = list}] ->
none;
[KV = #kv{val = [H | Tail], type = list}] ->
ok = mnesia:write(kv, KV#kv{val = Tail}, write),
H;
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, E} ->
E;
{aborted, Reason} ->
{error, Reason}
end.
%%
%%-spec rpop(Key :: binary()) -> redis_nil() | E :: binary() | {error, wrong_kind()}.
rpop(Key) when is_binary(Key) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
none;
[#kv{val = [], type = list}] ->
none;
[KV = #kv{val = L0, type = list}] ->
[H | Tail] = lists:reverse(L0),
ok = mnesia:write(kv, KV#kv{val = lists:reverse(Tail)}, write),
H;
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, E} ->
E;
{aborted, Reason} ->
{error, Reason}
end.
%% key LPUSH key
%% LPUSH
-spec lpush(Key :: binary(), Members :: list()) -> Num :: integer() | {error, wrong_kind()}.
lpush(Key, Members) when is_binary(Key), is_list(Members) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
ok = mnesia:write(kv, #kv{key = Key, val = Members, type = list}, write),
length(Members);
[KV = #kv{val = L0, type = list}] ->
L = Members ++ L0,
ok = mnesia:write(kv, KV#kv{val = L, type = list}, write),
length(L);
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, Reason} ->
{error, Reason}
end.
%%
%% LPUSHX
-spec lpushx(Key :: binary(), Members :: list()) -> Num :: integer() | {error, wrong_kind()}.
lpushx(Key, Members) when is_binary(Key), is_list(Members) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
0;
[KV = #kv{val = L0, type = list}] ->
L = Members ++ L0,
ok = mnesia:write(kv, KV#kv{val = L}, write),
length(L);
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, Reason} ->
{error, Reason}
end.
%% ()
%% RPUSH
%% RPUSH
-spec rpush(Key :: binary(), Members :: list()) -> Num :: integer() | {error, wrong_kind()}.
rpush(Key, Members) when is_binary(Key), is_list(Members) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
ok = mnesia:write(kv, #kv{key = Key, val = Members, type = list}, write),
length(Members);
[KV = #kv{val = L0, type = list}] ->
L = L0 ++ Members,
ok = mnesia:write(kv, KV#kv{val = L}, write),
length(L);
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, Reason} ->
{error, Reason}
end.
%% ()
%%
%% RPUSH
-spec rpushx(Key :: binary(), Members :: list()) -> Num :: integer() | {error, wrong_kind()}.
rpushx(Key, Members) when is_binary(Key), is_list(Members) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
0;
[KV = #kv{val = L0, type = list}] ->
L = L0 ++ Members,
ok = mnesia:write(kv, KV#kv{val = L, type = list}, write),
length(L);
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, Reason} ->
{error, Reason}
end.
%% START END 0 1
%% 使 -1 -2
-spec lrange(Key :: binary(), Start :: integer(), End :: integer()) -> list() | {error, wrong_kind()}.
lrange(Key, Start, End) when is_binary(Key), is_integer(Start), is_integer(End) ->
case mnesia:dirty_read(kv, Key) of
[] ->
[];
[#kv{val = L, type = list}] ->
Len = length(L),
Start1 = fix_pos(Start, Len),
End1 = fix_pos(End, Len),
case Start1 =< End1 of
true ->
lists:sublist(L, Start1, End1 - Start1 + 1);
false ->
[]
end;
_ ->
{error, ?WRONG_KIND}
end.
%% COUNT VALUE
%% COUNT
%% count > 0 : VALUE COUNT
%% count < 0 : VALUE COUNT
%% count = 0 : VALUE
%% 0
-spec lrem(Key :: binary(), Count :: integer(), Val :: binary()) -> Num :: integer() | {error, wrong_kind()}.
lrem(Key, Count, Val) when is_binary(Key), is_integer(Count) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
0;
[KV = #kv{val = L0, type = list}] ->
if
Count > 0 ->
L1 = lists:foldl(fun(_, L) -> lists:delete(Val, L) end, L0, lists:seq(1, Count)),
ok = mnesia:write(kv, KV#kv{val = L1}, write),
length(L0) - length(L1);
Count =:= 0 ->
{DeletedVals, L1} = lists:partition(fun(E) -> E =:= Val end, L0),
case DeletedVals =/= [] of
true ->
ok = mnesia:write(kv, KV#kv{val = L1}, write);
false ->
ok
end,
length(DeletedVals);
Count < 0 ->
L1 = lists:foldl(fun(_, L) ->
lists:delete(Val, L) end, lists:reverse(L0), lists:seq(1, abs(Count))),
ok = mnesia:write(kv, KV#kv{val = lists:reverse(L1)}, write),
length(L0) - length(L1)
end;
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, Reason} ->
{error, Reason}
end.
%% LSET
%% ok
-spec lset(Key :: binary(), Idx :: integer(), Val :: binary()) -> ok | {error, wrong_kind()}.
lset(Key, Idx, Val) when is_binary(Key), is_integer(Idx) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
mnesia:abort(?WRONG_KIND);
[KV = #kv{val = L0, type = list}] ->
case length(L0) < Idx of
true ->
mnesia:abort(<<"Index out of bounds">>);
false ->
L1 = lists_update(L0, Idx, Val),
mnesia:write(kv, KV#kv{val = L1}, write)
end;
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, ok} ->
<<"OK">>;
{aborted, Reason} ->
{error, Reason}
end.
%%
%% 0 1
%% -1 -2
%% ok
-spec ltrim(Key :: binary(), Start :: integer(), End :: integer()) -> ok | {error, wrong_kind()}.
ltrim(Key, Start, End) when is_binary(Key), is_integer(Start), is_integer(End) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
ok;
[KV = #kv{val = L0, type = list}] ->
Len = length(L0),
Start1 = fix_pos(Start, Len),
End1 = fix_pos(End, Len),
case Start1 =< End1 of
true ->
L1 = lists:sublist(L0, Start1, End1 - Start1 + 1),
mnesia:write(kv, KV#kv{val = L1}, write);
false ->
mnesia:write(kv, KV#kv{val = []}, write)
end
end
end,
case mnesia:transaction(Fun) of
{atomic, ok} ->
<<"OK">>;
{aborted, Reason} ->
{error, Reason}
end.
%%
%%
%% key
%% -1 key 0
-spec linsert(Key :: binary(), Position :: binary(), Pivot :: binary(), Value :: binary()) ->
Num :: integer() |
{error, wrong_kind()}.
linsert(Key, Position, Pivot, Value) when is_binary(Key), is_binary(Position) ->
Fun = fun() ->
case mnesia:read(kv, Key) of
[] ->
0;
[KV = #kv{val = L0, type = list}] ->
L = case Position of
<<"BEFORE">> ->
lists_insert_before(L0, Pivot, Value);
<<"AFTER">> ->
lists_insert_after(L0, Pivot, Value)
end,
ok = mnesia:write(kv, KV#kv{val = L}, write),
case L0 =:= L of
true -> -1;
false -> length(L)
end;
_ ->
mnesia:abort(?WRONG_KIND)
end
end,
case mnesia:transaction(Fun) of
{atomic, N} ->
N;
{aborted, Reason} ->
{error, Reason}
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
fix_pos(Pos, Len) when is_integer(Pos), is_integer(Len) ->
case Pos >= 1 of
true -> Pos;
false -> Len + Pos
end.
%%
-spec lists_update(L :: list(), N :: integer(), Val :: any()) -> L1 :: list().
lists_update(L, N, Val) when is_integer(N), N > 0, is_list(L) ->
case length(L) < N of
true -> L;
false -> lists_update0(L, N, Val)
end.
lists_update0([_ | Tail], 1, Val) ->
[Val | Tail];
lists_update0([Hd | Tail], N, Val) ->
[Hd | lists_update0(Tail, N - 1, Val)].
%%
lists_insert_before(L, Pivot, Val) when is_list(L) ->
lists_insert_before0(L, Pivot, Val).
lists_insert_before0([], _Pivot, _Val) ->
[];
lists_insert_before0([Pivot | Tail], Pivot, Val) ->
[Val, Pivot | Tail];
lists_insert_before0([H | Tail], Pivot, Val) ->
[H | lists_insert_before0(Tail, Pivot, Val)].
%%
lists_insert_after(L, Pivot, Val) when is_list(L) ->
lists_insert_after0(L, Pivot, Val).
lists_insert_after0([], _Pivot, _Val) ->
[];
lists_insert_after0([Pivot | Tail], Pivot, Val) ->
[Pivot, Val | Tail];
lists_insert_after0([H | Tail], Pivot, Val) ->
[H | lists_insert_after0(Tail, Pivot, Val)].

View File

@ -10,56 +10,42 @@
-author("aresei"). -author("aresei").
-include("iot.hrl"). -include("iot.hrl").
-define(TAB_NAME, 'queue_data:zhongdian').
%% API %% API
-export([insert/2, delete/2, ensure_queue/1, table_size/1, dirty_fetch_next/2]). -export([insert/1, delete/1, table_size/0, dirty_fetch_next/1]).
-spec insert(Tab :: atom(), #north_data{}) -> ok | {error, Reason :: any()}. -spec insert(#north_data{}) -> ok | {error, Reason :: any()}.
insert(Tab, Item = #north_data{}) -> insert(Item = #north_data{}) ->
Id = mnesia_id_generator:next_id(Tab), Id = mnesia_id_generator:next_id(?TAB_NAME),
NItem = Item#north_data{id = Id}, NItem = Item#north_data{id = Id},
case mnesia:transaction(fun() -> mnesia:write(Tab, NItem, write) end) of case mnesia:transaction(fun() -> mnesia:write(?TAB_NAME, NItem, write) end) of
{atomic, ok} -> {atomic, ok} ->
ok; ok;
{aborted, Reason} -> {aborted, Reason} ->
{error, Reason} {error, Reason}
end. end.
-spec delete(Tab :: atom(), Key :: any()) -> ok | {error, Reason :: any()}. -spec delete(Key :: any()) -> ok | {error, Reason :: any()}.
delete(Tab, Key) when is_atom(Tab) -> delete(Key) when is_integer(Key) ->
case mnesia:transaction(fun() -> mnesia:delete(Tab, Key, write) end) of case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, Key, write) end) of
{atomic, ok} -> {atomic, ok} ->
ok; ok;
{aborted, Reason} -> {aborted, Reason} ->
{error, Reason} {error, Reason}
end. end.
%% -spec table_size() -> integer().
-spec ensure_queue(Name :: atom()) -> no_return(). table_size() ->
ensure_queue(Name) when is_atom(Name) -> mnesia:table_info(?TAB_NAME, size).
Tables = mnesia:system_info(tables),
case lists:member(Name, Tables) of
true ->
mnesia:wait_for_tables([Name], infinity);
false ->
mnesia:create_table(Name, [
{attributes, record_info(fields, north_data)},
{record_name, north_data},
{disc_copies, [node()]},
{type, ordered_set}
])
end.
-spec table_size(Tab :: atom()) -> integer(). -spec dirty_fetch_next(Cursor :: integer()) ->
table_size(Tab) when is_atom(Tab) ->
mnesia:table_info(Tab, size).
-spec dirty_fetch_next(Tab :: atom(), Cursor :: integer()) ->
{ok, NCursor :: integer(), Item :: any()} | '$end_of_table'. {ok, NCursor :: integer(), Item :: any()} | '$end_of_table'.
dirty_fetch_next(Tab, Cursor) when is_atom(Tab), is_integer(Cursor) -> dirty_fetch_next(Cursor) when is_integer(Cursor) ->
case mnesia:dirty_next(Tab, Cursor) of case mnesia:dirty_next(?TAB_NAME, Cursor) of
'$end_of_table' -> '$end_of_table' ->
'$end_of_table'; '$end_of_table';
NextKey -> NextKey ->
[Item] = mnesia:dirty_read(Tab, NextKey), [Item] = mnesia:dirty_read(?TAB_NAME, NextKey),
{ok, NextKey, Item} {ok, NextKey, Item}
end. end.

View File

@ -13,16 +13,7 @@
%% API %% API
-export([rsa_encode/1]). -export([rsa_encode/1]).
-export([insert_services/1]). -export([insert_services/1]).
-export([insert_endpoints/0, test_http/0, test_mysql/0, test_mqtt/0, test_influxdb/0]). -export([test_mqtt/0, test_influxdb/0]).
fun_x(LocationCode, Fields, Timestamp) ->
Data = #{
<<"version">> => <<"1.0">>,
<<"location_code">> => LocationCode,
<<"ts">> => Timestamp,
<<"properties">> => Fields
},
{ok, iolist_to_binary(jiffy:encode(Data, [force_utf8]))}.
test_influxdb() -> test_influxdb() ->
UUID = <<"device123123">>, UUID = <<"device123123">>,
@ -38,112 +29,13 @@ test_influxdb() ->
end) end)
end, lists:seq(1, 100)). end, lists:seq(1, 100)).
test_http() ->
Name = <<"zhongguodianli">>,
Pid = iot_endpoint:get_pid(Name),
lists:foreach(fun(Id0) ->
Id = integer_to_binary(Id0),
Fields = [
#{
<<"key">> => <<"test:", Id/binary>>,
<<"value">> => Id,
<<"unit">> => <<"cm">>
}
],
iot_endpoint:forward(Pid, <<"location_code:", Id/binary>>, Fields, iot_util:timestamp_of_seconds())
end, lists:seq(1, 10000)).
test_mysql() ->
lists:foreach(fun(_) ->
iot_router:route(<<"mysql123">>, [
#{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>},
#{<<"key">> => <<"age">>, <<"value">> => 30},
#{<<"key">> => <<"flow">>, <<"value">> => 30}
], iot_util:timestamp_of_seconds())
end, lists:seq(1, 10000)).
test_mqtt() -> test_mqtt() ->
iot_router:route(<<"test123">>, [ iot_zd_endpoint:forward(<<"location_code_test123">>, [
#{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>}, #{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>},
#{<<"key">> => <<"age">>, <<"value">> => 30}, #{<<"key">> => <<"age">>, <<"value">> => 30},
#{<<"key">> => <<"flow">>, <<"value">> => 30} #{<<"key">> => <<"flow">>, <<"value">> => 30}
], iot_util:timestamp_of_seconds()). ], iot_util:timestamp_of_seconds()).
insert_endpoints() ->
{ok, Mapper, F} = simple_mapper(),
mnesia_endpoint:insert(#endpoint{
name = <<"zhongguodianli">>,
title = <<"中国电力"/utf8>>,
matcher = <<"test12*">>,
mapper = Mapper,
mapper_fun = F,
config = #http_endpoint{url = <<"http://localhost:18080/test/receiver">>},
created_at = iot_util:timestamp_of_seconds()
}),
mnesia_endpoint:insert(#endpoint{
name = <<"mytest">>,
title = <<"测试数据"/utf8>>,
matcher = <<"test*">>,
mapper = Mapper,
mapper_fun = F,
config = #mqtt_endpoint{
host = <<"39.98.184.67">>,
port = 1883,
username = <<"test">>,
password = <<"test1234">>,
topic = <<"CET/NX/${location_code}/upload">>,
qos = 2
},
created_at = iot_util:timestamp_of_seconds()
}),
{ok, MysqlMapper, MysqlF} = mysql_mapper(),
mnesia_endpoint:insert(#endpoint{
name = <<"mysql">>,
title = <<"测试mysql"/utf8>>,
matcher = <<"mysql*">>,
mapper = MysqlMapper,
mapper_fun = MysqlF,
config = #mysql_endpoint{
host = <<"localhost">>,
port = 3306,
username = <<"php_an">>,
password = <<"123456">>,
database = <<"iot">>,
table_name = <<"north_data">>
},
created_at = iot_util:timestamp_of_seconds()
}),
{Mapper, F}.
simple_mapper() ->
Mapper0 = "fun(LocationCode, Fields) ->
Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields),
Fields2 = maps:from_list(Fields1),
Bin = jiffy:encode(Fields2#{<<\"location_code\">> => LocationCode}, [force_utf8]),
{ok, iolist_to_binary(Bin)}
end.",
Mapper = list_to_binary(Mapper0),
{ok, F} = iot_util:parse_mapper(Mapper),
{ok, Mapper, F}.
mysql_mapper() ->
Mapper0 = "fun(LocationCode, Fields, Timestamp) ->
Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields),
Content = jiffy:encode(maps:from_list(Fields1), [force_utf8]),
{ok, [{<<\"location_code\">>, LocationCode}, {<<\"content\">>, Content}, {<<\"created_ts\">>, Timestamp}]}
end.",
Mapper = list_to_binary(Mapper0),
{ok, F} = iot_util:parse_mapper(Mapper),
{ok, Mapper, F}.
insert_services(Num) -> insert_services(Num) ->
lists:foreach(fun(Id) -> lists:foreach(fun(Id) ->
Res = mysql_pool:insert(mysql_iot, <<"micro_service">>, Res = mysql_pool:insert(mysql_iot, <<"micro_service">>,

View File

@ -0,0 +1,21 @@
%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2017, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 21. 2017 13:33
%%%-------------------------------------------------------------------
-module(redis_client).
-author("aresei").
%% API
-export([hget/2]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% HashTable处理
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec hget(Key :: binary(), Field :: binary()) -> {ok, Val :: any()} | {error, Reason :: binary()}.
hget(Key, Field) when is_binary(Key), is_binary(Field) ->
poolboy:transaction(redis_pool, fun(Conn) -> eredis:q(Conn, ["HGET", Key, Field]) end).

View File

@ -1,197 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author licheng5
%%% @copyright (C) 2021, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 21. 1 2021 11:23
%%%-------------------------------------------------------------------
-module(redis_handler).
-author("licheng5").
%% API
-export([handle/1]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% Key管理
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
handle([<<"DEL">>, Key]) when is_binary(Key) ->
N = mnesia_kv:del(Key),
{reply, N};
handle([<<"EXISTS">>, Key]) when is_binary(Key) ->
N = mnesia_kv:exists(Key),
{reply, N};
handle([<<"EXPIRE">>, Key, Second0]) when is_binary(Key), is_binary(Second0) ->
Second = binary_to_integer(Second0),
N = mnesia_kv:expire(Key, Second),
{reply, N};
handle([<<"KEYS">>, Pattern]) when is_binary(Pattern) andalso Pattern =/= <<>> ->
Keys = mnesia_kv:keys(Pattern),
{reply, Keys};
handle([<<"PERSIST">>, Key]) when is_binary(Key) ->
N = mnesia_kv:persist(Key),
{reply, N};
handle([<<"TTL">>, Key]) when is_binary(Key) ->
TTL = mnesia_kv:ttl(Key),
{reply, TTL};
handle([<<"TYPE">>, Key]) when is_binary(Key) ->
Type = mnesia_kv:type(Key),
{reply, atom_to_binary(Type)};
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
handle([<<"GET">>, Key]) when is_binary(Key) ->
{reply, mnesia_kv:get(Key)};
handle([<<"SET">>, Key, Val]) when is_binary(Key), is_binary(Val) ->
case mnesia_kv:set(Key, Val) of
true ->
{reply, <<"OK">>};
false ->
{reply, <<"FAILED">>}
end;
handle([<<"SETNX">>, Key, Val]) when is_binary(Key), is_binary(Val) ->
{reply, mnesia_kv:setnx(Key, Val)};
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% HashTable处理
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
handle([<<"HSET">>, Key, Field, Val]) when is_binary(Key), is_binary(Field), is_binary(Val) ->
{reply, mnesia_kv:hset(Key, Field, Val)};
handle([<<"HMSET">>, Key | KvPairs]) when is_binary(Key), length(KvPairs) rem 2 =:= 0 ->
{reply, mnesia_kv:hmset(Key, lists_to_map(KvPairs))};
handle([<<"HGET">>, Key, Field]) when is_binary(Key), is_binary(Field) ->
{reply, mnesia_kv:hget(Key, Field)};
handle([<<"HMGET">>, Key | Fields]) when is_binary(Key), is_list(Fields) ->
{reply, mnesia_kv:hmget(Key, Fields)};
handle([<<"HDEL">>, Key | Fields]) when is_binary(Key), is_list(Fields) ->
{reply, mnesia_kv:hdel(Key, Fields)};
handle([<<"HEXISTS">>, Key, Field]) when is_binary(Key), is_binary(Field) ->
{reply, mnesia_kv:hexists(Key, Field)};
handle([<<"HGETALL">>, Key]) when is_binary(Key) ->
{reply, mnesia_kv:hgetall(Key)};
handle([<<"HKEYS">>, Key]) when is_binary(Key) ->
{reply, mnesia_kv:hkeys(Key)};
handle([<<"HLEN">>, Key]) when is_binary(Key) ->
{reply, mnesia_kv:hlen(Key)};
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% set处理
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
handle([<<"SADD">>, Key | Members]) when is_binary(Key), is_list(Members) ->
{reply, mnesia_kv:sadd(Key, Members)};
handle([<<"SCARD">>, Key]) when is_binary(Key) ->
{reply, mnesia_kv:scard(Key)};
handle([<<"SISMEMBER">>, Key, Member]) when is_binary(Key), is_binary(Member) ->
{reply, mnesia_kv:sismember(Key, Member)};
handle([<<"SDIFF">>, Key1, Key2]) when is_binary(Key1), is_binary(Key2) ->
{reply, mnesia_kv:sdiff(Key1, Key2)};
handle([<<"SINTER">>, Key1, Key2]) when is_binary(Key1), is_binary(Key2) ->
{reply, mnesia_kv:sinter(Key1, Key2)};
handle([<<"SUNION">>, Key1, Key2]) when is_binary(Key1), is_binary(Key2) ->
{reply, mnesia_kv:sunion(Key1, Key2)};
handle([<<"SMEMBERS">>, Key]) when is_binary(Key) ->
{reply, mnesia_kv:smembers(Key)};
handle([<<"SPOP">>, Key]) when is_binary(Key) ->
{reply, mnesia_kv:spop(Key)};
handle([<<"SRANDMEMBER">>, Key, Count0]) when is_binary(Key), is_binary(Count0) ->
Count = binary_to_integer(Count0),
{reply, mnesia_kv:srandmember(Key, Count)};
handle([<<"SREM">>, Key | Members]) when is_binary(Key), is_list(Members) ->
{reply, mnesia_kv:srem(Key, Members)};
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% List
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
handle([<<"LINDEX">>, Key, Idx0]) when is_binary(Key), is_binary(Idx0) ->
Idx = binary_to_integer(Idx0),
{reply, mnesia_kv:lindex(Key, Idx + 1)};
handle([<<"LLEN">>, Key]) when is_binary(Key) ->
{reply, mnesia_kv:llen(Key)};
handle([<<"LPOP">>, Key]) when is_binary(Key) ->
{reply, mnesia_kv:lpop(Key)};
handle([<<"RPOP">>, Key]) when is_binary(Key) ->
{reply, mnesia_kv:rpop(Key)};
handle([<<"LPUSH">>, Key | Members]) when is_binary(Key) ->
{reply, mnesia_kv:lpush(Key, Members)};
handle([<<"LPUSHX">>, Key | Members]) when is_binary(Key) ->
{reply, mnesia_kv:lpushx(Key, Members)};
handle([<<"RPUSH">>, Key | Members]) when is_binary(Key) ->
{reply, mnesia_kv:rpush(Key, Members)};
handle([<<"RPUSHX">>, Key | Members]) when is_binary(Key) ->
{reply, mnesia_kv:rpushx(Key, Members)};
handle([<<"LRANGE">>, Key, Start0, End0]) when is_binary(Key), is_binary(Start0), is_binary(End0) ->
Start = binary_to_integer(Start0),
End = binary_to_integer(End0),
{reply, mnesia_kv:lrange(Key, Start + 1, End + 1)};
handle([<<"LREM">>, Key, Count0, Val]) when is_binary(Key), is_binary(Count0), is_binary(Val) ->
Count = binary_to_integer(Count0),
{reply, mnesia_kv:lrem(Key, Count, Val)};
handle([<<"LSET">>, Key, Idx0, Val]) when is_binary(Key), is_binary(Idx0), is_binary(Val) ->
Idx = binary_to_integer(Idx0),
{reply, mnesia_kv:lset(Key, Idx + 1, Val)};
handle([<<"LTRIM">>, Key, Start0, End0]) when is_binary(Key), is_binary(Start0), is_binary(End0) ->
Start = binary_to_integer(Start0),
End = binary_to_integer(End0),
{reply, mnesia_kv:ltrim(Key, Start + 1, End + 1)};
handle([<<"LINSERT">>, Key, Position, Pivot, Val]) when is_binary(Key), Position =:= <<"BEFORE">>; Position =:= <<"AFTER">> ->
{reply, mnesia_kv:linsert(Key, Position, Pivot, Val)};
handle(_) ->
{reply, {error, <<"Unsuported Command">>}}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% map
lists_to_map(L) when is_list(L) ->
lists_to_map(L, #{}).
lists_to_map([], Map) ->
Map;
lists_to_map([K, V | Tail], Map) ->
lists_to_map(Tail, Map#{K => V}).

View File

@ -1,106 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author licheng5
%%% @copyright (C) 2020, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 10. 12 2020 11:17
%%%-------------------------------------------------------------------
-module(redis_protocol).
-author("licheng5").
%% API
-export([start_link/2, init/2]).
-record(command, {
data = <<>>,
stage = parse_arg_num,
arg_num = 0,
args = []
}).
%%--------------------------------------------------------------------
%% esockd callback
%%--------------------------------------------------------------------
start_link(Transport, Sock) ->
{ok, spawn_link(?MODULE, init, [Transport, Sock])}.
init(Transport, Sock) ->
{ok, NewSock} = Transport:wait(Sock),
loop(Transport, NewSock, #command{data = <<>>, arg_num = 0, args = []}).
loop(Transport, Sock, Command = #command{data = Data}) ->
Transport:setopts(Sock, [{active, once}]),
receive
{tcp, _, Packet} ->
%% , redis基于长连接
NData = <<Data/binary, Packet/binary>>,
case parse(Command#command{data = NData}) of
{ok, #command{args = [Method0|Args]}} ->
Method = string:uppercase(Method0),
lager:debug("[redis_protocol] get a command: ~p", [[Method|Args]]),
{reply, Reply} = redis_handler:handle([Method | Args]),
Transport:send(Sock, encode(Reply)),
%%
loop(Transport, Sock, #command{});
{more_data, NCommand} ->
%%
loop(Transport, Sock, NCommand)
end;
{tcp_error, _} ->
exit(normal);
{tcp_closed, _} ->
exit(normal)
end.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% , ,
parse(Command = #command{stage = parse_arg_num, data = <<$*, Rest/binary>>}) ->
[ArgNum0, ArgBin] = binary:split(Rest, <<$\r, $\n>>),
ArgNum = binary_to_integer(ArgNum0),
parse(Command#command{arg_num = ArgNum, data = ArgBin, stage = parse_arg});
%%
parse(Command = #command{stage = parse_arg, args = Args, arg_num = 0, data = <<>>}) ->
{ok, Command#command{args = lists:reverse(Args)}};
parse(Command = #command{stage = parse_arg, args = Args, arg_num = ArgNum, data = ArgBin}) ->
case binary:split(ArgBin, <<$\r, $\n>>) of
[<<"$", ArgLen0/binary>>, RestArgBin] ->
ArgLen = binary_to_integer(ArgLen0),
case RestArgBin of
<<Arg:ArgLen/binary, $\r, $\n, RestArgBin1/binary>> ->
parse(Command#command{arg_num = ArgNum - 1, args = [Arg | Args], data = RestArgBin1});
_ ->
{more_data, Command}
end;
_ ->
{more_data, Command}
end.
%% redis数据返回格式化
-spec encode(tuple() | binary() | list()) -> iolist().
encode({single_line, Arg}) when is_binary(Arg) ->
[<<$+>>, Arg, <<$\r, $\n>>];
encode({error, Arg}) when is_binary(Arg) ->
[<<$->>, Arg, <<$\r, $\n>>];
encode(Arg) when is_integer(Arg) ->
[<<$:>>, integer_to_list(Arg), <<$\r, $\n>>];
encode(Arg) when is_binary(Arg) ->
[<<$$>>, integer_to_list(iolist_size(Arg)), <<$\r, $\n>>, Arg, <<$\r, $\n>>];
encode(Args) when is_list(Args) ->
ArgCount = [<<$*>>, integer_to_list(length(Args)), <<$\r, $\n>>],
ArgsBin = lists:map(fun encode/1, lists:map(fun to_binary/1, Args)),
[ArgCount, ArgsBin].
%% binary
to_binary(X) when is_list(X) ->
unicode:characters_to_binary(X);
to_binary(X) when is_atom(X) ->
list_to_binary(atom_to_list(X));
to_binary(X) when is_binary(X) ->
X;
to_binary(X) when is_integer(X) ->
list_to_binary(integer_to_list(X)).

View File

@ -34,6 +34,16 @@
{<<"test">>, <<"iot2023">>} {<<"test">>, <<"iot2023">>}
]}, ]},
%% 配置中电的数据转发, mqtt协议
{zhongdian, [
{host, "39.98.184.67"},
{port, 1883},
{username, "test"},
{password, "test1234"},
{topic, "CET/NX/upload"},
{qos, 2}
]},
{pools, [ {pools, [
%% mysql连接池配置 %% mysql连接池配置
{mysql_iot, {mysql_iot,
@ -50,13 +60,22 @@
] ]
}, },
%% redis连接池
{redis_pool,
[{size, 10}, {max_overflow, 20}, {worker_module, eredis}],
[
{host, "127.0.0.1"},
{port, 26379}
]
},
%% influxdb数据库配置, 测试环境的: 用户名: iot; password: password1234 %% influxdb数据库配置, 测试环境的: 用户名: iot; password: password1234
{influx_pool, {influx_pool,
[{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}],
[ [
{host, "39.98.184.67"}, {host, "39.98.184.67"},
{port, 8086}, {port, 8086},
{token, <<"0DGZaV_wk6OCt_Bazk5W_9L3Zo-y-bGzwPS6NDGAqy7iSzPVTgC0paINNJ6V3y3Eo_JYfWAHhXI8OZTDnW3IRQ==">>} {token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>}
] ]
} }

View File

@ -18,22 +18,21 @@
{port, 18080} {port, 18080}
]}, ]},
%% 目标服务器地址
%{emqx_server, [
% {host, {39, 98, 184, 67}},
% {port, 1883},
% {tcp_opts, []},
% {username, "test"},
% {password, "test1234"},
% {keepalive, 86400},
% {retry_interval, 5}
%]},
%% 权限检验时的预埋token %% 权限检验时的预埋token
{pre_tokens, [ {pre_tokens, [
{<<"test">>, <<"iot2023">>} {<<"test">>, <<"iot2023">>}
]}, ]},
%% 配置中电的数据转发, mqtt协议
{zhongdian, [
{host, "172.30.6.161"},
{port, 1883},
{username, "admin"},
{password, "123456"},
{topic, "CET/NX/upload"},
{qos, 2}
]},
{pools, [ {pools, [
%% mysql连接池配置 %% mysql连接池配置
{mysql_iot, {mysql_iot,
@ -50,6 +49,15 @@
] ]
}, },
%% redis连接池
{redis_pool,
[{size, 10}, {max_overflow, 20}, {worker_module, eredis}],
[
{host, "172.19.0.7"},
{port, 26379}
]
},
%% influxdb数据库配置 %% influxdb数据库配置
{influx_pool, {influx_pool,
[{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}],

View File

@ -7,6 +7,7 @@
{esockd, ".*", {git, "https://github.com/emqx/esockd.git", {tag, "v5.7.3"}}}, {esockd, ".*", {git, "https://github.com/emqx/esockd.git", {tag, "v5.7.3"}}},
{jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}},
{mysql, ".*", {git, "https://github.com/mysql-otp/mysql-otp", {tag, "1.8.0"}}}, {mysql, ".*", {git, "https://github.com/mysql-otp/mysql-otp", {tag, "1.8.0"}}},
{eredis, ".*", {git, "https://github.com/wooga/eredis.git", {tag, "v1.2.0"}}},
{parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}},
{lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}} {lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}}
]}. ]}.

View File

@ -8,6 +8,10 @@
{git,"https://github.com/ninenines/cowlib", {git,"https://github.com/ninenines/cowlib",
{ref,"cc04201c1d0e1d5603cd1cde037ab729b192634c"}}, {ref,"cc04201c1d0e1d5603cd1cde037ab729b192634c"}},
1}, 1},
{<<"eredis">>,
{git,"https://github.com/wooga/eredis.git",
{ref,"9ad91f149310a7d002cb966f62b7e2c3330abb04"}},
0},
{<<"esockd">>, {<<"esockd">>,
{git,"https://github.com/emqx/esockd.git", {git,"https://github.com/emqx/esockd.git",
{ref,"d9ce4024cc42a65e9a05001997031e743442f955"}}, {ref,"d9ce4024cc42a65e9a05001997031e743442f955"}},