remove mysql endpoint
This commit is contained in:
parent
b1eac4ff16
commit
5c9d069330
@ -30,16 +30,6 @@
|
|||||||
topic = <<>> :: binary()
|
topic = <<>> :: binary()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
-record(mysql_endpoint, {
|
|
||||||
host = <<>> :: binary(),
|
|
||||||
port = 0 :: integer(),
|
|
||||||
username = <<>> :: binary(),
|
|
||||||
password = <<>> :: binary(),
|
|
||||||
database = <<>> :: binary(),
|
|
||||||
table_name = <<>> :: binary(),
|
|
||||||
fields_map = #{} :: map()
|
|
||||||
}).
|
|
||||||
|
|
||||||
-record(endpoint, {
|
-record(endpoint, {
|
||||||
id :: integer(),
|
id :: integer(),
|
||||||
%% 全局唯一,在路由规则中通过名称来指定
|
%% 全局唯一,在路由规则中通过名称来指定
|
||||||
@ -47,7 +37,7 @@
|
|||||||
%% 标题描述
|
%% 标题描述
|
||||||
title = <<>> :: binary(),
|
title = <<>> :: binary(),
|
||||||
%% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
|
%% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
|
||||||
config = #http_endpoint{} :: #http_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{} | #mysql_endpoint{},
|
config = #http_endpoint{} :: #http_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{},
|
||||||
status = 0,
|
status = 0,
|
||||||
%% 更新时间
|
%% 更新时间
|
||||||
updated_at = 0 :: integer(),
|
updated_at = 0 :: integer(),
|
||||||
|
|||||||
@ -45,15 +45,6 @@ endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"t
|
|||||||
error
|
error
|
||||||
end.
|
end.
|
||||||
|
|
||||||
parse_config(<<"mysql">>, #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}) ->
|
|
||||||
#mysql_endpoint{
|
|
||||||
host = Host,
|
|
||||||
port = Port,
|
|
||||||
username = Username,
|
|
||||||
password = Password,
|
|
||||||
database = Database,
|
|
||||||
table_name = TableName
|
|
||||||
};
|
|
||||||
parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) ->
|
parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) ->
|
||||||
#mqtt_endpoint{
|
#mqtt_endpoint{
|
||||||
host = Host,
|
host = Host,
|
||||||
@ -12,7 +12,7 @@
|
|||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/1]).
|
-export([start_link/1]).
|
||||||
-export([get_name/1, get_pid/1, forward/4, reload/2, clean_up/1]).
|
-export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]).
|
||||||
-export([get_alias_pid/1]).
|
-export([get_alias_pid/1]).
|
||||||
-export([config_equals/2]).
|
-export([config_equals/2]).
|
||||||
|
|
||||||
@ -28,11 +28,7 @@ start_link(Endpoint = #endpoint{id = Id, name = Name, config = #http_endpoint{}}
|
|||||||
start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mqtt_endpoint{}}) ->
|
start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mqtt_endpoint{}}) ->
|
||||||
LocalName = get_name(Id),
|
LocalName = get_name(Id),
|
||||||
AliasName = get_alias_name(Name),
|
AliasName = get_alias_name(Name),
|
||||||
endpoint_mqtt:start_link(LocalName, AliasName, Endpoint);
|
endpoint_mqtt:start_link(LocalName, AliasName, Endpoint).
|
||||||
start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mysql_endpoint{}}) ->
|
|
||||||
LocalName = get_name(Id),
|
|
||||||
AliasName = get_alias_name(Name),
|
|
||||||
endpoint_mysql:start_link(LocalName, AliasName, Endpoint).
|
|
||||||
|
|
||||||
-spec get_name(Id :: integer()) -> atom().
|
-spec get_name(Id :: integer()) -> atom().
|
||||||
get_name(Id) when is_integer(Id) ->
|
get_name(Id) when is_integer(Id) ->
|
||||||
@ -50,9 +46,9 @@ get_alias_name(Name) when is_binary(Name) ->
|
|||||||
get_alias_pid(Name) when is_binary(Name) ->
|
get_alias_pid(Name) when is_binary(Name) ->
|
||||||
iot_name_server:whereis_alias(get_alias_name(Name)).
|
iot_name_server:whereis_alias(get_alias_name(Name)).
|
||||||
|
|
||||||
-spec forward(Pid :: pid(), ServiceId :: binary(), Format :: binary(), Metric :: binary()) -> no_return().
|
-spec forward(Pid :: pid(), ServiceId :: binary(), Metric :: binary()) -> no_return().
|
||||||
forward(Pid, ServiceId, Format, Metric) when is_pid(Pid), is_binary(ServiceId), is_binary(Format), is_binary(Metric) ->
|
forward(Pid, ServiceId, Metric) when is_pid(Pid), is_binary(ServiceId), is_binary(Metric) ->
|
||||||
gen_server:cast(Pid, {forward, ServiceId, Format, Metric}).
|
gen_server:cast(Pid, {forward, ServiceId, Metric}).
|
||||||
|
|
||||||
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
|
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
|
||||||
gen_statem:cast(Pid, {reload, NEndpoint}).
|
gen_statem:cast(Pid, {reload, NEndpoint}).
|
||||||
|
|||||||
@ -69,8 +69,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_cast({forward, ServiceId, Format, Metric}, State = #state{buffer = Buffer}) ->
|
handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) ->
|
||||||
NBuffer = endpoint_buffer:append({ServiceId, Format, Metric}, Buffer),
|
NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer),
|
||||||
{noreply, State#state{buffer = NBuffer}};
|
{noreply, State#state{buffer = NBuffer}};
|
||||||
|
|
||||||
handle_cast(cleanup, State = #state{buffer = Buffer}) ->
|
handle_cast(cleanup, State = #state{buffer = Buffer}) ->
|
||||||
@ -83,9 +83,9 @@ handle_cast(cleanup, State = #state{buffer = Buffer}) ->
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_info({next_data, Id, {ServiceId, Format, Metric}}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) ->
|
handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) ->
|
||||||
Headers = [
|
Headers = [
|
||||||
{<<"Content-Type">>, <<"text/", Format/binary>>},
|
{<<"Content-Type">>, <<"application/octet-stream">>},
|
||||||
{<<"Service-Id">>, ServiceId}
|
{<<"Service-Id">>, ServiceId}
|
||||||
],
|
],
|
||||||
case hackney:request(post, Url, Headers, Metric) of
|
case hackney:request(post, Url, Headers, Metric) of
|
||||||
|
|||||||
@ -127,11 +127,11 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status
|
|||||||
handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
|
handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
|
||||||
{keep_state, State};
|
{keep_state, State};
|
||||||
%% 发送数据到mqtt服务器
|
%% 发送数据到mqtt服务器
|
||||||
handle_info({next_data, Id, {ServiceId, Format, Metric}}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight,
|
handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight,
|
||||||
endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) ->
|
endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) ->
|
||||||
|
|
||||||
Topic = re:replace(Topic0, <<"\\${service_id}">>, ServiceId, [global, {return, binary}]),
|
Topic = re:replace(Topic0, <<"\\${service_id}">>, ServiceId, [global, {return, binary}]),
|
||||||
lager:debug("[mqtt_postman] will publish topic: ~p, format: ~p, metric: ~p, qos: ~p", [Topic, Format, Metric, Qos]),
|
lager:debug("[mqtt_postman] will publish topic: ~p, metric: ~p, qos: ~p", [Topic, Metric, Qos]),
|
||||||
case emqtt:publish(ConnPid, Topic, #{}, Metric, [{qos, Qos}, {retain, true}]) of
|
case emqtt:publish(ConnPid, Topic, #{}, Metric, [{qos, Qos}, {retain, true}]) of
|
||||||
ok ->
|
ok ->
|
||||||
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
NBuffer = endpoint_buffer:ack(Id, Buffer),
|
||||||
|
|||||||
@ -338,7 +338,7 @@ handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #stat
|
|||||||
end;
|
end;
|
||||||
|
|
||||||
%% todo
|
%% todo
|
||||||
handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey0, format = Format, metric = Metric}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true, device_map = DeviceMap}) ->
|
handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true, device_map = DeviceMap}) ->
|
||||||
lager:debug("[iot_host] metric_data host: ~p, service_id: ~p, device_uuid: ~p, route_key: ~p, metric: ~p", [UUID, ServiceId, DeviceUUID, RouteKey0, Metric]),
|
lager:debug("[iot_host] metric_data host: ~p, service_id: ~p, device_uuid: ~p, route_key: ~p, metric: ~p", [UUID, ServiceId, DeviceUUID, RouteKey0, Metric]),
|
||||||
case DeviceUUID =/= <<"">> of
|
case DeviceUUID =/= <<"">> of
|
||||||
true ->
|
true ->
|
||||||
@ -354,7 +354,7 @@ handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = D
|
|||||||
undefined ->
|
undefined ->
|
||||||
ok;
|
ok;
|
||||||
EndpointPid ->
|
EndpointPid ->
|
||||||
endpoint:forward(EndpointPid, ServiceId, Format, Metric)
|
endpoint:forward(EndpointPid, ServiceId, Metric)
|
||||||
end,
|
end,
|
||||||
NDevice = iot_device:change_status(Device, ?DEVICE_ONLINE),
|
NDevice = iot_device:change_status(Device, ?DEVICE_ONLINE),
|
||||||
{keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}};
|
{keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}};
|
||||||
|
|||||||
@ -127,10 +127,10 @@ handle_info({timeout, _, create_postman}, State = #state{status = ?DISCONNECTED,
|
|||||||
handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
|
handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
%% 发送数据到mqtt服务器
|
%% 发送数据到mqtt服务器
|
||||||
handle_info({next_data, Id, ServiceId, Format, Metric}, State = #state{status = ?CONNECTED, pool_pid = PoolPid, buffer = Buffer,
|
handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNECTED, pool_pid = PoolPid, buffer = Buffer,
|
||||||
endpoint = #endpoint{title = Title, config = #mysql_endpoint{table_name = Table, fields_map = FieldsMap}}}) ->
|
endpoint = #endpoint{title = Title, config = #mysql_endpoint{table_name = Table, fields_map = FieldsMap}}}) ->
|
||||||
|
|
||||||
case insert_sql(Table, ServiceId, Format, FieldsMap, Metric) of
|
case insert_sql(Table, ServiceId, FieldsMap, Metric) of
|
||||||
{ok, InsertSql, Values} ->
|
{ok, InsertSql, Values} ->
|
||||||
case poolboy:transaction(PoolPid, fun(ConnPid) -> mysql:query(ConnPid, InsertSql, Values) end) of
|
case poolboy:transaction(PoolPid, fun(ConnPid) -> mysql:query(ConnPid, InsertSql, Values) end) of
|
||||||
ok ->
|
ok ->
|
||||||
@ -182,9 +182,9 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
retry_connect() ->
|
retry_connect() ->
|
||||||
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman).
|
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman).
|
||||||
|
|
||||||
-spec insert_sql(Table :: binary(), ServiceId :: binary(), Format :: binary(), FieldsMap :: map(), Metric :: binary()) ->
|
-spec insert_sql(Table :: binary(), ServiceId :: binary(), FieldsMap :: map(), Metric :: binary()) ->
|
||||||
error | {ok, Sql :: binary(), Values :: list()}.
|
error | {ok, Sql :: binary(), Values :: list()}.
|
||||||
insert_sql(Table, ServiceId, <<"line">>, FieldsMap, Metric) when is_binary(Table), is_binary(ServiceId), is_binary(Metric) ->
|
insert_sql(Table, ServiceId, FieldsMap, Metric) when is_binary(Table), is_binary(ServiceId), is_binary(Metric) ->
|
||||||
case line_format:parse(Metric) of
|
case line_format:parse(Metric) of
|
||||||
error ->
|
error ->
|
||||||
error;
|
error;
|
||||||
Loading…
x
Reference in New Issue
Block a user