From 5c9d069330139ea4b46c395a3aa6620e55acb602 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Mon, 18 Aug 2025 14:16:10 +0800 Subject: [PATCH] remove mysql endpoint --- apps/iot/include/endpoint.hrl | 12 +----------- .../iot/src/{endpoint => database}/endpoint_bo.erl | 9 --------- apps/iot/src/endpoint/endpoint.erl | 14 +++++--------- apps/iot/src/endpoint/endpoint_http.erl | 8 ++++---- apps/iot/src/endpoint/endpoint_mqtt.erl | 4 ++-- apps/iot/src/iot_host.erl | 4 ++-- .../iot/src/endpoint => backup}/endpoint_mysql.erl | 8 ++++---- 7 files changed, 18 insertions(+), 41 deletions(-) rename apps/iot/src/{endpoint => database}/endpoint_bo.erl (86%) rename {apps/iot/src/endpoint => backup}/endpoint_mysql.erl (95%) diff --git a/apps/iot/include/endpoint.hrl b/apps/iot/include/endpoint.hrl index 1825578..b94405f 100644 --- a/apps/iot/include/endpoint.hrl +++ b/apps/iot/include/endpoint.hrl @@ -30,16 +30,6 @@ topic = <<>> :: binary() }). --record(mysql_endpoint, { - host = <<>> :: binary(), - port = 0 :: integer(), - username = <<>> :: binary(), - password = <<>> :: binary(), - database = <<>> :: binary(), - table_name = <<>> :: binary(), - fields_map = #{} :: map() -}). - -record(endpoint, { id :: integer(), %% 全局唯一,在路由规则中通过名称来指定 @@ -47,7 +37,7 @@ %% 标题描述 title = <<>> :: binary(), %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} - config = #http_endpoint{} :: #http_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{} | #mysql_endpoint{}, + config = #http_endpoint{} :: #http_endpoint{} | #mqtt_endpoint{} | #kafka_endpoint{}, status = 0, %% 更新时间 updated_at = 0 :: integer(), diff --git a/apps/iot/src/endpoint/endpoint_bo.erl b/apps/iot/src/database/endpoint_bo.erl similarity index 86% rename from apps/iot/src/endpoint/endpoint_bo.erl rename to apps/iot/src/database/endpoint_bo.erl index 535d86b..8514147 100644 --- a/apps/iot/src/endpoint/endpoint_bo.erl +++ b/apps/iot/src/database/endpoint_bo.erl @@ -45,15 +45,6 @@ endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"t error end. -parse_config(<<"mysql">>, #{<<"host">> := Host, <<"port">> := Port, <<"username">> := Username, <<"password">> := Password, <<"database">> := Database, <<"table_name">> := TableName}) -> - #mysql_endpoint{ - host = Host, - port = Port, - username = Username, - password = Password, - database = Database, - table_name = TableName - }; parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) -> #mqtt_endpoint{ host = Host, diff --git a/apps/iot/src/endpoint/endpoint.erl b/apps/iot/src/endpoint/endpoint.erl index da0c9a3..d7b7c76 100644 --- a/apps/iot/src/endpoint/endpoint.erl +++ b/apps/iot/src/endpoint/endpoint.erl @@ -12,7 +12,7 @@ %% API -export([start_link/1]). --export([get_name/1, get_pid/1, forward/4, reload/2, clean_up/1]). +-export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]). -export([get_alias_pid/1]). -export([config_equals/2]). @@ -28,11 +28,7 @@ start_link(Endpoint = #endpoint{id = Id, name = Name, config = #http_endpoint{}} start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mqtt_endpoint{}}) -> LocalName = get_name(Id), AliasName = get_alias_name(Name), - endpoint_mqtt:start_link(LocalName, AliasName, Endpoint); -start_link(Endpoint = #endpoint{id = Id, name = Name, config = #mysql_endpoint{}}) -> - LocalName = get_name(Id), - AliasName = get_alias_name(Name), - endpoint_mysql:start_link(LocalName, AliasName, Endpoint). + endpoint_mqtt:start_link(LocalName, AliasName, Endpoint). -spec get_name(Id :: integer()) -> atom(). get_name(Id) when is_integer(Id) -> @@ -50,9 +46,9 @@ get_alias_name(Name) when is_binary(Name) -> get_alias_pid(Name) when is_binary(Name) -> iot_name_server:whereis_alias(get_alias_name(Name)). --spec forward(Pid :: pid(), ServiceId :: binary(), Format :: binary(), Metric :: binary()) -> no_return(). -forward(Pid, ServiceId, Format, Metric) when is_pid(Pid), is_binary(ServiceId), is_binary(Format), is_binary(Metric) -> - gen_server:cast(Pid, {forward, ServiceId, Format, Metric}). +-spec forward(Pid :: pid(), ServiceId :: binary(), Metric :: binary()) -> no_return(). +forward(Pid, ServiceId, Metric) when is_pid(Pid), is_binary(ServiceId), is_binary(Metric) -> + gen_server:cast(Pid, {forward, ServiceId, Metric}). reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> gen_statem:cast(Pid, {reload, NEndpoint}). diff --git a/apps/iot/src/endpoint/endpoint_http.erl b/apps/iot/src/endpoint/endpoint_http.erl index 906536a..536a87b 100644 --- a/apps/iot/src/endpoint/endpoint_http.erl +++ b/apps/iot/src/endpoint/endpoint_http.erl @@ -69,8 +69,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({forward, ServiceId, Format, Metric}, State = #state{buffer = Buffer}) -> - NBuffer = endpoint_buffer:append({ServiceId, Format, Metric}, Buffer), +handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), {noreply, State#state{buffer = NBuffer}}; handle_cast(cleanup, State = #state{buffer = Buffer}) -> @@ -83,9 +83,9 @@ handle_cast(cleanup, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({next_data, Id, {ServiceId, Format, Metric}}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) -> +handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) -> Headers = [ - {<<"Content-Type">>, <<"text/", Format/binary>>}, + {<<"Content-Type">>, <<"application/octet-stream">>}, {<<"Service-Id">>, ServiceId} ], case hackney:request(post, Url, Headers, Metric) of diff --git a/apps/iot/src/endpoint/endpoint_mqtt.erl b/apps/iot/src/endpoint/endpoint_mqtt.erl index 86a315f..d501921 100644 --- a/apps/iot/src/endpoint/endpoint_mqtt.erl +++ b/apps/iot/src/endpoint/endpoint_mqtt.erl @@ -127,11 +127,11 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> {keep_state, State}; %% 发送数据到mqtt服务器 -handle_info({next_data, Id, {ServiceId, Format, Metric}}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, +handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) -> Topic = re:replace(Topic0, <<"\\${service_id}">>, ServiceId, [global, {return, binary}]), - lager:debug("[mqtt_postman] will publish topic: ~p, format: ~p, metric: ~p, qos: ~p", [Topic, Format, Metric, Qos]), + lager:debug("[mqtt_postman] will publish topic: ~p, metric: ~p, qos: ~p", [Topic, Metric, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Metric, [{qos, Qos}, {retain, true}]) of ok -> NBuffer = endpoint_buffer:ack(Id, Buffer), diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index eedef03..eea40d5 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -338,7 +338,7 @@ handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #stat end; %% todo -handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey0, format = Format, metric = Metric}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true, device_map = DeviceMap}) -> +handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true, device_map = DeviceMap}) -> lager:debug("[iot_host] metric_data host: ~p, service_id: ~p, device_uuid: ~p, route_key: ~p, metric: ~p", [UUID, ServiceId, DeviceUUID, RouteKey0, Metric]), case DeviceUUID =/= <<"">> of true -> @@ -354,7 +354,7 @@ handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = D undefined -> ok; EndpointPid -> - endpoint:forward(EndpointPid, ServiceId, Format, Metric) + endpoint:forward(EndpointPid, ServiceId, Metric) end, NDevice = iot_device:change_status(Device, ?DEVICE_ONLINE), {keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}}; diff --git a/apps/iot/src/endpoint/endpoint_mysql.erl b/backup/endpoint_mysql.erl similarity index 95% rename from apps/iot/src/endpoint/endpoint_mysql.erl rename to backup/endpoint_mysql.erl index 45812b6..c9b8a93 100644 --- a/apps/iot/src/endpoint/endpoint_mysql.erl +++ b/backup/endpoint_mysql.erl @@ -127,10 +127,10 @@ handle_info({timeout, _, create_postman}, State = #state{status = ?DISCONNECTED, handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> {noreply, State}; %% 发送数据到mqtt服务器 -handle_info({next_data, Id, ServiceId, Format, Metric}, State = #state{status = ?CONNECTED, pool_pid = PoolPid, buffer = Buffer, +handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNECTED, pool_pid = PoolPid, buffer = Buffer, endpoint = #endpoint{title = Title, config = #mysql_endpoint{table_name = Table, fields_map = FieldsMap}}}) -> - case insert_sql(Table, ServiceId, Format, FieldsMap, Metric) of + case insert_sql(Table, ServiceId, FieldsMap, Metric) of {ok, InsertSql, Values} -> case poolboy:transaction(PoolPid, fun(ConnPid) -> mysql:query(ConnPid, InsertSql, Values) end) of ok -> @@ -182,9 +182,9 @@ code_change(_OldVsn, State = #state{}, _Extra) -> retry_connect() -> erlang:start_timer(?RETRY_INTERVAL, self(), create_postman). --spec insert_sql(Table :: binary(), ServiceId :: binary(), Format :: binary(), FieldsMap :: map(), Metric :: binary()) -> +-spec insert_sql(Table :: binary(), ServiceId :: binary(), FieldsMap :: map(), Metric :: binary()) -> error | {ok, Sql :: binary(), Values :: list()}. -insert_sql(Table, ServiceId, <<"line">>, FieldsMap, Metric) when is_binary(Table), is_binary(ServiceId), is_binary(Metric) -> +insert_sql(Table, ServiceId, FieldsMap, Metric) when is_binary(Table), is_binary(ServiceId), is_binary(Metric) -> case line_format:parse(Metric) of error -> error;