diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index aef5e2f..04a9300 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -29,9 +29,6 @@ -define(METHOD_FEEDBACK_STEP, 16#05). -define(METHOD_FEEDBACK_RESULT, 16#06). -%% 北向数据 --define(METHOD_NORTH_DATA, 16#08). - %% 消息体类型 -define(PACKET_REQUEST, 16#01). -define(PACKET_RESPONSE, 16#02). @@ -46,6 +43,31 @@ type :: atom() }). +-record(http_endpoint, { + url = <<>> :: binary(), + pool_size = 10 :: integer() +}). + +-record(ws_endpoint, { + url = <<>> :: binary() +}). + +-record(mqtt_endpoint, { + host = <<>> :: binary(), + port = 0 :: integer(), + username = <<>> :: binary(), + password = <<>> :: binary(), + topic = <<>> :: binary(), + qos = 0 :: integer() +}). + +-record(kafka_endpoint, { + username = <<>> :: binary(), + password = <<>> :: binary(), + bootstrap_servers = [] :: [binary()], + topic = <<>> :: binary() +}). + %% 对端配置 -record(endpoint, { %% 不同的对端名字要唯一 @@ -58,7 +80,7 @@ %% 数据转换规则,基于function mapper_fun = fun(_, Data) -> Data end :: fun((binary(), any()) -> any()), %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} - config = #{} :: #{}, + config = #http_endpoint{} :: #http_endpoint{} | #ws_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{}, %% 更新时间 updated_at = 0 :: integer(), %% 创建时间 diff --git a/apps/iot/src/http_handler/endpoint_handler.erl b/apps/iot/src/http_handler/endpoint_handler.erl index d9c69ab..6efa354 100644 --- a/apps/iot/src/http_handler/endpoint_handler.erl +++ b/apps/iot/src/http_handler/endpoint_handler.erl @@ -46,8 +46,6 @@ handle_request("POST", "/endpoint/create", _, Params = #{<<"name">> := Name}) -> Endpoint = Endpoint0#endpoint{created_at = iot_util:timestamp_of_seconds()}, case mnesia_endpoint:insert(Endpoint) of ok -> - {ok, _} = iot_endpoint_sup:ensured_endpoint_started(Endpoint), - {ok, 200, iot_util:json_data(<<"success">>)}; {error, Reason} -> lager:debug("[endpoint_handler] create router, get error is: ~p", [Reason]), @@ -70,13 +68,6 @@ handle_request("POST", "/endpoint/update", _, Params = #{<<"name">> := Name}) wh case mnesia_endpoint:insert(NEndpoint1) of ok -> - case iot_endpoint:get_pid(Name) of - undefined -> - %% 重新启动endpoint - {ok, _} = iot_endpoint_sup:ensured_endpoint_started(NEndpoint1); - Pid when is_pid(Pid) -> - iot_endpoint:reload(Pid, NEndpoint1) - end, {ok, 200, iot_util:json_data(<<"success">>)}; {error, Reason} -> lager:debug("[endpoint_handler] update endpoint, get error is: ~p", [Reason]), @@ -139,27 +130,27 @@ make_endpoint([{<<"mapper">>, Mapper} | Params], Endpoint) when is_binary(Mapper make_endpoint([{<<"mapper">>, _} | _], _) -> throw(<<"invalid mapper">>); -make_endpoint([{<<"config">>, Config = #{<<"protocol">> := <<"http">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> -> - make_endpoint(Params, Endpoint#endpoint{config = Config}); -make_endpoint([{<<"config">>, Config = #{<<"protocol">> := <<"https">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> -> - make_endpoint(Params, Endpoint#endpoint{config = Config}); -make_endpoint([{<<"config">>, Config = #{<<"protocol">> := <<"ws">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> -> - make_endpoint(Params, Endpoint#endpoint{config = Config}); -make_endpoint([{<<"config">>, Config = #{<<"protocol">> := <<"kafka">>, <<"args">> := #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}}} | Params], Endpoint) +make_endpoint([{<<"config">>, #{<<"protocol">> := <<"http">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> -> + make_endpoint(Params, Endpoint#endpoint{config = #http_endpoint{url = Url}}); +make_endpoint([{<<"config">>, #{<<"protocol">> := <<"https">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> -> + make_endpoint(Params, Endpoint#endpoint{config = #http_endpoint{url = Url}}); +make_endpoint([{<<"config">>, #{<<"protocol">> := <<"ws">>, <<"args">> := #{<<"url">> := Url}}} | Params], Endpoint) when Url /= <<>> -> + make_endpoint(Params, Endpoint#endpoint{config = #ws_endpoint{url = Url}}); +make_endpoint([{<<"config">>, #{<<"protocol">> := <<"kafka">>, <<"args">> := #{<<"username">> := Username, <<"password">> := Password, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}}} | Params], Endpoint) when is_binary(Username) andalso Username /= <<>> andalso is_binary(Password) andalso Password /= <<>> andalso is_list(BootstrapServers) andalso length(BootstrapServers) > 0 andalso is_binary(Topic) andalso Topic /= <<>> -> - make_endpoint(Params, Endpoint#endpoint{config = Config}); + make_endpoint(Params, Endpoint#endpoint{config = #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}}); -make_endpoint([{<<"config">>, Config = #{<<"protocol">> := <<"mqtt">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}} | Params], Endpoint) +make_endpoint([{<<"config">>, #{<<"protocol">> := <<"mqtt">>, <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}} | Params], Endpoint) when is_binary(Username) andalso Username /= <<>> andalso is_binary(Password) andalso Password /= <<>> andalso is_binary(Host) andalso Host /= <<>> andalso is_integer(Port) andalso Port > 0 andalso (Qos == 0 orelse Qos == 1 orelse Qos == 2) andalso is_binary(Topic) andalso Topic /= <<>> -> - make_endpoint(Params, Endpoint#endpoint{config = Config}); + make_endpoint(Params, Endpoint#endpoint{config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}); make_endpoint([{<<"config">>, Config} | _], _) -> lager:warning("[endpoint_handler] unsupport config: ~p", [Config]), diff --git a/apps/iot/src/iot_endpoint.erl b/apps/iot/src/iot_endpoint.erl index efef8cc..97e2ba1 100644 --- a/apps/iot/src/iot_endpoint.erl +++ b/apps/iot/src/iot_endpoint.erl @@ -49,11 +49,11 @@ get_name(EndpointName) when is_binary(EndpointName) -> get_pid(Name) when is_binary(Name) -> whereis(get_name(Name)). --spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Data :: binary()) -> no_return(). +-spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list()) -> no_return(). forward(undefined, _, _) -> ok; -forward(Pid, LocationCode, Data) when is_pid(Pid), is_binary(LocationCode) -> - gen_statem:cast(Pid, {forward, LocationCode, Data}). +forward(Pid, LocationCode, Fields) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields) -> + gen_statem:cast(Pid, {forward, LocationCode, Fields}). reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> gen_statem:cast(Pid, {reload, NEndpoint}). @@ -100,18 +100,24 @@ handle_event(cast, {reload, NEndpoint}, disconnected, State = #state{endpoint = lager:warning("[iot_endpoint] reload endpoint, old: ~p, new: ~p", [Endpoint, Endpoint, NEndpoint]), {keep_state, State#state{endpoint = NEndpoint}}; -handle_event(cast, {reload, NEndpoint}, connected, State = #state{endpoint = Endpoint, postman = {_, PostmanPid}}) -> - lager:warning("[iot_endpoint] reload endpoint, old: ~p, new: ~p", [Endpoint, Endpoint, NEndpoint]), - %% 解除和postman的link关系 - unlink(PostmanPid), - %% 关闭postman进程 - PostmanPid ! stop, - %% 未确认的消息需要暂存 - NState = stash(State), - %% 重新建立新的postman - erlang:start_timer(0, self(), recreate_postman), +handle_event(cast, {reload, NEndpoint = #endpoint{name = Name}}, connected, State = #state{endpoint = Endpoint, postman = {_, PostmanPid}}) -> + lager:debug("[iot_endpoint] reload endpoint, old: ~p~n, new: ~p", [Endpoint, NEndpoint]), + case mnesia_endpoint:config_equals(NEndpoint#endpoint.config, Endpoint#endpoint.config) of + true -> + lager:debug("[iot_endpoint] reload endpoint: ~p, config equals", [Name]), + {keep_state, State#state{endpoint = NEndpoint}}; + false -> + %% 解除和postman的link关系 + unlink(PostmanPid), + %% 关闭postman进程 + PostmanPid ! stop, + %% 未确认的消息需要暂存 + NState = stash(State), + %% 重新建立新的postman + erlang:start_timer(0, self(), recreate_postman), - {next_state, disconnected, NState#state{endpoint = NEndpoint, postman = undefined}}; + {next_state, disconnected, NState#state{endpoint = NEndpoint, postman = undefined}} + end; handle_event(cast, {forward, LocationCode, Data}, _, State = #state{endpoint = Endpoint = #endpoint{mapper_fun = MapperFun}}) -> try @@ -232,48 +238,32 @@ stash(State = #state{ack_map = AckMap, timer_map = TimerMap, q = Q}) -> State#state{q = Q1, ack_map = #{}, postman = undefined}. %% 对http和https协议的支持 -create_postman(#endpoint{name = Name, config = Config = #{<<"protocol">> := <<"http">>, <<"args">> := #{<<"url">> := Url}}}) -> - PoolSize = maps:get(<<"pool_size">>, Config, 10), +create_postman(#endpoint{name = Name, config = #http_endpoint{url = Url, pool_size = PoolSize}}) -> PoolName = binary_to_atom(<<"http_pool:", Name/binary>>), {ok, PostmanPid} = http_postman:start_link(self(), Url, PoolName, PoolSize), {ok, {http, PostmanPid}}; %% 对mqtt协议的支持 -create_postman(#endpoint{name = Name, config = Config = #{<<"protocol">> := <<"mqtt">>, - <<"args">> := #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}}}) -> - - ClientId = case maps:is_key(<<"client_id">>, Config) of - true -> - maps:get(<<"client_id">>, Config); - false -> - Node = atom_to_binary(node()), - <<"mqtt-client-", Node/binary, "-", Name/binary>> - end, - Keepalive = maps:get(<<"keepalive">>, Config, 86400), - RetryInterval = maps:get(<<"retry_interval">>, Config, 5), +create_postman(#endpoint{name = Name, config = #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}}) -> + Node = atom_to_binary(node()), + ClientId = <<"mqtt-client-", Node/binary, "-", Name/binary>>, Opts = [ {clientid, ClientId}, - {host, as_string(Host)}, + {host, binary_to_list(Host)}, {port, Port}, {tcp_opts, []}, - {username, as_string(Username)}, - {password, as_string(Password)}, - {keepalive, Keepalive}, + {username, unicode:characters_to_list(Username)}, + {password, unicode:characters_to_list(Password)}, + {keepalive, 86400}, {auto_ack, true}, {connect_timeout, 5000}, {proto_ver, v5}, - {retry_interval, RetryInterval} + {retry_interval, 5} ], {ok, PostmanPid} = mqtt_postman:start_link(self(), Opts, Topic, Qos), {ok, {mqtt, PostmanPid}}; create_postman(#endpoint{}) -> - throw(<<"not supported">>). - -%% 转出成string -as_string(S) when is_list(S) -> - S; -as_string(S) when is_binary(S) -> - unicode:characters_to_list(S). \ No newline at end of file + throw(<<"not supported">>). \ No newline at end of file diff --git a/apps/iot/src/iot_endpoint_monitor.erl b/apps/iot/src/iot_endpoint_monitor.erl new file mode 100644 index 0000000..a6c9e85 --- /dev/null +++ b/apps/iot/src/iot_endpoint_monitor.erl @@ -0,0 +1,134 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 25. 7月 2023 16:39 +%%%------------------------------------------------------------------- +-module(iot_endpoint_monitor). +-author("aresei"). +-include("iot.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). +-export([get_all_endpoints/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +get_all_endpoints() -> + case ets:lookup(endpoint_cache, endpoints) of + [] -> + []; + [{_, Endpoints} | _] -> + Endpoints + end. + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?SERVER}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + mnesia:subscribe({table, endpoint, simple}), + ets:new(endpoint_cache, [public, set, named_table, {keypos, 1}]), + %% 加载信息到缓存中 + load_endpoints(), + {ok, #state{}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Request, _From, State = #state{}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({mnesia_table_event, {delete, {endpoint, Name}, Tid}}, State = #state{}) -> + lager:debug("[iot_endpoint_monitor] delete_event endpoint name: ~p, tid: ~p", [Name, Tid]), + iot_endpoint_sup:delete_endpoint(Name), + load_endpoints(), + + {noreply, State}; +handle_info({mnesia_table_event, {write, Endpoint = #endpoint{name = Name}, Tid}}, State = #state{}) -> + lager:debug("[iot_endpoint_monitor] write_event new endpoint: ~p, tid: ~p", [Endpoint, Tid]), + case iot_endpoint:get_pid(Name) of + undefined -> + %% 重新启动endpoint + {ok, _} = iot_endpoint_sup:ensured_endpoint_started(Endpoint); + Pid when is_pid(Pid) -> + iot_endpoint:reload(Pid, Endpoint) + end, + load_endpoints(), + + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec load_endpoints() -> no_return(). +load_endpoints() -> + Endpoints = mnesia_endpoint:get_all_endpoints(), + true = ets:insert(endpoint_cache, {endpoints, Endpoints}). \ No newline at end of file diff --git a/apps/iot/src/iot_endpoint_sup.erl b/apps/iot/src/iot_endpoint_sup.erl index 9b6aced..0fd35e2 100644 --- a/apps/iot/src/iot_endpoint_sup.erl +++ b/apps/iot/src/iot_endpoint_sup.erl @@ -39,6 +39,7 @@ stat() -> delete_endpoint(Name) when is_binary(Name) -> Id = iot_endpoint:get_name(Name), + supervisor:terminate_child(?MODULE, Id), supervisor:delete_child(?MODULE, Id). child_spec(Endpoint) -> diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 9f66cc1..9b9a9ab 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -239,57 +239,39 @@ handle_event({call, From}, {create_session, PubKey}, _StateName, State = #state{ {next_state, session, State#state{status = ?HOST_STATUS_ONLINE}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]}; -%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理 +%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到 handle_event(cast, {handle, {data, Data}}, session, State = #state{uuid = UUID, aes = AES}) -> PlainData = iot_cipher_aes:decrypt(AES, Data), case catch jiffy:decode(PlainData, [return_maps]) of - Infos when is_list(Infos) -> - lager:debug("[iot_host] the data is: ~p", [Infos]), - %% 记录数据, tags里面包含了 <<"device_uuid">> 信息 - lists:foreach(fun(Info = #{<<"service_name">> := ServiceName, <<"fields">> := FieldsList, <<"tags">> := Tags}) when is_binary(ServiceName) -> - Timestamp = maps:get(<<"at">>, Info, iot_util:timestamp()), - NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}, - Measurement = <<"metric">>, + Info = #{<<"service_name">> := ServiceName, <<"at">> := Timestamp, <<"fields">> := FieldsList, <<"tags">> := Tags} when is_binary(ServiceName) -> + %% 查找终端设备对应的点位信息 + RouterUUID = router_uuid(Info, UUID), + case mnesia_kv:hget(RouterUUID, <<"location_code">>) of + none -> + lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, not found", [UUID, RouterUUID]); + {error, Reason} -> + lager:debug("[iot_host] the north_data hget location_code uuid: ~p, router_uuid: ~p, get error: ~p", [UUID, RouterUUID, Reason]); + {ok, LocationCode} -> + iot_router:route(LocationCode, FieldsList) + end, - Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList), - Precision = influx_client:get_precision(Timestamp), + %% 数据写入influxdb + NTags = case maps:is_key(<<"device_uuid">>, Info) of + true -> + Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => maps:get(<<"device_uuid">>, Info)}; + false -> + Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName} + end, + Measurement = <<"metric">>, + Points = lists:map(fun(Fields) -> influx_point:new(Measurement, NTags, Fields, Timestamp) end, FieldsList), + Precision = influx_client:get_precision(Timestamp), - poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end) - end, Infos); + poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end); Other -> lager:debug("[iot_host] the data is invalid json: ~p", [Other]) end, {keep_state, State}; -%% TODO 处理微服务的北向数据 -handle_event(cast, {handle, {north_data, Data}}, session, State = #state{uuid = UUID, aes = AES}) -> - PlainData = iot_cipher_aes:decrypt(AES, Data), - lager:debug("[iot_host] the north_data is: ~p", [PlainData]), - %% 查找主机对应的点位信息 - case mnesia_kv:hget(UUID, <<"location_code">>) of - none -> - lager:debug("[iot_host] the north_data hget location_code uuid: ~p, not found", [UUID]); - {error, Reason} -> - lager:debug("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p", [UUID, Reason]); - {ok, LocationCode} -> - iot_router:route(LocationCode, Data) - end, - {keep_state, State}; - -handle_event(cast, {handle, {north_data, {DeviceUUID, Data}}}, session, State = #state{uuid = UUID, aes = AES}) -> - PlainData = iot_cipher_aes:decrypt(AES, Data), - lager:debug("[iot_host] the north_data uuid: ~p, device_uuid: ~p, is: ~p", [UUID, DeviceUUID, PlainData]), - %% 查找终端设备对应的点位信息 - case mnesia_kv:hget(DeviceUUID, <<"location_code">>) of - none -> - lager:debug("[iot_host] the north_data hget location_code uuid: ~p, device_uuid: ~p, not found", [UUID, DeviceUUID]); - {error, Reason} -> - lager:debug("[iot_host] the north_data hget location_code uuid: ~p, device_uuid: ~p, get error: ~p", [UUID, DeviceUUID, Reason]); - {ok, LocationCode} -> - iot_router:route(LocationCode, Data) - end, - {keep_state, State}; - handle_event(cast, {handle, {ping, CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) -> MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric), case catch jiffy:decode(MetricsInfo, [return_maps]) of @@ -396,4 +378,10 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%%=================================================================== %%% Internal functions -%%%=================================================================== \ No newline at end of file +%%%=================================================================== + +%% 获取到分发的路由 +router_uuid(#{<<"device_uuid">> := DeviceUUID}, _) when is_binary(DeviceUUID), DeviceUUID /= <<>> -> + DeviceUUID; +router_uuid(_, UUID) -> + UUID. diff --git a/apps/iot/src/iot_router.erl b/apps/iot/src/iot_router.erl index 9c9e1a2..ad99e0b 100644 --- a/apps/iot/src/iot_router.erl +++ b/apps/iot/src/iot_router.erl @@ -13,21 +13,21 @@ %% API -export([route/2]). --spec route(LocationCode :: binary(), Data :: binary()) -> ok. -route(LocationCode, Data) when is_binary(LocationCode), is_binary(Data) -> - Endpoints = mnesia_endpoint:get_all_endpoints(), - router0(Endpoints, LocationCode, Data). +-spec route(LocationCode :: binary(), Fields :: list()) -> ok. +route(LocationCode, Fields) when is_binary(LocationCode), is_list(Fields) -> + Endpoints = iot_endpoint_monitor:get_all_endpoints(), + router0(Endpoints, LocationCode, Fields). router0([], _, _) -> ok; -router0([#endpoint{matcher = Regexp, name = Name}|Endpoints], LocationCode, Data) -> +router0([#endpoint{matcher = Regexp, name = Name} | Endpoints], LocationCode, Fields) -> {ok, MP} = re:compile(Regexp), case re:run(LocationCode, MP, [{capture, all, list}]) of nomatch -> - router0(Endpoints, LocationCode, Data); + router0(Endpoints, LocationCode, Fields); {match, _} -> lager:debug("[iot_router] match endpoint: ~p", [Name]), Pid = iot_endpoint:get_pid(Name), - iot_endpoint:forward(Pid, LocationCode, Data), + iot_endpoint:forward(Pid, LocationCode, Fields), %% 继续匹配其他的Endpoint - router0(Endpoints, LocationCode, Data) + router0(Endpoints, LocationCode, Fields) end. \ No newline at end of file diff --git a/apps/iot/src/iot_sup.erl b/apps/iot/src/iot_sup.erl index d9946c3..b45a160 100644 --- a/apps/iot/src/iot_sup.erl +++ b/apps/iot/src/iot_sup.erl @@ -28,6 +28,15 @@ start_link() -> init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, ChildSpecs = [ + #{ + id => 'iot_endpoint_monitor', + start => {'iot_endpoint_monitor', start_link, []}, + restart => permanent, + shutdown => 2000, + type => supervisor, + modules => ['iot_endpoint_monitor'] + }, + #{ id => 'iot_endpoint_sup', start => {'iot_endpoint_sup', start_link, []}, diff --git a/apps/iot/src/mnesia/mnesia_endpoint.erl b/apps/iot/src/mnesia/mnesia_endpoint.erl index 2265bfb..126e0e3 100644 --- a/apps/iot/src/mnesia/mnesia_endpoint.erl +++ b/apps/iot/src/mnesia/mnesia_endpoint.erl @@ -13,7 +13,7 @@ %% API -export([get_all_endpoints/0, get_endpoint/1, insert/1, delete/1]). --export([to_map/1]). +-export([to_map/1, config_equals/2]). -spec get_all_endpoints() -> [#endpoint{}]. get_all_endpoints() -> @@ -56,14 +56,37 @@ delete(Name) when is_binary(Name) -> {error, Reason} end. +%% 判断2个endpoint的配置项是否相同 +-spec config_equals(any(), any()) -> boolean(). +config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) -> + true; +config_equals(#ws_endpoint{url = Url}, #ws_endpoint{url = Url}) -> + true; +config_equals(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}, + #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) -> + true; +config_equals(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}, + #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) -> + true; +config_equals(_, _) -> + false. + to_map(#endpoint{name = Name, title = Title, matcher = Matcher, mapper = Mapper, config = Config, updated_at = UpdatedAt, created_at = CreatedAt}) -> #{ <<"name">> => Name, <<"title">> => Title, <<"matcher">> => Matcher, <<"mapper">> => Mapper, - <<"config">> => Config, + <<"config">> => config_map(Config), <<"updated_at">> => UpdatedAt, <<"created_at">> => CreatedAt }. +config_map(#http_endpoint{url = Url}) -> + #{<<"protocol">> => <<"http">>, <<"args">> => #{<<"url">> => Url}}; +config_map(#ws_endpoint{url = Url}) -> + #{<<"protocol">> => <<"ws">>, <<"args">> => #{<<"url">> => Url}}; +config_map(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) -> + #{<<"protocol">> => <<"kafka">>, <<"args">> => #{<<"username">> => Username, <<"password">> => Password, <<"bootstrap_servers">> => BootstrapServers, <<"topic">> => Topic}}; +config_map(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) -> + #{<<"protocol">> => <<"mqtt">>, <<"args">> => #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"topic">> => Topic, <<"qos">> => Qos}}. diff --git a/apps/iot/src/mocker/iot_mock.erl b/apps/iot/src/mocker/iot_mock.erl index ab6bf51..155c9a0 100644 --- a/apps/iot/src/mocker/iot_mock.erl +++ b/apps/iot/src/mocker/iot_mock.erl @@ -16,9 +16,8 @@ -export([insert_endpoints/0]). insert_endpoints() -> - Mapper0 = "fun(LocationCode, Data) -> - Json = jiffy:decode(Data, [return_maps]), - Bin = jiffy:encode(Json#{<<\"location_code\">> => LocationCode}, [force_utf8]), + Mapper0 = "fun(LocationCode, Fields) -> + Bin = jiffy:encode(Fields#{<<\"location_code\">> => LocationCode}, [force_utf8]), iolist_to_binary(Bin) end.", @@ -30,7 +29,7 @@ insert_endpoints() -> matcher = <<"test12*">>, mapper = Mapper, mapper_fun = F, - config = #{<<"protocol">> => <<"http">>, <<"args">> => #{<<"url">> => <<"http://localhost:18080/test/receiver">>}}, + config = #http_endpoint{url = <<"http://localhost:18080/test/receiver">>}, created_at = iot_util:timestamp_of_seconds() }), @@ -40,16 +39,13 @@ insert_endpoints() -> matcher = <<"test*">>, mapper = Mapper, mapper_fun = F, - config = #{ - <<"protocol">> => <<"mqtt">>, - <<"args">> => #{ - <<"host">> => <<"39.98.184.67">>, - <<"port">> => 1883, - <<"username">> => <<"test">>, - <<"password">> => <<"test1234">>, - <<"topic">> => <<"CET/NX/${location_code}/upload">>, - <<"qos">> => 2 - } + config = #mqtt_endpoint{ + host = <<"39.98.184.67">>, + port = 1883, + username = <<"test">>, + password = <<"test1234">>, + topic = <<"CET/NX/${location_code}/upload">>, + qos = 2 }, created_at = iot_util:timestamp_of_seconds() }), diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index c745f05..a9ebdce 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -92,17 +92,6 @@ websocket_handle({binary, <>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> - lager:debug("[ws_channel] north_data uuid: ~p, data: ~p", [UUID, Data]), - iot_host:handle(HostPid, {north_data, Data}), - {ok, State}; - -websocket_handle({binary, <>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> - lager:debug("[ws_channel] north_data uuid: ~p, data: ~p", [UUID, Data]), - iot_host:handle(HostPid, {north_data, {DeviceUUID, Data}}), - {ok, State}; - websocket_handle({binary, <>}, State = #state{uuid = UUID, host_pid = HostPid}) when is_pid(HostPid) -> lager:debug("[ws_channel] ping uuid: ~p", [UUID]), iot_host:handle(HostPid, {ping, CipherMetric}),