From 26c55cc21f7c427f55d838f3194f2285b6fb6701 Mon Sep 17 00:00:00 2001 From: anlicheng Date: Wed, 20 Sep 2023 18:47:01 +0800 Subject: [PATCH] =?UTF-8?q?=E7=AE=80=E5=8C=96endpoint=E6=A6=82=E5=BF=B5?= =?UTF-8?q?=EF=BC=8C=E6=9B=BF=E6=8D=A2=E6=8E=89redis?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/include/iot.hrl | 56 -- apps/iot/src/emqtt/emqtt.erl | 3 +- .../iot/src/http_handler/endpoint_handler.erl | 184 ---- apps/iot/src/iot.app.src | 1 + apps/iot/src/iot_app.erl | 47 +- apps/iot/src/iot_device.erl | 28 +- apps/iot/src/iot_endpoint.erl | 358 ------- apps/iot/src/iot_endpoint_sup.erl | 53 - apps/iot/src/iot_host.erl | 26 +- apps/iot/src/iot_router.erl | 23 +- apps/iot/src/iot_sup.erl | 21 +- apps/iot/src/iot_zd_endpoint.erl | 232 +++++ apps/iot/src/mnesia/mnesia_endpoint.erl | 157 --- apps/iot/src/mnesia/mnesia_kv.erl | 949 ------------------ apps/iot/src/mnesia/mnesia_queue.erl | 50 +- apps/iot/src/mocker/iot_mock.erl | 112 +-- apps/iot/src/redis/redis_client.erl | 21 + apps/iot/src/redis/redis_handler.erl | 197 ---- apps/iot/src/redis/redis_protocol.erl | 106 -- config/sys-dev.config | 21 +- config/sys-prod.config | 30 +- rebar.config | 1 + rebar.lock | 4 + 23 files changed, 361 insertions(+), 2319 deletions(-) delete mode 100644 apps/iot/src/http_handler/endpoint_handler.erl delete mode 100644 apps/iot/src/iot_endpoint.erl delete mode 100644 apps/iot/src/iot_endpoint_sup.erl create mode 100644 apps/iot/src/iot_zd_endpoint.erl delete mode 100644 apps/iot/src/mnesia/mnesia_endpoint.erl delete mode 100644 apps/iot/src/mnesia/mnesia_kv.erl create mode 100755 apps/iot/src/redis/redis_client.erl delete mode 100644 apps/iot/src/redis/redis_handler.erl delete mode 100644 apps/iot/src/redis/redis_protocol.erl diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 8936196..e594496 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -53,62 +53,6 @@ type :: atom() }). --record(http_endpoint, { - url = <<>> :: binary(), - pool_size = 10 :: integer() -}). - --record(ws_endpoint, { - url = <<>> :: binary() -}). - --record(mqtt_endpoint, { - host = <<>> :: binary(), - port = 0 :: integer(), - username = <<>> :: binary(), - password = <<>> :: binary(), - topic = <<>> :: binary(), - qos = 0 :: integer() -}). - --record(kafka_endpoint, { - username = <<>> :: binary(), - password = <<>> :: binary(), - bootstrap_servers = [] :: [binary()], - topic = <<>> :: binary() -}). - --record(mysql_endpoint, { - host = <<>> :: binary(), - port = 0 :: integer(), - username = <<>> :: binary(), - password = <<>> :: binary(), - database = <<>> :: binary(), - table_name = <<>> :: binary(), - pool_size = 10 :: integer() -}). - -%% 对端配置 --record(endpoint, { - %% 不同的对端名字要唯一 - name = <<>> :: binary(), - %% 标题描述 - title = <<>> :: binary(), - %% 匹配规则, 固定了满足点位信息的前缀匹配的数据的转发规则 - matcher = <<>> :: binary(), - mapper = <<>> :: binary(), - %% 数据转换规则,基于 - %% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}]) - %% fun(LocationCode :: binary(), Fields :: [{<<"key">> => <<>>, <<"value">> => <<>>, <<"unit">> => <<>>}], Timestamp :: integer()) - mapper_fun = fun(_, Fields) -> Fields end :: fun(), - %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} - config = #http_endpoint{} :: #http_endpoint{} | #ws_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{} | #mysql_endpoint{}, - %% 更新时间 - updated_at = 0 :: integer(), - %% 创建时间 - created_at = 0 :: integer() -}). - %% id生成器 -record(id_generator, { tab :: atom(), diff --git a/apps/iot/src/emqtt/emqtt.erl b/apps/iot/src/emqtt/emqtt.erl index 999d27a..586c828 100644 --- a/apps/iot/src/emqtt/emqtt.erl +++ b/apps/iot/src/emqtt/emqtt.erl @@ -904,7 +904,7 @@ handle_event({call, From}, stop, _StateName, _State) -> {stop_and_reply, normal, [{reply, From, ok}]}; handle_event(info, {TcpOrSsL, _Sock, Data}, _StateName, State) when TcpOrSsL =:= tcp; TcpOrSsL =:= ssl -> - lager:debug("[emqtt] RECV Data: ~p, client_id: ~p", [Data, State#state.clientid]), + %lager:debug("[emqtt] RECV Data: ~p, client_id: ~p", [Data, State#state.clientid]), process_incoming(Data, [], run_sock(State)); handle_event(info, {Error, _Sock, Reason}, _StateName, State) when Error =:= tcp_error; Error =:= ssl_error -> @@ -1214,7 +1214,6 @@ send(Msg, State) when is_record(Msg, mqtt_msg) -> send(Packet, State = #state{socket = Sock, proto_ver = Ver}) when is_record(Packet, mqtt_packet) -> Data = emqtt_frame:serialize(Packet, Ver), - lager:debug("[emqtt] SEND Data: ~1000p, client_id: ~p", [Packet, State#state.clientid]), case emqtt_sock:send(Sock, Data) of ok -> {ok, bump_last_packet_id(State)}; diff --git a/apps/iot/src/http_handler/endpoint_handler.erl b/apps/iot/src/http_handler/endpoint_handler.erl deleted file mode 100644 index b0caddc..0000000 --- a/apps/iot/src/http_handler/endpoint_handler.erl +++ /dev/null @@ -1,184 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author licheng5 -%%% @copyright (C) 2020, -%%% @doc -%%% -%%% @end -%%% Created : 26. 4月 2020 3:36 下午 -%%%------------------------------------------------------------------- --module(endpoint_handler). --author("licheng5"). --include("iot.hrl"). - -%% API --export([handle_request/4]). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% helper methods -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 可以根据name进行过滤 -handle_request("GET", "/endpoint/all", GetParams, _) -> - Endpoints0 = mnesia_endpoint:get_all_endpoints(), - Endpoints = case maps:is_key(<<"name">>, GetParams) of - true -> - Name = maps:get(<<"name">>, GetParams), - lists:filter(fun(#endpoint{name = Name0}) -> Name == Name0 end, Endpoints0); - false -> - Endpoints0 - end, - EndpointInfos = lists:map(fun mnesia_endpoint:to_map/1, Endpoints), - - {ok, 200, iot_util:json_data(EndpointInfos)}; - -%% 可以根据name进行过滤 -handle_request("GET", "/endpoint/info", #{<<"name">> := Name}, _) -> - case mnesia_endpoint:get_endpoint(Name) of - {ok, Endpoint} -> - EndpointInfo = mnesia_endpoint:to_map(Endpoint), - {ok, 200, iot_util:json_data(EndpointInfo)}; - undefined -> - {ok, 200, iot_util:json_error(404, <<"not found">>)} - end; - -%% 重新加载对应的主机信息 -handle_request("POST", "/endpoint/create", _, Params = #{<<"name">> := Name}) -> - RequiredFields = [<<"name">>, <<"title">>, <<"matcher">>, <<"mapper">>, <<"config">>], - iot_util:assert(lists:all(fun(Key) -> maps:is_key(Key, Params) end, RequiredFields), <<"missed required param">>), - - case mnesia_endpoint:get_endpoint(Name) of - undefined -> - Endpoint0 = make_endpoint(maps:to_list(Params), #endpoint{}), - Endpoint = Endpoint0#endpoint{created_at = iot_util:timestamp_of_seconds()}, - ok = mnesia_endpoint:insert(Endpoint), - %% 重新启动endpoint - {ok, _} = iot_endpoint_sup:ensured_endpoint_started(Endpoint), - - {ok, 200, iot_util:json_data(<<"success">>)}; - {ok, _} -> - {ok, 200, iot_util:json_error(404, <<"endpoint name exists">>)} - end; - -%% 更新规则 -handle_request("POST", "/endpoint/update", _, Params = #{<<"name">> := Name}) when is_binary(Name) -> - case mnesia_endpoint:get_endpoint(Name) of - undefined -> - lager:debug("[endpoint_handler] update endpoint, name: ~p not found", [Name]), - {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; - {ok, Endpoint} -> - Params1 = maps:remove(<<"name">>, Params), - NEndpoint = make_endpoint(maps:to_list(Params1), Endpoint), - NEndpoint1 = NEndpoint#endpoint{updated_at = iot_util:timestamp_of_seconds()}, - ok = mnesia_endpoint:insert(NEndpoint1), - - case iot_endpoint:get_pid(Name) of - undefined -> - {ok, _} = iot_endpoint_sup:ensured_endpoint_started(Endpoint); - Pid when is_pid(Pid) -> - iot_endpoint:reload(Pid, Endpoint) - end, - - {ok, 200, iot_util:json_data(<<"success">>)} - end; - -%% 删除对应的主机信息 -handle_request("POST", "/endpoint/delete", _, #{<<"name">> := Name}) when is_binary(Name) -> - case mnesia_endpoint:delete(Name) of - ok -> - case iot_endpoint:get_pid(Name) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; - Pid -> - iot_endpoint:clean_up(Pid), - iot_endpoint_sup:delete_endpoint(Name), - - {ok, 200, iot_util:json_data(<<"success">>)} - end; - {error, Reason} -> - lager:debug("[endpoint_handler] delete endpoint id: ~p, get error is: ~p", [Name, Reason]), - {ok, 200, iot_util:json_error(404, <<"error">>)} - end; - -handle_request(_, Path, _, _) -> - Path1 = list_to_binary(Path), - {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% helper methods -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - --spec make_endpoint(Params :: list(), #endpoint{}) -> #endpoint{}. -make_endpoint([], Endpoint) -> - Endpoint; -make_endpoint([{<<"name">>, Name} | Params], Endpoint) when is_binary(Name) andalso Name /= <<>> -> - make_endpoint(Params, Endpoint#endpoint{name = Name}); -make_endpoint([{<<"name">>, _} | _], _) -> - throw(<<"invalid name">>); -make_endpoint([{<<"title">>, Title} | Params], Endpoint) when is_binary(Title) andalso Title /= <<>> -> - make_endpoint(Params, Endpoint#endpoint{title = Title}); -make_endpoint([{<<"title">>, _} | _], _) -> - throw(<<"invalid title">>); -make_endpoint([{<<"matcher">>, Matcher} | Params], Endpoint) when is_binary(Matcher) andalso Matcher /= <<>> -> - %% 检测matcher是否是合法的正则表达式 - case re:compile(Matcher) of - {ok, _} -> - make_endpoint(Params, Endpoint#endpoint{matcher = Matcher}); - {error, _} -> - throw(<<"invalid regexp">>) - end; -make_endpoint([{<<"matcher">>, _} | _], _) -> - throw(<<"invalid matcher">>); -make_endpoint([{<<"mapper">>, Mapper} | Params], Endpoint) when is_binary(Mapper) andalso Mapper /= <<>> -> - %% 检测mapper是否是合理的erlang表达式 - case catch iot_util:parse_mapper(Mapper) of - {ok, MapperFun} -> - make_endpoint(Params, Endpoint#endpoint{mapper = Mapper, mapper_fun = MapperFun}); - error -> - throw(<<"invalid mapper">>); - Error -> - lager:debug("[endpoint_handler] parse_mapper get error: ~p", [Error]), - throw(<<"invalid mapper">>) - end; -make_endpoint([{<<"mapper">>, _} | _], _) -> - throw(<<"invalid mapper">>); - -make_endpoint([{<<"config">>, #{<<"protocol">> := <<"http">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> -> - make_endpoint(Params, Endpoint#endpoint{config = #http_endpoint{url = Url}}); -make_endpoint([{<<"config">>, #{<<"protocol">> := <<"https">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> -> - make_endpoint(Params, Endpoint#endpoint{config = #http_endpoint{url = Url}}); -make_endpoint([{<<"config">>, #{<<"protocol">> := <<"ws">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> -> - make_endpoint(Params, Endpoint#endpoint{config = #ws_endpoint{url = Url}}); -make_endpoint([{<<"config">>, #{<<"protocol">> := <<"kafka">>, <<"args">> := #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}}} | Params], Endpoint) -> - iot_util:assert(is_binary(Username) andalso Username /= <<>>, <<"username is invalid">>), - iot_util:assert(is_binary(Password) andalso Password /= <<>>, <<"password is invalid">>), - iot_util:assert(is_list(BootstrapServers) andalso length(BootstrapServers) > 0, <<"bootstrap_servers is invalid">>), - iot_util:assert(is_binary(Topic) andalso Topic /= <<>>, <<"topic is invalid">>), - - make_endpoint(Params, Endpoint#endpoint{config = #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}}); - -make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mqtt">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}} | Params], Endpoint) -> - iot_util:assert(is_binary(Username) andalso Username /= <<>>, <<"username is invalid">>), - iot_util:assert(is_binary(Password) andalso Password /= <<>>, <<"password is invalid">>), - iot_util:assert(is_binary(Topic) andalso Topic /= <<>>, <<"topic is invalid">>), - iot_util:assert(is_binary(Host) andalso Host /= <<>>, <<"host is invalid">>), - iot_util:assert(is_integer(Port) andalso Port > 0, <<"port is invalid">>), - iot_util:assert(Qos == 0 orelse Qos == 1 orelse Qos == 2, <<"qos is invalid">>), - - make_endpoint(Params, Endpoint#endpoint{config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}); - -%% mysql的支持 -make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mysql">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}}} | Params], Endpoint) -> - iot_util:assert(is_binary(Username) andalso Username /= <<>>, <<"username is invalid">>), - iot_util:assert(is_binary(Password) andalso Password /= <<>>, <<"password is invalid">>), - iot_util:assert(is_binary(Host) andalso Host /= <<>>, <<"host is invalid">>), - iot_util:assert(is_integer(Port) andalso Port > 0, <<"port is invalid">>), - iot_util:assert(is_binary(Database) andalso Database /= <<>>, <<"database is invalid">>), - iot_util:assert(is_binary(TableName) andalso TableName /= <<>>, <<"table_name is invalid">>), - - make_endpoint(Params, Endpoint#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName}}); - -make_endpoint([{<<"config">>, Config} | _], _) -> - lager:warning("[endpoint_handler] unsupport config: ~p", [Config]), - throw(<<"invalid config">>); -make_endpoint([{Key, _} | _], _) -> - throw(<<"unsupport param: ", Key/binary>>). diff --git a/apps/iot/src/iot.app.src b/apps/iot/src/iot.app.src index 159b039..d35662d 100644 --- a/apps/iot/src/iot.app.src +++ b/apps/iot/src/iot.app.src @@ -6,6 +6,7 @@ {applications, [ sync, + eredis, ranch, cowboy, lager, diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 10beffa..e734133 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -23,8 +23,6 @@ start(_StartType, _StartArgs) -> start_udp_server(), %% 启动http服务 start_http_server(), - %% 启动redis服务器 - start_redis_server(), %% 启动连接池 ok = hackney_pool:start_pool(influx_pool, [{timeout, 150000}, {max_connections, 100}]), @@ -48,7 +46,6 @@ start_http_server() -> {'_', [ {"/host/[...]", http_protocol, [host_handler]}, {"/device/[...]", http_protocol, [device_handler]}, - {"/endpoint/[...]", http_protocol, [endpoint_handler]}, {"/test/[...]", http_protocol, [test_handler]}, {"/ws", ws_channel, []} ]} @@ -65,29 +62,6 @@ start_http_server() -> lager:debug("[iot_app] the http server start at: ~p, pid is: ~p", [Port, Pid]). -start_redis_server() -> - {ok, Props} = application:get_env(iot, redis_server), - Acceptors = proplists:get_value(acceptors, Props, 50), - MaxConnections = proplists:get_value(max_connections, Props, 10240), - Backlog = proplists:get_value(backlog, Props, 1024), - Port = proplists:get_value(port, Props), - - TransOpts = [ - {tcp_options, [ - binary, - {reuseaddr, true}, - {active, false}, - {packet, raw}, - {nodelay, false}, - {backlog, Backlog} - ]}, - {acceptors, Acceptors}, - {max_connections, MaxConnections} - ], - {ok, _} = esockd:open(redis_server, Port, TransOpts, {redis_protocol, start_link, []}), - - lager:debug("[iot_app] the redis server start at: ~p", [Port]). - start_udp_server() -> {ok, Props} = application:get_env(iot, udp_server), Port = proplists:get_value(port, Props), @@ -97,10 +71,11 @@ start_udp_server() -> %% 启动内存数据库 start_mnesia() -> + QueueTab = 'queue_data:zhongdian', %% 启动数据库 ok = mnesia:start(), Tables = mnesia:system_info(tables), - LoadTables = [id_generator, kv, endpoint], + LoadTables = [id_generator, QueueTab], case lists:all(fun(Tab) -> lists:member(Tab, Tables) end, LoadTables) of true -> %% 加载必须等待的数据库表 @@ -117,7 +92,7 @@ start_mnesia() -> %% 创建数据库表 - %% 缓存表 + %% id生成器 mnesia:create_table(id_generator, [ {attributes, record_info(fields, id_generator)}, {record_name, id_generator}, @@ -125,18 +100,10 @@ start_mnesia() -> {type, ordered_set} ]), - %% 缓存表 - mnesia:create_table(kv, [ - {attributes, record_info(fields, kv)}, - {record_name, kv}, - {disc_copies, [node()]}, - {type, ordered_set} - ]), - - %% 对端配置表 - mnesia:create_table(endpoint, [ - {attributes, record_info(fields, endpoint)}, - {record_name, endpoint}, + %% 数据转发缓存表 + mnesia:create_table(QueueTab, [ + {attributes, record_info(fields, north_data)}, + {record_name, north_data}, {disc_copies, [node()]}, {type, ordered_set} ]) diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 779aebb..1b04ae0 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -67,9 +67,7 @@ auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. start_link(Name, DeviceUUID) when is_atom(Name), is_binary(DeviceUUID) -> - gen_statem:start_link({local, Name}, ?MODULE, [DeviceUUID], []); -start_link(Name, DeviceInfo) when is_atom(Name), is_map(DeviceInfo) -> - gen_statem:start_link({local, Name}, ?MODULE, [DeviceInfo], []). + gen_statem:start_link({local, Name}, ?MODULE, [DeviceUUID], []). %%%=================================================================== %%% gen_statem callbacks @@ -80,26 +78,8 @@ start_link(Name, DeviceInfo) when is_atom(Name), is_map(DeviceInfo) -> %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. init([DeviceUUID]) when is_binary(DeviceUUID) -> - case device_bo:get_device_by_uuid(DeviceUUID) of - {ok, DeviceInfo} -> - init0(DeviceInfo); - undefined -> - ignore - end; -init([DeviceInfo]) when is_map(DeviceInfo) -> - init0(DeviceInfo). -init0(#{<<"device_uuid">> := DeviceUUID, <<"status">> := Status, <<"authorize_status">> := AuthorizeStatus}) -> - case AuthorizeStatus =:= ?DEVICE_AUTH_AUTHED of - true -> - lager:debug("[iot_device] started device: ~p, state_name: ~p, status: ~p", [DeviceUUID, ?STATE_ACTIVATED, Status]), - {ok, ?STATE_ACTIVATED, #state{device_uuid = DeviceUUID, status = Status}}; - false -> - {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE), - report_event(DeviceUUID, ?DEVICE_OFFLINE), - - lager:debug("[iot_device] started device: ~p, state_name: ~p, status: ~p", [DeviceUUID, ?STATE_DENIED, ?DEVICE_OFFLINE]), - {ok, ?STATE_DENIED, #state{device_uuid = DeviceUUID, status = ?DEVICE_OFFLINE}} - end. + gen_server:cast(self(), reload), + {ok, ?STATE_DENIED, #state{device_uuid = DeviceUUID}}. %% @private %% @doc This function is called by a gen_statem when it needs to find out @@ -193,5 +173,5 @@ report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewSt <<"name">> => <<"设备状态"/utf8>>, <<"timestamp">> => Timestamp }], - iot_router:route(DeviceUUID, FieldsList, Timestamp), + iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp), lager:debug("[iot_host] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]). \ No newline at end of file diff --git a/apps/iot/src/iot_endpoint.erl b/apps/iot/src/iot_endpoint.erl deleted file mode 100644 index ece12ad..0000000 --- a/apps/iot/src/iot_endpoint.erl +++ /dev/null @@ -1,358 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 06. 7月 2023 12:02 -%%%------------------------------------------------------------------- --module(iot_endpoint). --author("aresei"). --include("iot.hrl"). - --behaviour(gen_statem). - -%% API --export([start_link/2]). --export([get_name/1, get_pid/1, forward/4, get_stat/1, reload/2, clean_up/1, get_mapper_fun/1]). - -%% gen_statem callbacks --export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). - -%% 消息重发间隔 --define(RETRY_INTERVAL, 5000). - --record(state, { - endpoint :: #endpoint{}, - mp, - postman_pid :: undefined | pid(), - %% 队列数据库名, 写入到对端都有可能失败,因此缓存队列需要自己维护 - tab_name :: atom(), - %% 当前数据的游标, #north_data的id - cursor = 0 :: integer(), - %% 定时器 - timer_map = #{}, - %% 窗口大小,允许最大的未确认消息数 - window_size = 10, - %% 未确认的消息数 - flight_num = 0, - %% 记录成功处理的消息数 - acc_num = 0 -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec get_name(Name :: binary() | #endpoint{}) -> atom(). -get_name(#endpoint{name = Name}) when is_binary(Name) -> - get_name(Name); -get_name(EndpointName) when is_binary(EndpointName) -> - binary_to_atom(<<"iot_endpoint:", EndpointName/binary>>). - --spec get_pid(Name :: binary()) -> undefined | pid(). -get_pid(Name) when is_binary(Name) -> - whereis(get_name(Name)). - --spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). -forward(undefined, _, _, _) -> - ok; -forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> - gen_statem:cast(Pid, {forward, LocationCode, Fields, Timestamp}). - -reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> - gen_statem:cast(Pid, {reload, NEndpoint}). - --spec get_stat(Pid :: pid()) -> {ok, Stat :: #{}}. -get_stat(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, get_stat, 5000). - --spec clean_up(Pid :: pid()) -> ok. -clean_up(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, clean_up, 5000). - --spec get_mapper_fun(Pid :: pid()) -> fun(). -get_mapper_fun(Pid) when is_pid(Pid) -> - gen_statem:call(Pid, get_mapper_fun). - -%% @doc Creates a gen_statem process which calls Module:init/1 to -%% initialize. To ensure a synchronized start-up procedure, this -%% function does not return until Module:init/1 has returned. -start_link(Name, Endpoint = #endpoint{}) -> - gen_statem:start_link({local, Name}, ?MODULE, [Endpoint], []). - -%%%=================================================================== -%%% gen_statem callbacks -%%%=================================================================== - -%% @private -%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or -%% gen_statem:start_link/[3,4], this function is called by the new -%% process to initialize. -init([Endpoint = #endpoint{name = Name, matcher = Regexp}]) -> - erlang:process_flag(trap_exit, true), - - %% 编译正则表达式 - {ok, MP} = re:compile(Regexp), - %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 - erlang:start_timer(0, self(), create_postman), - try - %% 创建自己的队列数据库表 - TabName = binary_to_atom(<<"queue_data:", Name/binary>>), - mnesia_queue:ensure_queue(TabName), - - {ok, disconnected, #state{endpoint = Endpoint, mp = MP, tab_name = TabName, postman_pid = undefined}} - catch _:Error -> - lager:warning("[iot_endpoint] endpoint: ~p, init get error: ~p, ignore", [Name, Error]), - ignore - end. - -%% @private -%% @doc This function is called by a gen_statem when it needs to find out -%% the callback mode of the callback module. -callback_mode() -> - handle_event_function. - -%% @private -%% @doc There should be one instance of this function for each possible -%% state name. If callback_mode is state_functions, one of these -%% functions is called when gen_statem receives and event from -%% call/2, cast/2, or as a normal process message. - -%% 重新加载新的终端配置 -handle_event(cast, {reload, NEndpoint}, disconnected, State = #state{endpoint = Endpoint}) -> - lager:warning("[iot_endpoint] state_name: disconnected, reload endpoint, old: ~p, new: ~p", [Endpoint, NEndpoint]), - {keep_state, State#state{endpoint = NEndpoint}}; - -handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, State = #state{endpoint = Endpoint, timer_map = TimerMap, postman_pid = PostmanPid}) -> - lager:debug("[iot_endpoint] state_name: connected, reload endpoint, old: ~p, new: ~p", [Endpoint, NEndpoint]), - case mnesia_endpoint:config_equals(NEndpoint#endpoint.config, Endpoint#endpoint.config) of - true -> - lager:debug("[iot_endpoint] reload endpoint: ~p, config equals", [Name]), - {keep_state, State#state{endpoint = NEndpoint}}; - false -> - %% 解除和postman的link关系 - unlink(PostmanPid), - %% 关闭postman进程 - catch PostmanPid ! stop, - %% 未确认的消息需要暂存 - lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), - %% 重新建立新的postman - erlang:start_timer(0, self(), create_postman), - - {next_state, disconnected, State#state{endpoint = NEndpoint, timer_map = maps:new(), postman_pid = undefined}} - end; - -handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{mp = MP, tab_name = TabName, window_size = WindowSize, flight_num = FlightNum, endpoint = #endpoint{name = Name}}) -> - case re:run(LocationCode, MP, [{capture, all, list}]) of - nomatch -> - {keep_state, State}; - {match, _} -> - lager:debug("[iot_endpoint] name: ~p, match location_code: ~p", [Name, LocationCode]), - mnesia_queue:insert(TabName, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}), - %% 避免不必要的内部消息 - Actions = case StateName =:= connected andalso FlightNum < WindowSize of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - {keep_state, State, Actions} - end; - -%% 触发读取下一条数据 -handle_event(info, fetch_next, disconnected, State = #state{endpoint = #endpoint{name = Name}}) -> - lager:debug("[iot_endpoint] fetch_next endpoint: ~p, postman offline, data in queue", [Name]), - {keep_state, State}; -handle_event(info, fetch_next, connected, State = #state{flight_num = FlightNum, window_size = WindowSize}) when FlightNum >= WindowSize -> - {keep_state, State}; -handle_event(info, fetch_next, connected, State = #state{tab_name = TabName, cursor = Cursor, endpoint = #endpoint{name = Name}, timer_map = TimerMap, flight_num = FlightNum}) -> - case mnesia_queue:dirty_fetch_next(TabName, Cursor) of - {ok, NCursor, NorthData = #north_data{id = Id}} -> - lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), - case do_post(NorthData, State) of - error -> - {keep_state, State}; - {ok, TimerRef} -> - {keep_state, State#state{cursor = NCursor, timer_map = maps:put(Id, TimerRef, TimerMap), flight_num = FlightNum + 1}} - end; - '$end_of_table' -> - {keep_state, State} - end; - -%% 收到确认消息 -handle_event(info, {ack, Id}, StateName, State = #state{tab_name = TabName, endpoint = #endpoint{name = Name}, timer_map = TimerMap, acc_num = AccNum, flight_num = FlightNum}) -> - ok = mnesia_queue:delete(TabName, Id), - lager:debug("[iot_endpoint] endpoint: ~p, get ack: ~p, delete from mnesia", [Name, Id]), - Actions = case StateName =:= connected of - true -> [{next_event, info, fetch_next}]; - false -> [] - end, - {keep_state, State#state{timer_map = remove_timer(Id, TimerMap), acc_num = AccNum + 1, flight_num = FlightNum - 1}, Actions}; - -%% 收到重发过期请求 -handle_event(info, {timeout, _, {repost_ticker, NorthData = #north_data{id = Id}}}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap}) -> - lager:debug("[iot_endpoint] endpoint: ~p, repost data: ~p", [Name, Id]), - case do_post(NorthData, State) of - error -> - {keep_state, State}; - {ok, TimerRef} -> - {keep_state, State#state{timer_map = maps:put(Id, TimerRef, TimerMap)}} - end; - -%% 离线时,忽略超时逻辑 -handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) -> - {keep_state, State}; - -handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{endpoint = Endpoint = #endpoint{name = Name, config = Config}, window_size = WindowSize}) -> - lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Name]), - try - {ok, PostmanPid} = create_postman(Endpoint), - %% 最多允许window_size - Actions = lists:map(fun(_) -> {next_event, info, fetch_next} end, lists:seq(1, WindowSize)), - {next_state, connected, State#state{endpoint = Endpoint, postman_pid = PostmanPid, timer_map = maps:new(), flight_num = 0}, Actions} - catch _:Error -> - lager:warning("[iot_endpoint] endpoint: ~p, config: ~p, create postman get error: ~p", [Name, Config, Error]), - erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), - - {keep_state, State#state{endpoint = Endpoint, postman_pid = undefined}} - end; - -%% 删除时需要清理 -handle_event({call, From}, clean_up, _, State = #state{tab_name = TabName}) -> - mnesia:delete_table(TabName), - {keep_state, State, [{reply, From, ok}]}; - -handle_event({call, From}, get_mapper_fun, _, State = #state{endpoint = #endpoint{mapper_fun = F}}) -> - {keep_state, State, [{reply, From, F}]}; - -%% 获取当前统计信息 -handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum, tab_name = TabName}) -> - Stat = #{ - <<"acc_num">> => AccNum, - <<"queue_num">> => mnesia_queue:table_size(TabName), - <<"state_name">> => atom_to_binary(StateName) - }, - {keep_state, State, [{reply, From, Stat}]}; - -%% postman进程挂掉时,重新建立新的 -handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{endpoint = #endpoint{name = Name}, timer_map = TimerMap, postman_pid = PostmanPid}) -> - lager:warning("[iot_endpoint] endpoint: ~p, postman exited with reason: ~p", [Name, Reason]), - lists:foreach(fun({_, TimerRef}) -> catch erlang:cancel_timer(TimerRef) end, maps:to_list(TimerMap)), - erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), - - {next_state, disconnected, State#state{timer_map = maps:new(), postman_pid = undefined}}; - -%% @private -%% @doc If callback_mode is handle_event_function, then whenever a -%% gen_statem receives an event from call/2, cast/2, or as a normal -%% process message, this function is called. -handle_event(EventType, Event, StateName, State) -> - lager:warning("[iot_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]), - {keep_state, State}. - -%% @private -%% @doc This function is called by a gen_statem when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_statem terminates with -%% Reason. The return value is ignored. -terminate(Reason, _StateName, #state{endpoint = #endpoint{name = Name}}) -> - lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Name, Reason]), - ok. - -%% @private -%% @doc Convert process state when code is changed -code_change(_OldVsn, StateName, State = #state{}, _Extra) -> - {ok, StateName, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - --spec remove_timer(Id :: integer(), TimerMap :: #{}) -> NTimerMap :: #{}. -remove_timer(Id, TimerMap) when is_integer(Id), is_map(TimerMap) -> - case maps:take(Id, TimerMap) of - error -> - TimerMap; - {TimerRef, NTimerMap} -> - is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), - NTimerMap - end. - -%% 对http和https协议的支持 -create_postman(#endpoint{config = #http_endpoint{url = Url, pool_size = PoolSize}}) -> - WorkerArgs = [{url, Url}], - broker_postman:start_link(http_postman, WorkerArgs, PoolSize); - -%% 对mqtt协议的支持, 只需要建立单个链接 -create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) -> - Node = atom_to_binary(node()), - ClientId = <<"mqtt-client-", Node/binary, "-", Name/binary>>, - Opts = [ - {clientid, ClientId}, - {host, binary_to_list(Host)}, - {port, Port}, - {tcp_opts, []}, - {username, binary_to_list(Username)}, - {password, binary_to_list(Password)}, - {keepalive, 86400}, - {auto_ack, true}, - {connect_timeout, 5000}, - {proto_ver, v5}, - {retry_interval, 5000} - ], - - mqtt_postman:start_link(Opts, Topic, Qos); - -%% 对mysql协议的支持 -create_postman(#endpoint{config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName, pool_size = PoolSize}}) -> - WorkerArgs = [ - {mysql_opts, [ - {host, binary_to_list(Host)}, - {port, Port}, - {user, binary_to_list(Username)}, - {password, binary_to_list(Password)}, - {keep_alive, true}, - {database, binary_to_list(Database)}, - {queries, [<<"set names utf8">>]} - ]}, - {table, TableName} - ], - broker_postman:start_link(mysql_postman, WorkerArgs, PoolSize); -create_postman(#endpoint{}) -> - throw(<<"not supported">>). - --spec do_post(NorthData :: #north_data{}, State :: #state{}) -> error | {ok, TimerRef :: reference()}. -do_post(NorthData = #north_data{id = Id}, #state{postman_pid = PostmanPid, tab_name = TabName, endpoint = #endpoint{name = Name, mapper_fun = MapperFun}}) -> - lager:debug("[iot_endpoint] endpoint: ~p, fetch_next success, north data is: ~p", [Name, NorthData]), - case safe_invoke_mapper(MapperFun, NorthData) of - {ok, Body} -> - PostmanPid ! {post, self(), make_post_data(NorthData, Body)}, - %% 重发机制, 在发送的过程中mapper可能会改变 - TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), - {ok, TimerRef}; - {error, Error} -> - lager:debug("[iot_endpoint] forward endpoint: ~p, mapper get error: ~p, message discard", [Name, Error]), - mnesia_queue:delete(TabName, Id), - error; - ignore -> - lager:debug("[iot_endpoint] forward endpoint: ~p, mapper ignore, messge discard", [Name]), - mnesia_queue:delete(TabName, Id), - error - end. - --spec safe_invoke_mapper(MapperFun :: fun(), NorthData :: #north_data{}) -> - {ok, Body :: any()} | ignore | {error, Reason :: any()}. -safe_invoke_mapper(MapperFun, #north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}) -> - try - if - is_function(MapperFun, 2) -> - MapperFun(LocationCode, Fields); - is_function(MapperFun, 3) -> - MapperFun(LocationCode, Fields, Timestamp) - end - catch _:Error -> - {error, Error} - end. - --spec make_post_data(NorthData :: #north_data{}, Body :: any()) -> PostData :: #post_data{}. -make_post_data(#north_data{id = Id, location_code = LocationCode}, Body) -> - #post_data{id = Id, location_code = LocationCode, body = Body}. \ No newline at end of file diff --git a/apps/iot/src/iot_endpoint_sup.erl b/apps/iot/src/iot_endpoint_sup.erl deleted file mode 100644 index 2c79c1d..0000000 --- a/apps/iot/src/iot_endpoint_sup.erl +++ /dev/null @@ -1,53 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% @end -%%%------------------------------------------------------------------- --module(iot_endpoint_sup). --include("iot.hrl"). - --behaviour(supervisor). - --export([start_link/0, init/1, delete_endpoint/1, ensured_endpoint_started/1, stat/0]). - -start_link() -> - supervisor:start_link({local, ?MODULE}, ?MODULE, []). - -init([]) -> - mnesia_endpoint:import_static_endpoints(), - Specs = lists:map(fun child_spec/1, mnesia_endpoint:get_all_endpoints()), - - {ok, {#{strategy => one_for_one, intensity => 1000, period => 3600}, Specs}}. - --spec ensured_endpoint_started(Name :: #endpoint{}) -> {ok, Pid :: pid()} | {error, Reason :: any()}. -ensured_endpoint_started(Endpoint = #endpoint{}) -> - case supervisor:start_child(?MODULE, child_spec(Endpoint)) of - {ok, Pid} when is_pid(Pid) -> - {ok, Pid}; - {error, {'already_started', Pid}} when is_pid(Pid) -> - {ok, Pid}; - {error, Error} -> - {error, Error} - end. - -stat() -> - Children = supervisor:which_children(?MODULE), - lists:foreach(fun({Id, Pid, _, _}) -> - Stat = catch iot_endpoint:get_stat(Pid), - lager:debug("[iot_endpoint] id: ~p, stat: ~p", [Id, Stat]) - end, Children). - -delete_endpoint(Name) when is_binary(Name) -> - Id = iot_endpoint:get_name(Name), - supervisor:terminate_child(?MODULE, Id), - supervisor:delete_child(?MODULE, Id). - -child_spec(Endpoint) -> - Id = iot_endpoint:get_name(Endpoint), - #{id => Id, - start => {iot_endpoint, start_link, [Id, Endpoint]}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['iot_endpoint']}. \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index d561002..b99e8e2 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -1,5 +1,5 @@ %%%------------------------------------------------------------------- -%%% @author aresei +%%% @author %%% @copyright (C) 2023, %%% @doc %%% @@ -29,16 +29,15 @@ host_id :: integer(), %% 从数据库里面读取到的数据 uuid :: binary(), + %% 记录最后的状态信息 + last_status :: integer(), %% aes的key, 后续通讯需要基于这个加密 aes = <<>> :: binary(), has_session = false :: boolean(), - %% 心跳计数器 heartbeat_counter = 0 :: integer(), - %% websocket相关 channel_pid :: undefined | pid(), - %% 主机的相关信息 metrics = #{} :: map() }). @@ -145,7 +144,7 @@ start_link(Name, UUID) when is_atom(Name), is_binary(UUID) -> {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | {stop, Reason :: term()} | ignore). init([UUID]) -> - {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), + host_bo:change_status(UUID, ?HOST_OFFLINE), report_event(UUID, ?HOST_OFFLINE), case host_bo:get_host_by_uuid(UUID) of @@ -162,7 +161,7 @@ init([UUID]) -> %% 心跳检测机制 erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), - {ok, #state{host_id = HostId, uuid = UUID, aes = Aes, has_session = false}}; + {ok, #state{host_id = HostId, uuid = UUID, last_status = ?HOST_OFFLINE, aes = Aes, has_session = false}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), ignore @@ -185,7 +184,7 @@ handle_call(get_aes, _From, State = #state{aes = Aes}) -> {reply, {ok, Aes}, State}; %% 获取主机的状态 -handle_call(get_status, _From, State = #state{host_id = HostId, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) -> +handle_call(get_status, _From, State = #state{host_id = HostId, last_status = LastStatus, channel_pid = ChannelPid, heartbeat_counter = HeartbeatCounter, metrics = Metrics, has_session = HasSession}) -> %% 启动主机相关的devices {ok, Devices} = device_bo:get_host_devices(HostId), DeviceInfos = lists:map(fun(DeviceUUID) -> @@ -200,6 +199,7 @@ handle_call(get_status, _From, State = #state{host_id = HostId, channel_pid = Ch HasChannel = (ChannelPid /= undefined), Reply = #{ + <<"last_status">> => LastStatus, <<"has_channel">> => HasChannel, <<"has_session">> => HasSession, <<"heartbeat_counter">> => HeartbeatCounter, @@ -255,7 +255,7 @@ handle_call({activate, false}, _From, State = #state{uuid = UUID, host_id = Host report_event(UUID, ?HOST_OFFLINE), change_devices_status(HostId, ?DEVICE_UNKNOWN), - {reply, ok, State#state{channel_pid = undefined, has_session = false}}; + {reply, ok, State#state{last_status = ?HOST_OFFLINE, channel_pid = undefined, has_session = false}}; %% 绑定channel handle_call({attach_channel, ChannelPid}, _From, State = #state{uuid = UUID, channel_pid = OldChannelPid}) -> @@ -281,7 +281,7 @@ handle_call({create_session, PubKey}, _From, State = #state{uuid = UUID, aes = A lager:debug("[iot_host] host_id(session) uuid: ~p, create_session, will change status, affected_row: ~p", [UUID, AffectedRow]), - {reply, {ok, <<10:8, EncReply/binary>>}, State#state{has_session = true}}; + {reply, {ok, <<10:8, EncReply/binary>>}, State#state{last_status = ?HOST_ONLINE, has_session = true}}; false -> lager:debug("[iot_host] host_id(denied) uuid: ~p, create_session, will not change host status", [UUID]), Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, @@ -441,7 +441,7 @@ handle_info({timeout, _, heartbeat_ticker}, State = #state{uuid = UUID, host_id report_event(UUID, ?HOST_OFFLINE), erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), - {noreply, State#state{heartbeat_counter = 0}}; + {noreply, State#state{last_status = ?HOST_OFFLINE, heartbeat_counter = 0}}; %% 其他情况下需要重置系统计数器 handle_info({timeout, _, heartbeat_ticker}, State = #state{}) -> erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker), @@ -466,8 +466,8 @@ handle_info(Info, State = #state{has_session = HasSession}) -> %% terminate. It should be the opposite of Module:init/1 and do any %% necessary cleaning up. When it returns, the gen_statem terminates with %% Reason. The return value is ignored. -terminate(Reason, _State = #state{host_id = HostId, uuid = UUID, has_session = HasSession}) -> - lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p", [UUID, Reason, HasSession]), +terminate(Reason, #state{host_id = HostId, uuid = UUID, has_session = HasSession}) -> + lager:debug("[iot_host] host: ~p, terminate with reason: ~p, has_session: ~p, status: offline", [UUID, Reason, HasSession]), host_bo:change_status(UUID, ?HOST_OFFLINE), report_event(UUID, ?HOST_OFFLINE), @@ -537,7 +537,7 @@ report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) -> <<"name">> => <<"主机状态"/utf8>>, <<"timestamp">> => Timestamp }], - iot_router:route(UUID, FieldsList, Timestamp), + iot_router:route_uuid(UUID, FieldsList, Timestamp), lager:debug("[iot_host] host_uuid: ~p, route fields: ~p", [UUID, FieldsList]). %% 将当前的state转换成map diff --git a/apps/iot/src/iot_router.erl b/apps/iot/src/iot_router.erl index d1ec86d..46a60c1 100644 --- a/apps/iot/src/iot_router.erl +++ b/apps/iot/src/iot_router.erl @@ -11,25 +11,16 @@ -include("iot.hrl"). %% API --export([route/3, route_uuid/3]). +-export([route_uuid/3]). -spec route_uuid(RouterUUID :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). route_uuid(RouterUUID, Fields, Timestamp) when is_binary(RouterUUID), is_list(Fields), is_integer(Timestamp) -> %% 查找终端设备对应的点位信息 - case mnesia_kv:hget(RouterUUID, <<"location_code">>) of - none -> + case redis_client:hget(RouterUUID, <<"location_code">>) of + {ok, undefined} -> lager:debug("[iot_host] the north_data hget location_code, uuid: ~p, not found", [RouterUUID]); + {ok, LocationCode} when is_binary(LocationCode) -> + iot_zd_endpoint:forward(LocationCode, Fields, Timestamp); {error, Reason} -> - lager:debug("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason]); - {ok, LocationCode} -> - route(LocationCode, Fields, Timestamp) - end. - --spec route(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> ok. -route(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields), is_integer(Timestamp) -> - [begin - Pid = iot_endpoint:get_pid(EndpointName), - iot_endpoint:forward(Pid, LocationCode, Fields, Timestamp) - end || EndpointName <- mnesia_endpoint:get_keys()], - ok. - + lager:debug("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason]) + end. \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index 24c260a..a61c3b9 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -26,6 +26,8 @@ start_link() -> %% type => worker(), % optional %% modules => modules()} % optional init([]) -> + {ok, MqttOpts} = application:get_env(iot, zhongdian), + SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, Specs = [ #{ @@ -37,15 +39,6 @@ init([]) -> modules => ['iot_logger'] }, - #{ - id => 'iot_endpoint_sup', - start => {'iot_endpoint_sup', start_link, []}, - restart => permanent, - shutdown => 2000, - type => supervisor, - modules => ['iot_endpoint_sup'] - }, - #{ id => 'iot_device_sup', start => {'iot_device_sup', start_link, []}, @@ -62,8 +55,16 @@ init([]) -> shutdown => 2000, type => supervisor, modules => ['iot_host_sup'] - } + }, + #{ + id => 'iot_zd_endpoint', + start => {'iot_zd_endpoint', start_link, [MqttOpts]}, + restart => permanent, + shutdown => 2000, + type => worker, + modules => ['iot_zd_endpoint'] + } ], {ok, {SupFlags, pools() ++ Specs}}. diff --git a/apps/iot/src/iot_zd_endpoint.erl b/apps/iot/src/iot_zd_endpoint.erl new file mode 100644 index 0000000..b01679a --- /dev/null +++ b/apps/iot/src/iot_zd_endpoint.erl @@ -0,0 +1,232 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 06. 7月 2023 12:02 +%%%------------------------------------------------------------------- +-module(iot_zd_endpoint). +-author("aresei"). +-include("iot.hrl"). + +-behaviour(gen_statem). + +%% API +-export([start_link/1]). +-export([get_pid/0, forward/3, get_stat/0]). + +%% gen_statem callbacks +-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). + +%% 消息重发间隔 +-define(RETRY_INTERVAL, 5000). + +-record(state, { + mqtt_opts = [], + postman_pid :: undefined | pid(), + + %% 当前数据的游标, #north_data的id + cursor = 0 :: integer(), + %% 定时器 + timer_ref :: undefined | reference(), + %% 是否繁忙 + is_busy = false :: boolean(), + + %% 记录成功处理的消息数 + acc_num = 0 +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec get_pid() -> undefined | pid(). +get_pid() -> + whereis(?MODULE). + +-spec forward(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +forward(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + gen_statem:cast(?MODULE, {forward, LocationCode, Fields, Timestamp}). + +-spec get_stat() -> {ok, Stat :: #{}}. +get_stat() -> + gen_statem:call(?MODULE, get_stat, 5000). + +%% @doc Creates a gen_statem process which calls Module:init/1 to +%% initialize. To ensure a synchronized start-up procedure, this +%% function does not return until Module:init/1 has returned. +start_link(Opts) when is_list(Opts) -> + gen_statem:start_link({local, ?MODULE}, ?MODULE, [Opts], []). + +%%%=================================================================== +%%% gen_statem callbacks +%%%=================================================================== + +%% @private +%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or +%% gen_statem:start_link/[3,4], this function is called by the new +%% process to initialize. +init([Opts]) -> + erlang:process_flag(trap_exit, true), + %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 + erlang:start_timer(0, self(), create_postman), + + {ok, disconnected, #state{mqtt_opts = Opts, postman_pid = undefined}}. + +%% @private +%% @doc This function is called by a gen_statem when it needs to find out +%% the callback mode of the callback module. +callback_mode() -> + handle_event_function. + +%% @private +%% @doc There should be one instance of this function for each possible +%% state name. If callback_mode is state_functions, one of these +%% functions is called when gen_statem receives and event from +%% call/2, cast/2, or as a normal process message. + +handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) -> + mnesia_queue:insert(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}), + %% 避免不必要的内部消息 + Actions = case StateName =:= connected andalso not IsBusy of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + {keep_state, State, Actions}; + +%% 触发读取下一条数据 +handle_event(info, fetch_next, disconnected, State) -> + lager:debug("[iot_zd_endpoint] fetch_next postman offline, data in queue"), + {keep_state, State}; +handle_event(info, fetch_next, connected, State = #state{is_busy = true}) -> + {keep_state, State}; +handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPid, cursor = Cursor}) -> + case mnesia_queue:dirty_fetch_next(Cursor) of + {ok, NCursor, NorthData} -> + lager:debug("[iot_zd_endpoint] fetch_next success, north data is: ~p", [NorthData]), + do_post(PostmanPid, NorthData), + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), + + {keep_state, State#state{cursor = NCursor, timer_ref = TimerRef, is_busy = true}}; + '$end_of_table' -> + {keep_state, State} + end; + +%% 收到确认消息 +handle_event(info, {ack, Id}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum}) -> + ok = mnesia_queue:delete(Id), + lager:debug("[iot_zd_endpoint] get ack: ~p", [Id]), + Actions = case StateName =:= connected of + true -> [{next_event, info, fetch_next}]; + false -> [] + end, + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + + {keep_state, State#state{timer_ref = undefined, acc_num = AccNum + 1, is_busy = false}, Actions}; + +%% 收到重发过期请求 +handle_event(info, {timeout, _, {repost_ticker, NorthData}}, connected, State = #state{postman_pid = PostmanPid}) -> + lager:debug("[iot_zd_endpoint] repost data: ~p", [NorthData]), + do_post(PostmanPid, NorthData), + TimerRef = erlang:start_timer(?RETRY_INTERVAL, self(), {repost_ticker, NorthData}), + + {keep_state, State#state{timer_ref = TimerRef}}; + +%% 离线时,忽略超时逻辑 +handle_event(info, {timeout, _, {repost_ticker, _}}, disconnected, State) -> + {keep_state, State}; + +handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mqtt_opts = Opts}) -> + lager:debug("[iot_zd_endpoint] create postman"), + try + {ok, PostmanPid} = create_postman(Opts), + {next_state, connected, State#state{postman_pid = PostmanPid, timer_ref = undefined, is_busy = false}, [{next_event, info, fetch_next}]} + catch _:Error:Stack -> + lager:warning("[iot_zd_endpoint] config: ~p, create postman get error: ~p, stack: ~p", [Opts, Error, Stack]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + + {keep_state, State#state{postman_pid = undefined}} + end; + +%% 获取当前统计信息 +handle_event({call, From}, get_stat, StateName, State = #state{acc_num = AccNum}) -> + Stat = #{ + <<"acc_num">> => AccNum, + <<"queue_num">> => mnesia_queue:table_size(), + <<"state_name">> => atom_to_binary(StateName) + }, + {keep_state, State, [{reply, From, Stat}]}; + +%% postman进程挂掉时,重新建立新的 +handle_event(info, {'EXIT', PostmanPid, Reason}, connected, State = #state{timer_ref = TimerRef, postman_pid = PostmanPid}) -> + lager:warning("[iot_zd_endpoint] postman exited with reason: ~p", [Reason]), + is_reference(TimerRef) andalso erlang:cancel_timer(TimerRef), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + + {next_state, disconnected, State#state{timer_ref = undefined, postman_pid = undefined}}; + +%% @private +%% @doc If callback_mode is handle_event_function, then whenever a +%% gen_statem receives an event from call/2, cast/2, or as a normal +%% process message, this function is called. +handle_event(EventType, Event, StateName, State) -> + lager:warning("[iot_zd_endpoint] unknown message, event_type: ~p, event: ~p, state_name: ~p, state: ~p", [EventType, Event, StateName, State]), + {keep_state, State}. + +%% @private +%% @doc This function is called by a gen_statem when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_statem terminates with +%% Reason. The return value is ignored. +terminate(Reason, _StateName, #state{}) -> + lager:debug("[iot_zd_endpoint] terminate with reason: ~p", [Reason]), + ok. + +%% @private +%% @doc Convert process state when code is changed +code_change(_OldVsn, StateName, State = #state{}, _Extra) -> + {ok, StateName, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +%% 对mqtt协议的支持, 只需要建立单个链接 +create_postman(Opts) -> + Host = proplists:get_value(host, Opts), + Port = proplists:get_value(port, Opts), + Username = proplists:get_value(username, Opts), + Password = proplists:get_value(password, Opts), + Topic = proplists:get_value(topic, Opts), + Qos = proplists:get_value(qos, Opts), + + Node = atom_to_binary(node()), + ClientId = <<"mqtt-client-", Node/binary, "-zhongdian_mqtt">>, + PostmanOpts = [ + {clientid, ClientId}, + {host, Host}, + {port, Port}, + {tcp_opts, []}, + {username, Username}, + {password, Password}, + {keepalive, 86400}, + {auto_ack, true}, + {connect_timeout, 5000}, + {proto_ver, v5}, + {retry_interval, 5000} + ], + + mqtt_postman:start_link(PostmanOpts, Topic, Qos). + +-spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> no_return(). +do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) -> + Data = #{ + <<"version">> => <<"1.0">>, + <<"location_code">> => LocationCode, + <<"ts">> => Timestamp, + <<"properties">> => Fields + }, + Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), + PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}, + ok. \ No newline at end of file diff --git a/apps/iot/src/mnesia/mnesia_endpoint.erl b/apps/iot/src/mnesia/mnesia_endpoint.erl deleted file mode 100644 index e787ae5..0000000 --- a/apps/iot/src/mnesia/mnesia_endpoint.erl +++ /dev/null @@ -1,157 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 04. 7月 2023 11:31 -%%%------------------------------------------------------------------- --module(mnesia_endpoint). --author("aresei"). --include_lib("stdlib/include/qlc.hrl"). --include("iot.hrl"). - -%% API --export([get_keys/0, get_all_endpoints/0, get_endpoint/1, insert/1, delete/1]). --export([to_map/1, config_equals/2]). --export([import_static_endpoints/0]). - -%% 导入静态的配置信息 --spec import_static_endpoints() -> no_return(). -import_static_endpoints() -> - zhongdian_mqtt_endpoint(). - -zhongdian_mqtt_endpoint() -> - Mapper0 = "fun(LocationCode, Fields, Timestamp) -> - Data = #{ - <<\"version\">> => <<\"1.0\">>, - <<\"location_code\">> => LocationCode, - <<\"ts\">> => Timestamp, - <<\"properties\">> => Fields - }, - {ok, iolist_to_binary(jiffy:encode(Data, [force_utf8]))} - end.", - Mapper = list_to_binary(Mapper0), - - {ok, MapperFun} = iot_util:parse_mapper(Mapper), - - mnesia_endpoint:insert(#endpoint{ - name = <<"zhongdian_mqtt">>, - title = <<"中电mqtt北向数据"/utf8>>, - matcher = <<".*">>, - mapper = Mapper, - mapper_fun = MapperFun, - config = #mqtt_endpoint{ - host = <<"172.30.6.161">>, - port = 1883, - username = <<"admin">>, - password = <<"123456">>, - topic = <<"CET/NX/upload">>, - qos = 2 - }, - created_at = iot_util:timestamp_of_seconds() - }). - --spec get_keys() -> [Name :: binary()]. -get_keys() -> - mnesia:dirty_all_keys(endpoint). - --spec get_all_endpoints() -> [#endpoint{}]. -get_all_endpoints() -> - Fun = fun() -> - Q = qlc:q([E || E <- mnesia:table(endpoint)]), - qlc:e(Q) - end, - case mnesia:transaction(Fun) of - {atomic, Endpoints} -> - Endpoints; - {aborted, _} -> - [] - end. - --spec get_endpoint(Name :: binary()) -> undefined | {ok, #endpoint{}}. -get_endpoint(Name) when is_binary(Name) -> - case mnesia:dirty_read(endpoint, Name) of - [Endpoint | _] -> - {ok, Endpoint}; - [] -> - undefined - end. - --spec insert(Endpoint :: #endpoint{}) -> ok | {error, Reason :: any()}. -insert(Endpoint = #endpoint{}) -> - case mnesia:transaction(fun() -> mnesia:write(endpoint, Endpoint, write) end) of - {atomic, ok} -> - ok; - {aborted, Reason} -> - {error, Reason} - end. - --spec delete(Name :: binary()) -> ok | {error, Reason :: any()}. -delete(Name) when is_binary(Name) -> - case mnesia:transaction(fun() -> mnesia:delete(endpoint, Name, write) end) of - {atomic, ok} -> - ok; - {aborted, Reason} -> - {error, Reason} - end. - -%% 判断2个endpoint的配置项是否相同 --spec config_equals(any(), any()) -> boolean(). -config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) -> - true; -config_equals(#ws_endpoint{url = Url}, #ws_endpoint{url = Url}) -> - true; -config_equals(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}, - #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) -> - true; -config_equals(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}, - #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) -> - true; -config_equals(_, _) -> - false. - -to_map(#endpoint{name = Name, title = Title, matcher = Matcher, mapper = Mapper, config = Config, updated_at = UpdatedAt, created_at = CreatedAt}) -> - #{ - <<"name">> => Name, - <<"title">> => Title, - <<"matcher">> => Matcher, - <<"mapper">> => Mapper, - <<"config">> => config_map(Config), - <<"updated_at">> => UpdatedAt, - <<"created_at">> => CreatedAt - }. - -config_map(#http_endpoint{url = Url}) -> - #{<<"protocol">> => <<"http">>, <<"args">> => #{ - <<"url">> => Url - }}; -config_map(#ws_endpoint{url = Url}) -> - #{<<"protocol">> => <<"ws">>, <<"args">> => #{ - <<"url">> => Url - }}; -config_map(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) -> - #{<<"protocol">> => <<"kafka">>, <<"args">> => #{ - <<"username">> => Username, - <<"password">> => Password, - <<"bootstrap_servers">> => BootstrapServers, - <<"topic">> => Topic - }}; -config_map(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) -> - #{<<"protocol">> => <<"mqtt">>, <<"args">> => #{ - <<"host">> => Host, - <<"port">> => Port, - <<"username">> => Username, - <<"password">> => Password, - <<"topic">> => Topic, - <<"qos">> => Qos - }}; -config_map(#mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName}) -> - #{<<"protocol">> => <<"mysql">>, <<"args">> => #{ - <<"host">> => Host, - <<"port">> => Port, - <<"username">> => Username, - <<"password">> => Password, - <<"database">> => Database, - <<"table_name">> => TableName - }}. diff --git a/apps/iot/src/mnesia/mnesia_kv.erl b/apps/iot/src/mnesia/mnesia_kv.erl deleted file mode 100644 index a30f94a..0000000 --- a/apps/iot/src/mnesia/mnesia_kv.erl +++ /dev/null @@ -1,949 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author licheng5 -%%% @copyright (C) 2021, -%%% @doc -%%% -%%% @end -%%% Created : 21. 1月 2021 下午2:17 -%%%------------------------------------------------------------------- --module(mnesia_kv). --author("licheng5"). --include_lib("stdlib/include/qlc.hrl"). --include("iot.hrl"). - -%% 错误类型 --define(WRONG_KIND, <<"Operation against a key holding the wrong kind of value">>). - --type(wrong_kind() :: binary()). --type(redis_nil() :: none). - -%% API --export([all_expireable_keys/0, all_expired_keys/1, clean_expired_keys/0]). --export([del/1, exists/1, expire/2, keys/1, persist/1, ttl/1, type/1]). --export([get/1, set/2, setnx/2]). --export([hexists/2, hdel/2, hkeys/1, hget/2, hmget/2, hset/3, hmset/2, hgetall/1, hlen/1]). --export([sadd/2, scard/1, sdiff/2, sismember/2, smembers/1, sinter/2, sunion/2, spop/1, srandmember/2, srem/2]). --export([lindex/2, linsert/4, llen/1, lpop/1, lpush/2, lpushx/2, lrange/3, lrem/3, lset/3, ltrim/3, rpop/1, rpush/2, rpushx/2]). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% 扫描整个表,用于构建过期机制 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 清理掉过期的keys --spec clean_expired_keys() -> ok | {error, Reason :: any()}. -clean_expired_keys() -> - %NowSecond = iot_util:timestamp_of_seconds(), - %case redis_mnesia_kv:all_expired_keys(NowSecond) of - % {ok, []} -> - % ok; - % {ok, Keys} -> - % lists:foreach(fun(Key) -> mnesia:transaction(fun() -> mnesia:delete(kv, Key, write) end) end, Keys), - % ok - %end. - ok. - -%% 获取全部可过期的keys --spec all_expireable_keys() -> {ok, [{Key :: binary(), ExpireAt :: integer()}]} | {error, Reason :: any()}. -all_expireable_keys() -> - Fun = fun() -> - Q = qlc:q([{E#kv.key, E#kv.expire_at} || E <- mnesia:table(kv), E#kv.expire_at > 0]), - qlc:e(Q) - end, - case mnesia:transaction(Fun) of - {atomic, Items} -> - {ok, Items}; - {aborted, Reason} -> - {error, Reason} - end. - --spec all_expired_keys(ExpireAt :: integer()) -> {ok, [Key :: binary()]} | {error, Reason :: any()}. -all_expired_keys(ExpireAt) when is_integer(ExpireAt) -> - Fun = fun() -> - Q = qlc:q([E#kv.key || E <- mnesia:table(kv), - E#kv.expire_at > 0, E#kv.expire_at =< ExpireAt]), - qlc:e(Q) - end, - case mnesia:transaction(Fun) of - {atomic, Items} -> - {ok, Items}; - {aborted, Reason} -> - {error, Reason} - end. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% Key管理 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 命令用于删除已存在的键。不存在的 key 会被忽略 --spec del(Key :: binary()) -> 0 | 1. -del(Key) when is_binary(Key) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - 0; - [#kv{expire_at = ExpireAt}] -> - ok = mnesia:delete(kv, Key, write), - {1, ExpireAt} - end - end, - case mnesia:transaction(Fun) of - {atomic, {N, _ExpireAt}} -> - N; - {atomic, N} when is_integer(N) -> - N; - {aborted, _Reason} -> - 0 - end. - -%% 若 key 存在返回 1 ,否则返回 0 --spec exists(Key :: binary()) -> 0 | 1. -exists(Key) when is_binary(Key) -> - case mnesia:dirty_read(kv, Key) of - [] -> - 0; - [#kv{}] -> - 1 - end. - -% 设置成功返回 1 。 当 key 不存在或者不能为 key 设置过期时间时(比如在低于 2.1.3 版本的 Redis 中你尝试更新 key 的过期时间)返回 0 --spec expire(Key :: binary(), Second :: integer()) -> 0 | 1. -expire(Key, Second) when is_binary(Key), is_integer(Second) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - 0; - [KV] -> - NExpireAt = iot_util:timestamp_of_seconds() + Second, - ok = mnesia:write(kv, KV#kv{expire_at = NExpireAt}, write), - 1 - end - end, - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, _Reason} -> - 0 - end. - -%% 只支持前缀的匹配 --spec keys(Pattern :: binary()) -> Keys :: list(). -keys(Pattern) when is_binary(Pattern) -> - Keys = mnesia:dirty_all_keys(kv), - case Pattern of - <<"*">> -> - Keys; - _ -> - case binary:split(Pattern, <<"*">>) of - [<<>> | _] -> - []; - [Prefix | _] -> - Len = byte_size(Prefix), - lists:filter(fun(Key) -> - case Key of - <> -> - true; - _ -> - false - end - end, Keys) - end - end. - -%% 当过期时间移除成功时,返回 1 。 如果 key 不存在或 key 没有设置过期时间,返回 0 --spec persist(Key :: binary()) -> 0 | 1. -persist(Key) when is_binary(Key) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - 0; - [#kv{expire_at = 0}] -> - 0; - [KV] -> - ok = mnesia:write(kv, KV#kv{expire_at = 0}, write), - 1 - end - end, - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, _} -> - 0 - end. - -%% 当 key 不存在时,返回 -2 。 当 key 存在但没有设置剩余生存时间时,返回 -1 。 否则,以秒为单位,返回 key 的剩余生存时间。 --spec ttl(Key :: binary()) -> TTL :: integer(). -ttl(Key) when is_binary(Key) -> - case mnesia:dirty_read(kv, Key) of - [] -> - -2; - [#kv{expire_at = 0}] -> - -1; - [#kv{expire_at = ExpireAt}] -> - NowSeconds = iot_util:timestamp_of_seconds(), - ExpireAt - NowSeconds - end. - -%% 获取key的类型 -%%-spec type(Key :: binary()) -> None :: redis_nil() | Type :: atom(). -type(Key) when is_binary(Key) -> - case mnesia:dirty_read(kv, Key) of - [] -> - none; - [#kv{type = Type}] -> - Type - end. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% 字符串处理 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%%-spec get(Key :: binary()) -> redis_nil() | Val :: binary() | {error, Reason :: binary()} . -get(Key) when is_binary(Key) -> - case mnesia:dirty_read(kv, Key) of - [] -> - none; - [#kv{val = Val, type = string}] -> - Val; - _ -> - {error, ?WRONG_KIND} - end. - --spec set(Key :: binary(), Val :: binary()) -> boolean(). -set(Key, Val) when is_binary(Key), is_binary(Val) -> - KV = #kv{key = Key, val = Val, type = string}, - case mnesia:transaction(fun() -> mnesia:write(kv, KV, write) end) of - {atomic, ok} -> - true; - {aborted, _} -> - false - end. - --spec setnx(Key :: binary(), Val :: binary()) -> boolean(). -setnx(Key, Val) when is_binary(Key), is_binary(Val) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - KV = #kv{key = Key, val = Val, type = string}, - ok = mnesia:write(kv, KV, write), - 1; - [#kv{}] -> - 0 - end - end, - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, Reason} -> - {error, Reason} - end. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% HashTable处理 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 如果字段是哈希表中的一个新建字段,并且值设置成功,返回 1 。 如果哈希表中域字段已经存在且旧值已被新值覆盖,返回 0 --spec hset(Key :: binary(), Field :: binary(), Val :: binary()) -> N :: integer() | {error, Reason :: binary()}. -hset(Key, Field, Val) when is_binary(Key), is_binary(Field), is_binary(Val) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - KV = #kv{key = Key, val = #{Field => Val}, type = hash}, - ok = mnesia:write(kv, KV, write), - 1; - [KV = #kv{val = Map0, type = hash}] -> - IsKey = maps:is_key(Field, Map0), - Map = maps:put(Field, Val, Map0), - ok = mnesia:write(kv, KV#kv{key = Key, val = Map}, write), - case IsKey of - true -> 0; - false -> 1 - end; - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, Reason} -> - {error, Reason} - end. - --spec hmset(Key :: binary(), Map :: map()) -> ok | {error, Reason :: binary()}. -hmset(Key, Map) when is_binary(Key), is_map(Map) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - KV = #kv{key = Key, val = Map, type = hash}, - mnesia:write(kv, KV, write); - [KV = #kv{val = Map0, type = hash}] -> - Map1 = maps:merge(Map, Map0), - mnesia:write(kv, KV#kv{key = Key, val = Map1}, write); - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, ok} -> - <<"OK">>; - {aborted, Reason} -> - {error, Reason} - end. - -%% 返回给定字段的值。如果给定的字段或 key 不存在时,返回 nil --spec hget(Key :: binary(), Field :: binary()) -> redis_nil() | {ok, Val :: any()} | {error, Reason :: binary()}. -hget(Key, Field) when is_binary(Key), is_binary(Field) -> - case mnesia:dirty_read(kv, Key) of - [] -> - none; - [#kv{val = #{Field := Val}, type = hash}] -> - {ok, Val}; - _ -> - {error, ?WRONG_KIND} - end. - --spec hmget(Key :: binary(), [binary()]) -> list() | {error, Reason :: binary()}. -hmget(Key, Fields) when is_binary(Key), is_list(Fields) -> - case mnesia:dirty_read(kv, Key) of - [] -> - [none || _ <- Fields]; - [#kv{val = Map0, type = hash}] -> - [maps:get(Field, Map0, none) || Field <- Fields]; - _ -> - {error, ?WRONG_KIND} - end. - --spec hdel(Key :: binary(), [Field :: binary()]) -> Num :: integer() | {error, Reason :: binary()}. -hdel(Key, Fields) when is_binary(Key), is_list(Fields) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - 0; - [KV = #kv{val = Map, type = hash}] -> - Map1 = lists:foldl(fun(Field, Map0) -> maps:remove(Field, Map0) end, Map, Fields), - ok = mnesia:write(kv, KV#kv{key = Key, val = Map1}, write), - map_size(Map) - map_size(Map1); - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, Reason} -> - {error, Reason} - end. - -%% 如果哈希表含有给定字段,返回 1 。 如果哈希表不含有给定字段,或 key 不存在,返回 0 --spec hexists(Key :: binary(), Field :: binary()) -> 0 | 1 | {error, Reason :: binary()}. -hexists(Key, Field) when is_binary(Key), is_binary(Field) -> - case mnesia:dirty_read(kv, Key) of - [] -> - 0; - [#kv{val = Map0, type = hash}] -> - case maps:is_key(Field, Map0) of - true -> 1; - false -> 0 - end; - _ -> - {error, ?WRONG_KIND} - end. - -%% 以列表形式返回哈希表的字段及字段值。 若 key 不存在,返回空列表。 --spec hgetall(Key :: binary()) -> Map :: map() | {error, Reason :: binary()}. -hgetall(Key) when is_binary(Key) -> - case mnesia:dirty_read(kv, Key) of - [] -> - []; - [#kv{val = Map, type = hash}] -> - lists:foldl(fun({Field, Val}, Acc) -> [Field, Val | Acc] end, [], maps:to_list(Map)); - _ -> - {error, ?WRONG_KIND} - end. - -%% 包含哈希表中所有域(field)列表。 当 key 不存在时,返回一个空列表。 --spec hkeys(Key :: binary()) -> Keys :: list() | {error, Reason :: binary()}. -hkeys(Key) when is_binary(Key) -> - case mnesia:dirty_read(kv, Key) of - [] -> - []; - [#kv{val = Map, type = hash}] -> - maps:keys(Map); - _ -> - {error, ?WRONG_KIND} - end. - -%% 哈希表中字段的数量。 当 key 不存在时,返回 0 --spec hlen(Key :: binary()) -> 0 | 1 | {error, Reason :: binary()}. -hlen(Key) when is_binary(Key) -> - case mnesia:dirty_read(kv, Key) of - [] -> - 0; - [#kv{val = Map, type = hash}] -> - map_size(Map); - _ -> - {error, ?WRONG_KIND} - end. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% set处理 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 被添加到集合中的新元素的数量,不包括被忽略的元素。 --spec sadd(Key :: binary(), Members :: list()) -> Num :: integer() | {error, Reason :: binary()}. -sadd(Key, Members) when is_binary(Key), is_list(Members) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - S = sets:from_list(Members), - KV = #kv{key = Key, val = S, type = set}, - ok = mnesia:write(kv, KV, write), - sets:size(S); - [KV = #kv{val = Set0, type = set}] -> - Set1 = lists:foldl(fun(E, S0) -> sets:add_element(E, S0) end, Set0, Members), - ok = mnesia:write(kv, KV#kv{key = Key, val = Set1}, write), - sets:size(Set1) - sets:size(Set0); - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, Reason} -> - {error, Reason} - end. - -%% 集合的数量。 当集合 key 不存在时,返回 0 --spec scard(Key :: binary()) -> Num :: integer() | {error, Reason :: wrong_kind()}. -scard(Key) when is_binary(Key) -> - case mnesia:dirty_read(kv, Key) of - [] -> - 0; - [#kv{val = Set0, type = set}] -> - sets:size(Set0); - _ -> - {error, ?WRONG_KIND} - end. - -%% 如果成员元素是集合的成员,返回 1 。 如果成员元素不是集合的成员,或 key 不存在,返回 0 --spec sismember(Key :: binary(), Member :: binary()) -> boolean() | {error, wrong_kind()}. -sismember(Key, Member) when is_binary(Key) -> - case mnesia:dirty_read(kv, Key) of - [] -> - 0; - [#kv{val = S, type = set}] -> - case sets:is_element(Member, S) of - true -> 1; - false -> 0 - end; - _ -> - {error, ?WRONG_KIND} - end. - --spec sdiff(Key1 :: binary(), Key2 :: binary()) -> list() | {error, wrong_kind()}. -sdiff(Key1, Key2) when is_binary(Key1), is_binary(Key2) -> - case {mnesia:dirty_read(kv, Key1), mnesia:dirty_read(kv, Key2)} of - {[#kv{val = S1, type = set}], [#kv{val = S2, type = set}]} -> - sets:to_list(S1) -- sets:to_list(S2); - {[#kv{val = S1, type = set}], []} -> - sets:to_list(S1); - {[], [#kv{type = set}]} -> - []; - {[], []} -> - []; - _ -> - {error, ?WRONG_KIND} - end. - --spec sinter(Key1 :: binary(), Key2 :: binary()) -> list() | {error, wrong_kind()}. -sinter(Key1, Key2) when is_binary(Key1), is_binary(Key2) -> - case {mnesia:dirty_read(kv, Key1), mnesia:dirty_read(kv, Key2)} of - {[#kv{val = S1, type = set}], [#kv{val = S2, type = set}]} -> - sets:to_list(sets:intersection(S1, S2)); - {[#kv{type = set}], []} -> - []; - {[], [#kv{type = set}]} -> - []; - {[], []} -> - []; - _ -> - {error, ?WRONG_KIND} - end. - -%% 集合中的所有的成员。 不存在的集合 key 被视为空集合 --spec smembers(Key :: binary()) -> list() | {error, wrong_kind()}. -smembers(Key) when is_binary(Key) -> - case mnesia:dirty_read(kv, Key) of - [#kv{val = S, type = set}] -> - sets:to_list(S); - [] -> - []; - _ -> - {error, ?WRONG_KIND} - end. - -%% 被移除的随机元素。 当集合不存在或是空集时,返回 nil -%%-spec spop(Key :: binary()) -> redis_nil() | Member :: binary() | {error, wrong_kind()}. -spop(Key) when is_binary(Key) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - none; - [KV = #kv{val = S0, type = set}] -> - case sets:size(S0) of - 0 -> - none; - Size -> - E = lists:nth(rand:uniform(Size), sets:to_list(S0)), - S1 = sets:del_element(E, S0), - ok = mnesia:write(kv, KV#kv{val = S1}, write), - E - end; - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, E} -> - E; - {aborted, Reason} -> - {error, Reason} - end. - -%% 只提供集合 key 参数时,返回一个元素;如果集合为空,返回 nil 。 如果提供了 count 参数,那么返回一个数组;如果集合为空,返回空数组。 --spec srandmember(Key :: binary(), Count :: integer()) -> list() | {error, wrong_kind()}. -srandmember(Key, Count) when is_binary(Key), is_integer(Count), Count > 0 -> - case mnesia:dirty_read(kv, Key) of - [] -> - []; - [#kv{val = S, type = set}] -> - Size = sets:size(S), - L = sets:to_list(S), - case Size =< Count of - true -> - L; - false -> - lists:sublist(L, rand:uniform(Size - Count + 1), Count) - end; - _ -> - {error, ?WRONG_KIND} - end. - -%% 被成功移除的元素的数量,不包括被忽略的元素 --spec srem(Key :: binary(), Members :: list()) -> Num :: integer() | {error, wrong_kind()}. -srem(Key, Members) when is_binary(Key), is_list(Members) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - 0; - [KV = #kv{val = S, type = set}] -> - Size = sets:size(S), - S1 = lists:foldl(fun(E, S0) -> sets:del_element(E, S0) end, S, Members), - ok = mnesia:write(kv, KV#kv{val = S1}, write), - Size - sets:size(S1); - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, Reason} -> - {error, Reason} - end. - -%% 命令返回给定集合的并集。不存在的集合 key 被视为空集 --spec sunion(Key :: binary(), Key2 :: binary()) -> Members :: list() | {error, wrong_kind()}. -sunion(Key1, Key2) when is_binary(Key1), is_binary(Key2) -> - case {mnesia:dirty_read(kv, Key1), mnesia:dirty_read(kv, Key2)} of - {[#kv{val = S1, type = set}], [#kv{val = S2, type = set}]} -> - sets:to_list(sets:union(S1, S2)); - {[#kv{val = S1, type = set}], []} -> - sets:to_list(S1); - {[], [#kv{val = S2, type = set}]} -> - sets:to_list(S2); - {[], []} -> - []; - {_, _} -> - {error, ?WRONG_KIND} - end. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% List 处理 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,列表中下标为指定索引值的元素。 如果指定索引值不在列表的区间范围内,返回 nil -%%-spec lindex(Key :: binary(), Idx :: integer()) -> redis_nil() | Member :: binary() | {error, wrong_kind()}. -lindex(Key, Idx) when is_binary(Key), is_integer(Idx) -> - case mnesia:dirty_read(kv, Key) of - [] -> - none; - [#kv{val = L, type = list}] -> - Idx1 = fix_pos(Idx, length(L)), - case Idx1 >= 1 andalso Idx1 =< length(L) of - true -> - lists:nth(fix_pos(Idx, length(L)), L); - false -> - none - end; - _ -> - {error, ?WRONG_KIND} - end. - --spec llen(Key :: binary()) -> Size :: integer() | {error, wrong_kind()}. -llen(Key) when is_binary(Key) -> - case mnesia:dirty_read(kv, Key) of - [] -> - 0; - [#kv{val = L, type = list}] -> - length(L); - _ -> - {error, ?WRONG_KIND} - end. - -%% 列表的第一个元素。 当列表 key 不存在时,返回 nil -%%-spec lpop(Key :: binary()) -> redis_nil() | E :: binary() | {error, wrong_kind()}. -lpop(Key) when is_binary(Key) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - none; - [#kv{val = [], type = list}] -> - none; - [KV = #kv{val = [H | Tail], type = list}] -> - ok = mnesia:write(kv, KV#kv{val = Tail}, write), - H; - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, E} -> - E; - {aborted, Reason} -> - {error, Reason} - end. - -%% 列表的最后一个元素,返回值为移除的元素 -%%-spec rpop(Key :: binary()) -> redis_nil() | E :: binary() | {error, wrong_kind()}. -rpop(Key) when is_binary(Key) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - none; - [#kv{val = [], type = list}] -> - none; - [KV = #kv{val = L0, type = list}] -> - [H | Tail] = lists:reverse(L0), - ok = mnesia:write(kv, KV#kv{val = lists:reverse(Tail)}, write), - H; - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, E} -> - E; - {aborted, Reason} -> - {error, Reason} - end. - -%% 命令将一个或多个值插入到列表头部。 如果 key 不存在,一个空列表会被创建并执行 LPUSH 操作。 当 key 存在但不是列表类型时,返回一个错误 -%% 执行 LPUSH 命令后,列表的长度 --spec lpush(Key :: binary(), Members :: list()) -> Num :: integer() | {error, wrong_kind()}. -lpush(Key, Members) when is_binary(Key), is_list(Members) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - ok = mnesia:write(kv, #kv{key = Key, val = Members, type = list}, write), - length(Members); - [KV = #kv{val = L0, type = list}] -> - L = Members ++ L0, - ok = mnesia:write(kv, KV#kv{val = L, type = list}, write), - length(L); - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, Reason} -> - {error, Reason} - end. - -%% 将一个值插入到已存在的列表头部,列表不存在时操作无效 -%% LPUSHX 命令执行之后,列表的长度 --spec lpushx(Key :: binary(), Members :: list()) -> Num :: integer() | {error, wrong_kind()}. -lpushx(Key, Members) when is_binary(Key), is_list(Members) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - 0; - [KV = #kv{val = L0, type = list}] -> - L = Members ++ L0, - ok = mnesia:write(kv, KV#kv{val = L}, write), - length(L); - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, Reason} -> - {error, Reason} - end. - -%% 将一个或多个值插入到列表的尾部(最右边)。 -%% 如果列表不存在,一个空列表会被创建并执行 RPUSH 操作。 当列表存在但不是列表类型时,返回一个错误 -%% 执行 RPUSH 操作后,列表的长度 --spec rpush(Key :: binary(), Members :: list()) -> Num :: integer() | {error, wrong_kind()}. -rpush(Key, Members) when is_binary(Key), is_list(Members) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - ok = mnesia:write(kv, #kv{key = Key, val = Members, type = list}, write), - length(Members); - [KV = #kv{val = L0, type = list}] -> - L = L0 ++ Members, - ok = mnesia:write(kv, KV#kv{val = L}, write), - length(L); - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, Reason} -> - {error, Reason} - end. - -%% 将一个或多个值插入到列表的尾部(最右边)。 -%% 当列表存在但不是列表类型时,返回一个错误 -%% 执行 RPUSH 操作后,列表的长度 --spec rpushx(Key :: binary(), Members :: list()) -> Num :: integer() | {error, wrong_kind()}. -rpushx(Key, Members) when is_binary(Key), is_list(Members) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - 0; - [KV = #kv{val = L0, type = list}] -> - L = L0 ++ Members, - ok = mnesia:write(kv, KV#kv{val = L, type = list}, write), - length(L); - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, Reason} -> - {error, Reason} - end. - -%% 返回列表中指定区间内的元素,区间以偏移量 START 和 END 指定。 其中 0 表示列表的第一个元素, 1 表示列表的第二个元素,以此类推。 -%% 你也可以使用负数下标,以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推。 --spec lrange(Key :: binary(), Start :: integer(), End :: integer()) -> list() | {error, wrong_kind()}. -lrange(Key, Start, End) when is_binary(Key), is_integer(Start), is_integer(End) -> - case mnesia:dirty_read(kv, Key) of - [] -> - []; - [#kv{val = L, type = list}] -> - Len = length(L), - Start1 = fix_pos(Start, Len), - End1 = fix_pos(End, Len), - case Start1 =< End1 of - true -> - lists:sublist(L, Start1, End1 - Start1 + 1); - false -> - [] - end; - _ -> - {error, ?WRONG_KIND} - end. - -%% 根据参数 COUNT 的值,移除列表中与参数 VALUE 相等的元素。 -%% COUNT 的值可以是以下几种: -%% count > 0 : 从表头开始向表尾搜索,移除与 VALUE 相等的元素,数量为 COUNT 。 -%% count < 0 : 从表尾开始向表头搜索,移除与 VALUE 相等的元素,数量为 COUNT 的绝对值。 -%% count = 0 : 移除表中所有与 VALUE 相等的值 -%% 被移除元素的数量。 列表不存在时返回 0 --spec lrem(Key :: binary(), Count :: integer(), Val :: binary()) -> Num :: integer() | {error, wrong_kind()}. -lrem(Key, Count, Val) when is_binary(Key), is_integer(Count) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - 0; - [KV = #kv{val = L0, type = list}] -> - if - Count > 0 -> - L1 = lists:foldl(fun(_, L) -> lists:delete(Val, L) end, L0, lists:seq(1, Count)), - ok = mnesia:write(kv, KV#kv{val = L1}, write), - length(L0) - length(L1); - Count =:= 0 -> - {DeletedVals, L1} = lists:partition(fun(E) -> E =:= Val end, L0), - case DeletedVals =/= [] of - true -> - ok = mnesia:write(kv, KV#kv{val = L1}, write); - false -> - ok - end, - length(DeletedVals); - Count < 0 -> - L1 = lists:foldl(fun(_, L) -> - lists:delete(Val, L) end, lists:reverse(L0), lists:seq(1, abs(Count))), - ok = mnesia:write(kv, KV#kv{val = lists:reverse(L1)}, write), - length(L0) - length(L1) - end; - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, Reason} -> - {error, Reason} - end. - -%% 当索引参数超出范围,或对一个空列表进行 LSET 时,返回一个错误 -%% 操作成功返回 ok ,否则返回错误信息 --spec lset(Key :: binary(), Idx :: integer(), Val :: binary()) -> ok | {error, wrong_kind()}. -lset(Key, Idx, Val) when is_binary(Key), is_integer(Idx) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - mnesia:abort(?WRONG_KIND); - [KV = #kv{val = L0, type = list}] -> - case length(L0) < Idx of - true -> - mnesia:abort(<<"Index out of bounds">>); - false -> - L1 = lists_update(L0, Idx, Val), - mnesia:write(kv, KV#kv{val = L1}, write) - end; - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, ok} -> - <<"OK">>; - {aborted, Reason} -> - {error, Reason} - end. - -%% 让列表只保留指定区间内的元素,不在指定区间之内的元素都将被删除。 -%% 下标 0 表示列表的第一个元素,以 1 表示列表的第二个元素,以此类推。 -%% 以 -1 表示列表的最后一个元素, -2 表示列表的倒数第二个元素,以此类推 -%% 命令执行成功时,返回 ok --spec ltrim(Key :: binary(), Start :: integer(), End :: integer()) -> ok | {error, wrong_kind()}. -ltrim(Key, Start, End) when is_binary(Key), is_integer(Start), is_integer(End) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - ok; - [KV = #kv{val = L0, type = list}] -> - Len = length(L0), - Start1 = fix_pos(Start, Len), - End1 = fix_pos(End, Len), - case Start1 =< End1 of - true -> - L1 = lists:sublist(L0, Start1, End1 - Start1 + 1), - mnesia:write(kv, KV#kv{val = L1}, write); - false -> - mnesia:write(kv, KV#kv{val = []}, write) - end - end - end, - case mnesia:transaction(Fun) of - {atomic, ok} -> - <<"OK">>; - {aborted, Reason} -> - {error, Reason} - end. - -%% 命令用于在列表的元素前或者后插入元素。当指定元素不存在于列表中时,不执行任何操作。 -%% 当列表不存在时,被视为空列表,不执行任何操作。 -%% 如果 key 不是列表类型,返回一个错误 -%% 如果命令执行成功,返回插入操作完成之后,列表的长度。 如果没有找到指定元素 ,返回 -1 。 如果 key 不存在或为空列表,返回 0 --spec linsert(Key :: binary(), Position :: binary(), Pivot :: binary(), Value :: binary()) -> - Num :: integer() | - {error, wrong_kind()}. -linsert(Key, Position, Pivot, Value) when is_binary(Key), is_binary(Position) -> - Fun = fun() -> - case mnesia:read(kv, Key) of - [] -> - 0; - [KV = #kv{val = L0, type = list}] -> - L = case Position of - <<"BEFORE">> -> - lists_insert_before(L0, Pivot, Value); - <<"AFTER">> -> - lists_insert_after(L0, Pivot, Value) - end, - ok = mnesia:write(kv, KV#kv{val = L}, write), - case L0 =:= L of - true -> -1; - false -> length(L) - end; - _ -> - mnesia:abort(?WRONG_KIND) - end - end, - case mnesia:transaction(Fun) of - {atomic, N} -> - N; - {aborted, Reason} -> - {error, Reason} - end. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% helper methods -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 计算位置 -fix_pos(Pos, Len) when is_integer(Pos), is_integer(Len) -> - case Pos >= 1 of - true -> Pos; - false -> Len + Pos - end. - -%% 更新数组中的元素 --spec lists_update(L :: list(), N :: integer(), Val :: any()) -> L1 :: list(). -lists_update(L, N, Val) when is_integer(N), N > 0, is_list(L) -> - case length(L) < N of - true -> L; - false -> lists_update0(L, N, Val) - end. -lists_update0([_ | Tail], 1, Val) -> - [Val | Tail]; -lists_update0([Hd | Tail], N, Val) -> - [Hd | lists_update0(Tail, N - 1, Val)]. - -%% 在元素前插入 -lists_insert_before(L, Pivot, Val) when is_list(L) -> - lists_insert_before0(L, Pivot, Val). -lists_insert_before0([], _Pivot, _Val) -> - []; -lists_insert_before0([Pivot | Tail], Pivot, Val) -> - [Val, Pivot | Tail]; -lists_insert_before0([H | Tail], Pivot, Val) -> - [H | lists_insert_before0(Tail, Pivot, Val)]. - -%% 在元素后插入 -lists_insert_after(L, Pivot, Val) when is_list(L) -> - lists_insert_after0(L, Pivot, Val). -lists_insert_after0([], _Pivot, _Val) -> - []; -lists_insert_after0([Pivot | Tail], Pivot, Val) -> - [Pivot, Val | Tail]; -lists_insert_after0([H | Tail], Pivot, Val) -> - [H | lists_insert_after0(Tail, Pivot, Val)]. diff --git a/apps/iot/src/mnesia/mnesia_queue.erl b/apps/iot/src/mnesia/mnesia_queue.erl index 0d15267..d328478 100644 --- a/apps/iot/src/mnesia/mnesia_queue.erl +++ b/apps/iot/src/mnesia/mnesia_queue.erl @@ -10,56 +10,42 @@ -author("aresei"). -include("iot.hrl"). +-define(TAB_NAME, 'queue_data:zhongdian'). + %% API --export([insert/2, delete/2, ensure_queue/1, table_size/1, dirty_fetch_next/2]). +-export([insert/1, delete/1, table_size/0, dirty_fetch_next/1]). --spec insert(Tab :: atom(), #north_data{}) -> ok | {error, Reason :: any()}. -insert(Tab, Item = #north_data{}) -> - Id = mnesia_id_generator:next_id(Tab), +-spec insert(#north_data{}) -> ok | {error, Reason :: any()}. +insert(Item = #north_data{}) -> + Id = mnesia_id_generator:next_id(?TAB_NAME), NItem = Item#north_data{id = Id}, - case mnesia:transaction(fun() -> mnesia:write(Tab, NItem, write) end) of + case mnesia:transaction(fun() -> mnesia:write(?TAB_NAME, NItem, write) end) of {atomic, ok} -> ok; {aborted, Reason} -> {error, Reason} end. --spec delete(Tab :: atom(), Key :: any()) -> ok | {error, Reason :: any()}. -delete(Tab, Key) when is_atom(Tab) -> - case mnesia:transaction(fun() -> mnesia:delete(Tab, Key, write) end) of +-spec delete(Key :: any()) -> ok | {error, Reason :: any()}. +delete(Key) when is_integer(Key) -> + case mnesia:transaction(fun() -> mnesia:delete(?TAB_NAME, Key, write) end) of {atomic, ok} -> ok; {aborted, Reason} -> {error, Reason} end. -%% 确保数据已经建立 --spec ensure_queue(Name :: atom()) -> no_return(). -ensure_queue(Name) when is_atom(Name) -> - Tables = mnesia:system_info(tables), - case lists:member(Name, Tables) of - true -> - mnesia:wait_for_tables([Name], infinity); - false -> - mnesia:create_table(Name, [ - {attributes, record_info(fields, north_data)}, - {record_name, north_data}, - {disc_copies, [node()]}, - {type, ordered_set} - ]) - end. +-spec table_size() -> integer(). +table_size() -> + mnesia:table_info(?TAB_NAME, size). --spec table_size(Tab :: atom()) -> integer(). -table_size(Tab) when is_atom(Tab) -> - mnesia:table_info(Tab, size). - --spec dirty_fetch_next(Tab :: atom(), Cursor :: integer()) -> +-spec dirty_fetch_next(Cursor :: integer()) -> {ok, NCursor :: integer(), Item :: any()} | '$end_of_table'. -dirty_fetch_next(Tab, Cursor) when is_atom(Tab), is_integer(Cursor) -> - case mnesia:dirty_next(Tab, Cursor) of +dirty_fetch_next(Cursor) when is_integer(Cursor) -> + case mnesia:dirty_next(?TAB_NAME, Cursor) of '$end_of_table' -> '$end_of_table'; NextKey -> - [Item] = mnesia:dirty_read(Tab, NextKey), + [Item] = mnesia:dirty_read(?TAB_NAME, NextKey), {ok, NextKey, Item} - end. + end. \ No newline at end of file diff --git a/apps/iot/src/mocker/iot_mock.erl b/apps/iot/src/mocker/iot_mock.erl index 2b20efe..c5493c3 100644 --- a/apps/iot/src/mocker/iot_mock.erl +++ b/apps/iot/src/mocker/iot_mock.erl @@ -13,16 +13,7 @@ %% API -export([rsa_encode/1]). -export([insert_services/1]). --export([insert_endpoints/0, test_http/0, test_mysql/0, test_mqtt/0, test_influxdb/0]). - -fun_x(LocationCode, Fields, Timestamp) -> - Data = #{ - <<"version">> => <<"1.0">>, - <<"location_code">> => LocationCode, - <<"ts">> => Timestamp, - <<"properties">> => Fields - }, - {ok, iolist_to_binary(jiffy:encode(Data, [force_utf8]))}. +-export([test_mqtt/0, test_influxdb/0]). test_influxdb() -> UUID = <<"device123123">>, @@ -38,112 +29,13 @@ test_influxdb() -> end) end, lists:seq(1, 100)). -test_http() -> - Name = <<"zhongguodianli">>, - Pid = iot_endpoint:get_pid(Name), - - lists:foreach(fun(Id0) -> - Id = integer_to_binary(Id0), - Fields = [ - #{ - <<"key">> => <<"test:", Id/binary>>, - <<"value">> => Id, - <<"unit">> => <<"cm">> - } - ], - iot_endpoint:forward(Pid, <<"location_code:", Id/binary>>, Fields, iot_util:timestamp_of_seconds()) - end, lists:seq(1, 10000)). - -test_mysql() -> - lists:foreach(fun(_) -> - iot_router:route(<<"mysql123">>, [ - #{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>}, - #{<<"key">> => <<"age">>, <<"value">> => 30}, - #{<<"key">> => <<"flow">>, <<"value">> => 30} - ], iot_util:timestamp_of_seconds()) - end, lists:seq(1, 10000)). - test_mqtt() -> - iot_router:route(<<"test123">>, [ + iot_zd_endpoint:forward(<<"location_code_test123">>, [ #{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>}, #{<<"key">> => <<"age">>, <<"value">> => 30}, #{<<"key">> => <<"flow">>, <<"value">> => 30} ], iot_util:timestamp_of_seconds()). -insert_endpoints() -> - {ok, Mapper, F} = simple_mapper(), - mnesia_endpoint:insert(#endpoint{ - name = <<"zhongguodianli">>, - title = <<"中国电力"/utf8>>, - matcher = <<"test12*">>, - mapper = Mapper, - mapper_fun = F, - config = #http_endpoint{url = <<"http://localhost:18080/test/receiver">>}, - created_at = iot_util:timestamp_of_seconds() - }), - - mnesia_endpoint:insert(#endpoint{ - name = <<"mytest">>, - title = <<"测试数据"/utf8>>, - matcher = <<"test*">>, - mapper = Mapper, - mapper_fun = F, - config = #mqtt_endpoint{ - host = <<"39.98.184.67">>, - port = 1883, - username = <<"test">>, - password = <<"test1234">>, - topic = <<"CET/NX/${location_code}/upload">>, - qos = 2 - }, - created_at = iot_util:timestamp_of_seconds() - }), - - {ok, MysqlMapper, MysqlF} = mysql_mapper(), - mnesia_endpoint:insert(#endpoint{ - name = <<"mysql">>, - title = <<"测试mysql"/utf8>>, - matcher = <<"mysql*">>, - mapper = MysqlMapper, - mapper_fun = MysqlF, - config = #mysql_endpoint{ - host = <<"localhost">>, - port = 3306, - username = <<"php_an">>, - password = <<"123456">>, - database = <<"iot">>, - table_name = <<"north_data">> - }, - created_at = iot_util:timestamp_of_seconds() - }), - - {Mapper, F}. - -simple_mapper() -> - Mapper0 = "fun(LocationCode, Fields) -> - Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields), - Fields2 = maps:from_list(Fields1), - Bin = jiffy:encode(Fields2#{<<\"location_code\">> => LocationCode}, [force_utf8]), - {ok, iolist_to_binary(Bin)} - end.", - - Mapper = list_to_binary(Mapper0), - {ok, F} = iot_util:parse_mapper(Mapper), - - {ok, Mapper, F}. - -mysql_mapper() -> - Mapper0 = "fun(LocationCode, Fields, Timestamp) -> - Fields1 = lists:map(fun(#{<<\"key\">> := Key, <<\"value\">> := Val}) -> {Key, Val} end, Fields), - Content = jiffy:encode(maps:from_list(Fields1), [force_utf8]), - {ok, [{<<\"location_code\">>, LocationCode}, {<<\"content\">>, Content}, {<<\"created_ts\">>, Timestamp}]} - end.", - - Mapper = list_to_binary(Mapper0), - {ok, F} = iot_util:parse_mapper(Mapper), - - {ok, Mapper, F}. - insert_services(Num) -> lists:foreach(fun(Id) -> Res = mysql_pool:insert(mysql_iot, <<"micro_service">>, diff --git a/apps/iot/src/redis/redis_client.erl b/apps/iot/src/redis/redis_client.erl new file mode 100755 index 0000000..bf0d341 --- /dev/null +++ b/apps/iot/src/redis/redis_client.erl @@ -0,0 +1,21 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2017, +%%% @doc +%%% +%%% @end +%%% Created : 21. 四月 2017 13:33 +%%%------------------------------------------------------------------- +-module(redis_client). +-author("aresei"). + +%% API +-export([hget/2]). + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% HashTable处理 +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% + +-spec hget(Key :: binary(), Field :: binary()) -> {ok, Val :: any()} | {error, Reason :: binary()}. +hget(Key, Field) when is_binary(Key), is_binary(Field) -> + poolboy:transaction(redis_pool, fun(Conn) -> eredis:q(Conn, ["HGET", Key, Field]) end). \ No newline at end of file diff --git a/apps/iot/src/redis/redis_handler.erl b/apps/iot/src/redis/redis_handler.erl deleted file mode 100644 index 001352e..0000000 --- a/apps/iot/src/redis/redis_handler.erl +++ /dev/null @@ -1,197 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author licheng5 -%%% @copyright (C) 2021, -%%% @doc -%%% -%%% @end -%%% Created : 21. 1月 2021 上午11:23 -%%%------------------------------------------------------------------- --module(redis_handler). --author("licheng5"). - -%% API --export([handle/1]). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% Key管理 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -handle([<<"DEL">>, Key]) when is_binary(Key) -> - N = mnesia_kv:del(Key), - {reply, N}; - -handle([<<"EXISTS">>, Key]) when is_binary(Key) -> - N = mnesia_kv:exists(Key), - {reply, N}; - -handle([<<"EXPIRE">>, Key, Second0]) when is_binary(Key), is_binary(Second0) -> - Second = binary_to_integer(Second0), - N = mnesia_kv:expire(Key, Second), - {reply, N}; - -handle([<<"KEYS">>, Pattern]) when is_binary(Pattern) andalso Pattern =/= <<>> -> - Keys = mnesia_kv:keys(Pattern), - {reply, Keys}; - -handle([<<"PERSIST">>, Key]) when is_binary(Key) -> - N = mnesia_kv:persist(Key), - {reply, N}; - -handle([<<"TTL">>, Key]) when is_binary(Key) -> - TTL = mnesia_kv:ttl(Key), - {reply, TTL}; - -handle([<<"TYPE">>, Key]) when is_binary(Key) -> - Type = mnesia_kv:type(Key), - {reply, atom_to_binary(Type)}; - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% 字符串处理 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -handle([<<"GET">>, Key]) when is_binary(Key) -> - {reply, mnesia_kv:get(Key)}; - -handle([<<"SET">>, Key, Val]) when is_binary(Key), is_binary(Val) -> - case mnesia_kv:set(Key, Val) of - true -> - {reply, <<"OK">>}; - false -> - {reply, <<"FAILED">>} - end; - -handle([<<"SETNX">>, Key, Val]) when is_binary(Key), is_binary(Val) -> - {reply, mnesia_kv:setnx(Key, Val)}; - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% HashTable处理 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -handle([<<"HSET">>, Key, Field, Val]) when is_binary(Key), is_binary(Field), is_binary(Val) -> - {reply, mnesia_kv:hset(Key, Field, Val)}; - -handle([<<"HMSET">>, Key | KvPairs]) when is_binary(Key), length(KvPairs) rem 2 =:= 0 -> - {reply, mnesia_kv:hmset(Key, lists_to_map(KvPairs))}; - -handle([<<"HGET">>, Key, Field]) when is_binary(Key), is_binary(Field) -> - {reply, mnesia_kv:hget(Key, Field)}; - -handle([<<"HMGET">>, Key | Fields]) when is_binary(Key), is_list(Fields) -> - {reply, mnesia_kv:hmget(Key, Fields)}; - -handle([<<"HDEL">>, Key | Fields]) when is_binary(Key), is_list(Fields) -> - {reply, mnesia_kv:hdel(Key, Fields)}; - -handle([<<"HEXISTS">>, Key, Field]) when is_binary(Key), is_binary(Field) -> - {reply, mnesia_kv:hexists(Key, Field)}; - -handle([<<"HGETALL">>, Key]) when is_binary(Key) -> - {reply, mnesia_kv:hgetall(Key)}; - -handle([<<"HKEYS">>, Key]) when is_binary(Key) -> - {reply, mnesia_kv:hkeys(Key)}; - -handle([<<"HLEN">>, Key]) when is_binary(Key) -> - {reply, mnesia_kv:hlen(Key)}; - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% set处理 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -handle([<<"SADD">>, Key | Members]) when is_binary(Key), is_list(Members) -> - {reply, mnesia_kv:sadd(Key, Members)}; - -handle([<<"SCARD">>, Key]) when is_binary(Key) -> - {reply, mnesia_kv:scard(Key)}; - -handle([<<"SISMEMBER">>, Key, Member]) when is_binary(Key), is_binary(Member) -> - {reply, mnesia_kv:sismember(Key, Member)}; - -handle([<<"SDIFF">>, Key1, Key2]) when is_binary(Key1), is_binary(Key2) -> - {reply, mnesia_kv:sdiff(Key1, Key2)}; - -handle([<<"SINTER">>, Key1, Key2]) when is_binary(Key1), is_binary(Key2) -> - {reply, mnesia_kv:sinter(Key1, Key2)}; - -handle([<<"SUNION">>, Key1, Key2]) when is_binary(Key1), is_binary(Key2) -> - {reply, mnesia_kv:sunion(Key1, Key2)}; - -handle([<<"SMEMBERS">>, Key]) when is_binary(Key) -> - {reply, mnesia_kv:smembers(Key)}; - -handle([<<"SPOP">>, Key]) when is_binary(Key) -> - {reply, mnesia_kv:spop(Key)}; - -handle([<<"SRANDMEMBER">>, Key, Count0]) when is_binary(Key), is_binary(Count0) -> - Count = binary_to_integer(Count0), - {reply, mnesia_kv:srandmember(Key, Count)}; - -handle([<<"SREM">>, Key | Members]) when is_binary(Key), is_list(Members) -> - {reply, mnesia_kv:srem(Key, Members)}; - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% List 处理 -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -handle([<<"LINDEX">>, Key, Idx0]) when is_binary(Key), is_binary(Idx0) -> - Idx = binary_to_integer(Idx0), - {reply, mnesia_kv:lindex(Key, Idx + 1)}; - -handle([<<"LLEN">>, Key]) when is_binary(Key) -> - {reply, mnesia_kv:llen(Key)}; - -handle([<<"LPOP">>, Key]) when is_binary(Key) -> - {reply, mnesia_kv:lpop(Key)}; - -handle([<<"RPOP">>, Key]) when is_binary(Key) -> - {reply, mnesia_kv:rpop(Key)}; - -handle([<<"LPUSH">>, Key | Members]) when is_binary(Key) -> - {reply, mnesia_kv:lpush(Key, Members)}; - -handle([<<"LPUSHX">>, Key | Members]) when is_binary(Key) -> - {reply, mnesia_kv:lpushx(Key, Members)}; - -handle([<<"RPUSH">>, Key | Members]) when is_binary(Key) -> - {reply, mnesia_kv:rpush(Key, Members)}; - -handle([<<"RPUSHX">>, Key | Members]) when is_binary(Key) -> - {reply, mnesia_kv:rpushx(Key, Members)}; - -handle([<<"LRANGE">>, Key, Start0, End0]) when is_binary(Key), is_binary(Start0), is_binary(End0) -> - Start = binary_to_integer(Start0), - End = binary_to_integer(End0), - {reply, mnesia_kv:lrange(Key, Start + 1, End + 1)}; - -handle([<<"LREM">>, Key, Count0, Val]) when is_binary(Key), is_binary(Count0), is_binary(Val) -> - Count = binary_to_integer(Count0), - {reply, mnesia_kv:lrem(Key, Count, Val)}; - -handle([<<"LSET">>, Key, Idx0, Val]) when is_binary(Key), is_binary(Idx0), is_binary(Val) -> - Idx = binary_to_integer(Idx0), - {reply, mnesia_kv:lset(Key, Idx + 1, Val)}; - -handle([<<"LTRIM">>, Key, Start0, End0]) when is_binary(Key), is_binary(Start0), is_binary(End0) -> - Start = binary_to_integer(Start0), - End = binary_to_integer(End0), - {reply, mnesia_kv:ltrim(Key, Start + 1, End + 1)}; - -handle([<<"LINSERT">>, Key, Position, Pivot, Val]) when is_binary(Key), Position =:= <<"BEFORE">>; Position =:= <<"AFTER">> -> - {reply, mnesia_kv:linsert(Key, Position, Pivot, Val)}; - -handle(_) -> - {reply, {error, <<"Unsuported Command">>}}. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% helper methods -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 将数组转换成map -lists_to_map(L) when is_list(L) -> - lists_to_map(L, #{}). -lists_to_map([], Map) -> - Map; -lists_to_map([K, V | Tail], Map) -> - lists_to_map(Tail, Map#{K => V}). - - diff --git a/apps/iot/src/redis/redis_protocol.erl b/apps/iot/src/redis/redis_protocol.erl deleted file mode 100644 index 5f49695..0000000 --- a/apps/iot/src/redis/redis_protocol.erl +++ /dev/null @@ -1,106 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author licheng5 -%%% @copyright (C) 2020, -%%% @doc -%%% -%%% @end -%%% Created : 10. 12月 2020 上午11:17 -%%%------------------------------------------------------------------- --module(redis_protocol). --author("licheng5"). - -%% API --export([start_link/2, init/2]). - --record(command, { - data = <<>>, - stage = parse_arg_num, - arg_num = 0, - args = [] -}). - -%%-------------------------------------------------------------------- -%% esockd callback -%%-------------------------------------------------------------------- - -start_link(Transport, Sock) -> - {ok, spawn_link(?MODULE, init, [Transport, Sock])}. - -init(Transport, Sock) -> - {ok, NewSock} = Transport:wait(Sock), - loop(Transport, NewSock, #command{data = <<>>, arg_num = 0, args = []}). - -loop(Transport, Sock, Command = #command{data = Data}) -> - Transport:setopts(Sock, [{active, once}]), - receive - {tcp, _, Packet} -> - %% 收到数据的第一个包,才开始记录处理时间, redis基于长连接,请求不是连续处理的 - NData = <>, - case parse(Command#command{data = NData}) of - {ok, #command{args = [Method0|Args]}} -> - Method = string:uppercase(Method0), - lager:debug("[redis_protocol] get a command: ~p", [[Method|Args]]), - {reply, Reply} = redis_handler:handle([Method | Args]), - Transport:send(Sock, encode(Reply)), - %% 等待下一次请求 - loop(Transport, Sock, #command{}); - {more_data, NCommand} -> - %% 请求的数据包过大,一次接受不完整 - loop(Transport, Sock, NCommand) - end; - {tcp_error, _} -> - exit(normal); - {tcp_closed, _} -> - exit(normal) - end. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% helper methods -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 解析请求的包, 支持请求不在一个包里面的情况, 基于状态机 -parse(Command = #command{stage = parse_arg_num, data = <<$*, Rest/binary>>}) -> - [ArgNum0, ArgBin] = binary:split(Rest, <<$\r, $\n>>), - ArgNum = binary_to_integer(ArgNum0), - parse(Command#command{arg_num = ArgNum, data = ArgBin, stage = parse_arg}); -%% 解析请求的参数 -parse(Command = #command{stage = parse_arg, args = Args, arg_num = 0, data = <<>>}) -> - {ok, Command#command{args = lists:reverse(Args)}}; -parse(Command = #command{stage = parse_arg, args = Args, arg_num = ArgNum, data = ArgBin}) -> - case binary:split(ArgBin, <<$\r, $\n>>) of - [<<"$", ArgLen0/binary>>, RestArgBin] -> - ArgLen = binary_to_integer(ArgLen0), - case RestArgBin of - <> -> - parse(Command#command{arg_num = ArgNum - 1, args = [Arg | Args], data = RestArgBin1}); - _ -> - {more_data, Command} - end; - _ -> - {more_data, Command} - end. - -%% redis数据返回格式化 --spec encode(tuple() | binary() | list()) -> iolist(). -encode({single_line, Arg}) when is_binary(Arg) -> - [<<$+>>, Arg, <<$\r, $\n>>]; -encode({error, Arg}) when is_binary(Arg) -> - [<<$->>, Arg, <<$\r, $\n>>]; -encode(Arg) when is_integer(Arg) -> - [<<$:>>, integer_to_list(Arg), <<$\r, $\n>>]; -encode(Arg) when is_binary(Arg) -> - [<<$$>>, integer_to_list(iolist_size(Arg)), <<$\r, $\n>>, Arg, <<$\r, $\n>>]; -encode(Args) when is_list(Args) -> - ArgCount = [<<$*>>, integer_to_list(length(Args)), <<$\r, $\n>>], - ArgsBin = lists:map(fun encode/1, lists:map(fun to_binary/1, Args)), - [ArgCount, ArgsBin]. - -%% 将数据转换成binary -to_binary(X) when is_list(X) -> - unicode:characters_to_binary(X); -to_binary(X) when is_atom(X) -> - list_to_binary(atom_to_list(X)); -to_binary(X) when is_binary(X) -> - X; -to_binary(X) when is_integer(X) -> - list_to_binary(integer_to_list(X)). \ No newline at end of file diff --git a/config/sys-dev.config b/config/sys-dev.config index f8ef5c9..c38304f 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -34,6 +34,16 @@ {<<"test">>, <<"iot2023">>} ]}, + %% 配置中电的数据转发, mqtt协议 + {zhongdian, [ + {host, "39.98.184.67"}, + {port, 1883}, + {username, "test"}, + {password, "test1234"}, + {topic, "CET/NX/upload"}, + {qos, 2} + ]}, + {pools, [ %% mysql连接池配置 {mysql_iot, @@ -50,13 +60,22 @@ ] }, + %% redis连接池 + {redis_pool, + [{size, 10}, {max_overflow, 20}, {worker_module, eredis}], + [ + {host, "127.0.0.1"}, + {port, 26379} + ] + }, + %% influxdb数据库配置, 测试环境的: 用户名: iot; password: password1234 {influx_pool, [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], [ {host, "39.98.184.67"}, {port, 8086}, - {token, <<"0DGZaV_wk6OCt_Bazk5W_9L3Zo-y-bGzwPS6NDGAqy7iSzPVTgC0paINNJ6V3y3Eo_JYfWAHhXI8OZTDnW3IRQ==">>} + {token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>} ] } diff --git a/config/sys-prod.config b/config/sys-prod.config index 187de25..f55cc58 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -18,22 +18,21 @@ {port, 18080} ]}, - %% 目标服务器地址 - %{emqx_server, [ - % {host, {39, 98, 184, 67}}, - % {port, 1883}, - % {tcp_opts, []}, - % {username, "test"}, - % {password, "test1234"}, - % {keepalive, 86400}, - % {retry_interval, 5} - %]}, - %% 权限检验时的预埋token {pre_tokens, [ {<<"test">>, <<"iot2023">>} ]}, + %% 配置中电的数据转发, mqtt协议 + {zhongdian, [ + {host, "172.30.6.161"}, + {port, 1883}, + {username, "admin"}, + {password, "123456"}, + {topic, "CET/NX/upload"}, + {qos, 2} + ]}, + {pools, [ %% mysql连接池配置 {mysql_iot, @@ -50,6 +49,15 @@ ] }, + %% redis连接池 + {redis_pool, + [{size, 10}, {max_overflow, 20}, {worker_module, eredis}], + [ + {host, "172.19.0.7"}, + {port, 26379} + ] + }, + %% influxdb数据库配置 {influx_pool, [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], diff --git a/rebar.config b/rebar.config index e077e67..8ac5bcc 100644 --- a/rebar.config +++ b/rebar.config @@ -7,6 +7,7 @@ {esockd, ".*", {git, "https://github.com/emqx/esockd.git", {tag, "v5.7.3"}}}, {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}}, {mysql, ".*", {git, "https://github.com/mysql-otp/mysql-otp", {tag, "1.8.0"}}}, + {eredis, ".*", {git, "https://github.com/wooga/eredis.git", {tag, "v1.2.0"}}}, {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, {lager, ".*", {git,"https://github.com/erlang-lager/lager.git", {tag, "3.9.2"}}} ]}. diff --git a/rebar.lock b/rebar.lock index 0ab5093..de8977c 100644 --- a/rebar.lock +++ b/rebar.lock @@ -8,6 +8,10 @@ {git,"https://github.com/ninenines/cowlib", {ref,"cc04201c1d0e1d5603cd1cde037ab729b192634c"}}, 1}, + {<<"eredis">>, + {git,"https://github.com/wooga/eredis.git", + {ref,"9ad91f149310a7d002cb966f62b7e2c3330abb04"}}, + 0}, {<<"esockd">>, {git,"https://github.com/emqx/esockd.git", {ref,"d9ce4024cc42a65e9a05001997031e743442f955"}},