%%%------------------------------------------------------------------- %%% @author aresei %%% @copyright (C) 2023, %%% @doc %%% %%% @end %%% Created : 06. 7月 2023 12:02 %%%------------------------------------------------------------------- -module(endpoint). -include("endpoint.hrl"). %% API -export([start_link/1]). -export([get_name/1, get_pid/1, forward/4, reload/2, clean_up/1]). %%%=================================================================== %%% API %%%=================================================================== -spec start_link(Endpoint :: #endpoint{}) -> {'ok', pid()} | 'ignore' | {'error', term()}. start_link(Endpoint = #endpoint{id = Id, config = #http_endpoint{}}) -> Name = get_name(Id), endpoint_http:start_link(Name, Endpoint); start_link(Endpoint = #endpoint{id = Id, config = #mqtt_endpoint{}}) -> Name = get_name(Id), endpoint_mqtt:start_link(Name, Endpoint); start_link(Endpoint = #endpoint{id = Id, config = #mysql_endpoint{}}) -> Name = get_name(Id), endpoint_mysql:start_link(Name, Endpoint). -spec get_name(Id :: integer()) -> atom(). get_name(Id) when is_integer(Id) -> list_to_atom("endpoint:" ++ integer_to_list(Id)). -spec get_pid(Id :: integer()) -> undefined | pid(). get_pid(Id) when is_integer(Id) -> whereis(get_name(Id)). -spec forward(Pid :: undefined | pid(), LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). forward(undefined, _, _, _) -> ok; forward(Pid, LocationCode, Fields, Timestamp) when is_pid(Pid), is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> gen_server:cast(Pid, {forward, LocationCode, Fields, Timestamp}). reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> gen_statem:cast(Pid, {reload, NEndpoint}). -spec clean_up(Pid :: pid()) -> ok. clean_up(Pid) when is_pid(Pid) -> gen_server:call(Pid, clean_up, 5000). -spec config_equals(any(), any()) -> boolean(). config_equals(#http_endpoint{url = Url}, #http_endpoint{url = Url}) -> true; config_equals(#kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}, #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic}) -> true; config_equals(#mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}, #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password, topic = Topic, qos = Qos}) -> true; config_equals(_, _) -> false.