From 4495fd07740bd87656a9320f8b1d741c35652116 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 16 Aug 2025 23:50:51 +0800 Subject: [PATCH] fix service config --- apps/iot/src/database/micro_service_bo.erl | 16 ++ apps/iot/src/endpoint/endpoint_mysql.erl | 9 +- .../iot/src/http_handlers/service_handler.erl | 30 +-- apps/iot/src/iot_app.erl | 1 - apps/iot/src/tcp/tcp_channel.erl | 4 +- .../service_config_model.erl | 0 backup/service_handler.erl | 176 ++++++++++++++++++ 7 files changed, 202 insertions(+), 34 deletions(-) create mode 100644 apps/iot/src/database/micro_service_bo.erl rename {apps/iot/src/mnesia => backup}/service_config_model.erl (100%) create mode 100644 backup/service_handler.erl diff --git a/apps/iot/src/database/micro_service_bo.erl b/apps/iot/src/database/micro_service_bo.erl new file mode 100644 index 0000000..8f54008 --- /dev/null +++ b/apps/iot/src/database/micro_service_bo.erl @@ -0,0 +1,16 @@ +%%%------------------------------------------------------------------- +%%% @author aresei +%%% @copyright (C) 2023, +%%% @doc +%%% +%%% @end +%%% Created : 16. 5月 2023 12:48 +%%%------------------------------------------------------------------- +-module(micro_service_bo). +-author("aresei"). +-export([get_service_config/1]). + +%% API +-spec get_service_config(ServiceId :: binary()) -> {ok, DeviceInfo :: map()} | undefined. +get_service_config(ServiceId) when is_binary(ServiceId) -> + mysql_pool:get_row(mysql_iot, <<"SELECT * FROM micro_service WHERE id = ? LIMIT 1">>, [ServiceId]). \ No newline at end of file diff --git a/apps/iot/src/endpoint/endpoint_mysql.erl b/apps/iot/src/endpoint/endpoint_mysql.erl index 9ec12aa..077f43c 100644 --- a/apps/iot/src/endpoint/endpoint_mysql.erl +++ b/apps/iot/src/endpoint/endpoint_mysql.erl @@ -114,11 +114,11 @@ handle_info({timeout, _, create_postman}, State = #state{status = ?DISCONNECTED, NBuffer = endpoint_buffer:trigger_n(Buffer), {noreply, State#state{pool_pid = PoolPid, buffer = NBuffer, status = ?CONNECTED}}; ignore -> - erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + retry_connect(), {noreply, State}; {error, Reason} -> lager:warning("[mqtt_postman] start connect pool, get error: ~p", [Reason]), - erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + retry_connect(), {noreply, State} end; @@ -142,7 +142,7 @@ handle_info({next_data, Id, ServiceId, Format, Metric}, State = #state{status = %% postman进程挂掉时,重新建立新的 handle_info({'EXIT', PoolPid, Reason}, State = #state{endpoint = #endpoint{title = Title}, pool_pid = PoolPid}) -> lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]), - erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + retry_connect(), {noreply, disconnected, State#state{pool_pid = undefined, status = ?DISCONNECTED}}; handle_info(Info, State = #state{status = Status}) -> @@ -173,6 +173,9 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +retry_connect() -> + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman). + -spec insert_sql(Table :: binary(), ServiceId :: binary(), Format :: binary(), FieldsMap :: map(), Metric :: binary()) -> {ok, Sql :: binary(), Values :: list()}. insert_sql(Table, ServiceId, <<"line">>, FieldsMap, Metric) when is_binary(Table), is_binary(ServiceId), is_binary(Metric) -> case line_format:parse(Metric) of diff --git a/apps/iot/src/http_handlers/service_handler.erl b/apps/iot/src/http_handlers/service_handler.erl index f37bb55..a2a80a7 100644 --- a/apps/iot/src/http_handlers/service_handler.erl +++ b/apps/iot/src/http_handlers/service_handler.erl @@ -15,7 +15,7 @@ %% 下发config.json, 微服务接受后,保存服务配置 handle_request("POST", "/service/push_config", _, - #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"last_edit_user">> := LastEditUser, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0}) + #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0}) when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) -> %% 检查ConfigJson是否是合法的json字符串 @@ -30,14 +30,7 @@ handle_request("POST", "/service/push_config", _, {ok, Ref} -> case iot_host:await_reply(Ref, Timeout) of {ok, Result} -> - %% 更新配置信息到数据库 - case service_config_model:update(ServiceId, UUID, ConfigJson, LastEditUser) of - ok -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - lager:debug("[service_handler] set_config service_id: ~p, get error: ~p", [ServiceId, Reason]), - {ok, 200, iot_util:json_error(-1, <<"set service config failed">>)} - end; + {ok, 200, iot_util:json_data(Result)}; {error, Reason} -> {ok, 200, iot_util:json_error(-1, Reason)} end; @@ -49,25 +42,6 @@ handle_request("POST", "/service/push_config", _, {ok, 200, iot_util:json_error(-1, <<"config is invalid json">>)} end; -%% 获取服务配置信息 -handle_request("GET", "/service/get_config", #{<<"service_id">> := ServiceId}, _) when is_binary(ServiceId) -> - case service_config_model:get_config(ServiceId) of - error -> - {ok, 200, iot_util:json_error(-1, <<"service config not found">>)}; - {ok, Config} -> - {ok, 200, iot_util:json_data(service_config_model:as_map(Config))} - end; - -%% 删除对应的主机信息 -handle_request("POST", "/service/delete_config", _, #{<<"service_id">> := ServiceId}) when is_binary(ServiceId) -> - case service_config_model:delete(ServiceId) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:debug("[service_handler] delete config of service_id: ~p, error: ~p", [ServiceId, Reason]), - {ok, 200, iot_util:json_error(-1, <<"delete service config errror">>)} - end; - %% 部署微服务 handle_request("POST", "/service/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl}) when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 0f32755..4e65d50 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -36,7 +36,6 @@ start_mnesia() -> Tables = mnesia:system_info(tables), lager:debug("[iot_app] tables: ~p", [Tables]), %% 创建数据库表 - not lists:member(service_config, Tables) andalso service_config_model:create_table(), ok. -spec ensure_mnesia_schema() -> any(). diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index 0cae4e9..64de10e 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -140,10 +140,10 @@ handle_info({tcp, Socket, <>}, State = #state{transport = Transport, socket = Socket}) -> lager:debug("[ws_channel] service_config request service_id: ~p", [ServiceId]), - case service_config_model:get_config(ServiceId) of + case micro_service_bo:get_service_config(ServiceId) of error -> Transport:send(Socket, <>); - {ok, #service_config{config_json = ConfigJson}} when is_binary(ConfigJson) -> + {ok, ConfigJson} when is_binary(ConfigJson) -> Transport:send(Socket, <>) end, {noreply, State}; diff --git a/apps/iot/src/mnesia/service_config_model.erl b/backup/service_config_model.erl similarity index 100% rename from apps/iot/src/mnesia/service_config_model.erl rename to backup/service_config_model.erl diff --git a/backup/service_handler.erl b/backup/service_handler.erl new file mode 100644 index 0000000..f37bb55 --- /dev/null +++ b/backup/service_handler.erl @@ -0,0 +1,176 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2020, +%%% @doc +%%% +%%% @end +%%% Created : 26. 4月 2020 3:36 下午 +%%%------------------------------------------------------------------- +-module(service_handler). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([handle_request/4]). + +%% 下发config.json, 微服务接受后,保存服务配置 +handle_request("POST", "/service/push_config", _, + #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"last_edit_user">> := LastEditUser, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0}) + when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) -> + + %% 检查ConfigJson是否是合法的json字符串 + case iot_util:is_json(ConfigJson) of + true -> + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(-1, <<"host not found">>)}; + Pid when is_pid(Pid) -> + Timeout = Timeout0 * 1000, + case iot_host:async_service_config(Pid, ServiceId, ConfigJson, Timeout) of + {ok, Ref} -> + case iot_host:await_reply(Ref, Timeout) of + {ok, Result} -> + %% 更新配置信息到数据库 + case service_config_model:update(ServiceId, UUID, ConfigJson, LastEditUser) of + ok -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + lager:debug("[service_handler] set_config service_id: ~p, get error: ~p", [ServiceId, Reason]), + {ok, 200, iot_util:json_error(-1, <<"set service config failed">>)} + end; + {error, Reason} -> + {ok, 200, iot_util:json_error(-1, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(-1, Reason)} + end + end; + false -> + {ok, 200, iot_util:json_error(-1, <<"config is invalid json">>)} + end; + +%% 获取服务配置信息 +handle_request("GET", "/service/get_config", #{<<"service_id">> := ServiceId}, _) when is_binary(ServiceId) -> + case service_config_model:get_config(ServiceId) of + error -> + {ok, 200, iot_util:json_error(-1, <<"service config not found">>)}; + {ok, Config} -> + {ok, 200, iot_util:json_data(service_config_model:as_map(Config))} + end; + +%% 删除对应的主机信息 +handle_request("POST", "/service/delete_config", _, #{<<"service_id">> := ServiceId}) when is_binary(ServiceId) -> + case service_config_model:delete(ServiceId) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:debug("[service_handler] delete config of service_id: ~p, error: ~p", [ServiceId, Reason]), + {ok, 200, iot_util:json_error(-1, <<"delete service config errror">>)} + end; + +%% 部署微服务 +handle_request("POST", "/service/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl}) + when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> + + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + case iot_host:deploy_service(Pid, TaskId, ServiceId, TarUrl) of + {ok, Ref} -> + case iot_host:await_reply(Ref, 5000) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +%% 启动服务 +handle_request("POST", "/service/start", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) -> + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + case iot_host:start_service(Pid, ServiceId) of + {ok, Ref} -> + case iot_host:await_reply(Ref, 5000) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +%% 停止服务 +handle_request("POST", "/service/stop", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) -> + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + case iot_host:stop_service(Pid, ServiceId) of + {ok, Ref} -> + case iot_host:await_reply(Ref, 5000) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +%% 远程调用微服务, 返回值的格式为json +handle_request("POST", "/service/invoke", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"payload">> := Payload, <<"timeout">> := Timeout0}) + when is_binary(UUID), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout0) -> + + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + Timeout = Timeout0 * 1000, + case iot_host:invoke_service(Pid, ServiceId, Payload, Timeout) of + {ok, Ref} -> + case iot_host:await_reply(Ref, Timeout) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +handle_request("POST", "/service/task_log", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId}) when is_binary(UUID), is_integer(TaskId) -> + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + case iot_host:task_log(Pid, TaskId) of + {ok, Ref} -> + case iot_host:await_reply(Ref, 5000) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +handle_request(_, Path, _, _) -> + Path1 = list_to_binary(Path), + {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file