%%%-------------------------------------------------------------------
%%% @author aresei
%%% @copyright (C) 2023,
%%% @doc
%%% Facade over the concrete endpoint worker processes.
%%%
%%% The module picks the worker implementation (`endpoint_http',
%%% `endpoint_mqtt' or `endpoint_mysql') from the record stored in the
%%% `config' field of an `#endpoint{}', derives the names a worker is
%%% started under, and wraps the messages sent to a running worker
%%% (`forward', `reload', `clean_up').
%%% @end
%%% Created : 06. Jul 2023 12:02
%%%-------------------------------------------------------------------
-module(endpoint).

-include("endpoint.hrl").

%% API
-export([start_link/1]).
-export([get_name/1, get_pid/1, forward/4, reload/2, clean_up/1]).
-export([get_alias_pid/1]).
-export([config_equals/2]).

%%%===================================================================
%%% API
%%%===================================================================

%% @doc Starts the worker that matches the endpoint's `config' record.
%% The worker module receives the id-based local name, the name-based
%% alias and the complete `#endpoint{}'; its result is returned as is.
%% Only `#http_endpoint{}', `#mqtt_endpoint{}' and `#mysql_endpoint{}'
%% are handled: any other config (including `#kafka_endpoint{}') fails
%% with `function_clause'.
-spec start_link(Endpoint :: #endpoint{}) -> {'ok', pid()} | 'ignore' | {'error', term()}.
start_link(#endpoint{id = Id, name = Name, config = #http_endpoint{}} = Endpoint) ->
    endpoint_http:start_link(get_name(Id), get_alias_name(Name), Endpoint);
start_link(#endpoint{id = Id, name = Name, config = #mqtt_endpoint{}} = Endpoint) ->
    endpoint_mqtt:start_link(get_name(Id), get_alias_name(Name), Endpoint);
start_link(#endpoint{id = Id, name = Name, config = #mysql_endpoint{}} = Endpoint) ->
    endpoint_mysql:start_link(get_name(Id), get_alias_name(Name), Endpoint).

%% @doc Builds the atom `endpoint:<Id>' used as the worker's local name.
%% Fails with `function_clause' when `Id' is not an integer.
-spec get_name(Id :: integer()) -> atom().
get_name(Id) when is_integer(Id) ->
    list_to_atom("endpoint:" ++ integer_to_list(Id)).

%% @doc Looks up the process locally registered under `get_name(Id)'.
%% Returns `undefined' when no process is registered under that name.
-spec get_pid(Id :: integer()) -> undefined | pid().
get_pid(Id) when is_integer(Id) ->
    whereis(get_name(Id)).

%% Builds the atom `endpoint:<Name>' used as the worker's alias.
%% NOTE(review): every distinct Name creates a new atom and atoms are never
%% garbage collected, so Name has to come from a trusted, bounded source.
%% binary_to_list/1 converts byte by byte, so a non-ASCII UTF-8 name yields
%% one atom character per byte rather than one per code point.
-spec get_alias_name(Name :: binary()) -> atom().
get_alias_name(Name) when is_binary(Name) ->
    list_to_atom("endpoint:" ++ binary_to_list(Name)).

%% @doc Resolves the alias derived from `Name' through
%% `iot_name_server:whereis_alias/1'. Per the spec the result is a pid or
%% `undefined'; the lookup itself is implemented outside this module.
-spec get_alias_pid(Name :: binary()) -> undefined | pid().
get_alias_pid(Name) when is_binary(Name) ->
    iot_name_server:whereis_alias(get_alias_name(Name)).

%% forward/4 only casts to the worker, and gen_server:cast/2 always returns
%% `ok', so the function does return (the previous `no_return()' was wrong).
-spec forward(Pid :: pid(), ServiceId :: binary(), Format :: binary(), Metric :: binary()) -> ok.
%% @doc Asynchronously hands one metric to the endpoint worker `Pid'.
%% The request is sent as a `{forward, ServiceId, Format, Metric}' cast, so
%% the call returns `ok' at once without waiting for the worker.
%% Fails with `function_clause' unless `Pid' is a pid and the other three
%% arguments are binaries.
forward(Pid, ServiceId, Format, Metric) when
    is_pid(Pid), is_binary(ServiceId), is_binary(Format), is_binary(Metric)
->
    gen_server:cast(Pid, {forward, ServiceId, Format, Metric}).

%% @doc Asynchronously sends `{reload, NEndpoint}' to the worker `Pid'.
%% How the new definition is applied is decided by the worker module.
%% NOTE(review): this uses gen_statem:cast/2 while the sibling functions use
%% gen_server; both deliver the request as a '$gen_cast' message, so it
%% reaches either kind of worker - confirm which behaviour the workers use.
-spec reload(Pid :: pid(), NEndpoint :: #endpoint{}) -> ok.
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
    gen_statem:cast(Pid, {reload, NEndpoint}).

%% @doc Synchronously sends `clean_up' to the worker `Pid' and returns its
%% reply. The caller exits if no reply arrives within 5000 ms.
%% NOTE(review): the spec promises `ok'; that relies on the worker replying
%% `ok', which is not visible from this module.
-spec clean_up(Pid :: pid()) -> ok.
clean_up(Pid) when is_pid(Pid) ->
    gen_server:call(Pid, clean_up, 5000).

%% @doc Tells whether two endpoint configs describe the same target.
%% Two configs are equal only when they are records of the same type and
%% the fields listed in the matching clause are identical (repeating a
%% variable in both patterns requires the two values to be equal).
%% Everything else, including mixed record types, yields `false'.
-spec config_equals(any(), any()) -> boolean().
config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) ->
    true;
config_equals(
    #kafka_endpoint{
        username = Username,
        password = Password,
        bootstrap_servers = BootstrapServers,
        topic = Topic
    },
    #kafka_endpoint{
        username = Username,
        password = Password,
        bootstrap_servers = BootstrapServers,
        topic = Topic
    }
) ->
    true;
config_equals(
    #mqtt_endpoint{
        host = Host,
        port = Port,
        username = Username,
        password = Password,
        topic = Topic,
        qos = Qos
    },
    #mqtt_endpoint{
        host = Host,
        port = Port,
        username = Username,
        password = Password,
        topic = Topic,
        qos = Qos
    }
) ->
    true;
%% MySQL configs previously always compared as different because no clause
%% existed for them. The whole record is compared, so this clause does not
%% depend on which fields #mysql_endpoint{} declares.
config_equals(Config = #mysql_endpoint{}, Config) ->
    true;
config_equals(_, _) ->
    false.