From 9418ae70bd5ba3e9944ec608750db97025a1b2a6 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Tue, 20 May 2025 11:49:01 +0800 Subject: [PATCH] fix service --- apps/iot/include/iot.hrl | 3 + apps/iot/src/http/host_handler.erl | 119 ---------------------- apps/iot/src/http/http_server.erl | 1 + apps/iot/src/http/service_handler.erl | 141 ++++++++++++++++++++++++++ apps/iot/src/iot_app.erl | 19 +++- apps/iot/src/iot_host.erl | 3 +- apps/iot/src/redis/redis_client.erl | 1 - apps/iot/src/tcp/tcp_channel.erl | 10 +- config/sys-dev.config | 5 +- 9 files changed, 174 insertions(+), 128 deletions(-) create mode 100644 apps/iot/src/http/service_handler.erl diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 377630d..74f18c9 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -37,6 +37,9 @@ -define(PACKET_ASYNC_CALL, 16#05). -define(PACKET_ASYNC_CALL_REPLY, 16#06). +%% ping包,客户端主动发起 +-define(PACKET_PING, 16#FF). + %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%% 二级分类定义 %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% diff --git a/apps/iot/src/http/host_handler.erl b/apps/iot/src/http/host_handler.erl index dfa6a21..9b0358b 100644 --- a/apps/iot/src/http/host_handler.erl +++ b/apps/iot/src/http/host_handler.erl @@ -67,125 +67,6 @@ handle_request("POST", "/host/delete", _, #{<<"uuid">> := UUID}) when is_binary( {ok, 200, iot_util:json_error(404, <<"error">>)} end; -%% 保存服务配置 -handle_request("POST", "/host/set_service_config", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"last_edit_user">> := LastEditUser}) - when is_binary(ServiceId), is_binary(ConfigJson), is_integer(LastEditUser) -> - lager:debug("[service_handler] set_service_config service_id: ~p, config_json: ~p, last_edit_user:~p", [ServiceId, ConfigJson, LastEditUser]), - - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"host not found">>)}; - Pid when is_pid(Pid) -> - case service_config_model:update(ServiceId, UUID, ConfigJson, LastEditUser) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:debug("[service_handler] set_config service_id: ~p, get error: ~p", [ServiceId, Reason]), - {ok, 200, iot_util:json_error(-1, <<"set service config failed">>)} - end - end; - -%% 下发config.json -handle_request("POST", "/host/push_service_config", _, - PostParams = #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0}) - when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) -> - - lager:debug("[http_host_handler] async_service_config body is: ~p", [PostParams]), - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"host not found">>)}; - Pid when is_pid(Pid) -> - Timeout = Timeout0 * 1000, - case iot_host:async_service_config(Pid, ServiceId, ConfigJson, Timeout) of - {ok, Ref} -> - case iot_host:await_reply(Ref, Timeout) of - {ok, Result} -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - {ok, 200, iot_util:json_error(400, Reason)} - end; - {error, Reason} when is_binary(Reason) -> - {ok, 200, iot_util:json_error(400, Reason)} - end - end; - -handle_request("GET", "/host/get_service_config", #{<<"service_id">> := ServiceId}, _) when is_binary(ServiceId) -> - case service_config_model:get_config(ServiceId) of - error -> - {ok, 200, iot_util:json_error(-1, <<"service config not found">>)}; - {ok, Config} -> - {ok, 200, iot_util:json_data(service_config_model:as_map(Config))} - end; - -%% 删除对应的主机信息 -handle_request("POST", "/host/delete_service_config", _, #{<<"service_id">> := ServiceId}) when is_binary(ServiceId) -> - case service_config_model:delete(ServiceId) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:debug("[service_handler] delete config of service_id: ~p, error: ~p", [ServiceId, Reason]), - {ok, 200, iot_util:json_error(-1, <<"delete service config errror">>)} - end; - -handle_request("POST", "/host/async_deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl}) - when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> - - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"host not found">>)}; - Pid when is_pid(Pid) -> - case iot_host:async_deploy(Pid, TaskId, ServiceId, TarUrl) of - {ok, Ref} -> - case iot_host:await_reply(Ref, 5000) of - {ok, Result} -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - {ok, 200, iot_util:json_error(400, Reason)} - end; - {error, Reason} when is_binary(Reason) -> - {ok, 200, iot_util:json_error(400, Reason)} - end - end; - -handle_request("POST", "/host/async_invoke", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"payload">> := Payload, <<"timeout">> := Timeout0}) - when is_binary(UUID), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout0) -> - - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"host not found">>)}; - Pid when is_pid(Pid) -> - Timeout = Timeout0 * 1000, - case iot_host:async_invoke(Pid, ServiceId, Payload, Timeout) of - {ok, Ref} -> - case iot_host:await_reply(Ref, Timeout) of - {ok, Result} -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - {ok, 200, iot_util:json_error(400, Reason)} - end; - {error, Reason} when is_binary(Reason) -> - {ok, 200, iot_util:json_error(400, Reason)} - end - end; - -handle_request("POST", "/host/async_task_log", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId}) when is_binary(UUID), is_integer(TaskId) -> - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"host not found">>)}; - Pid when is_pid(Pid) -> - case iot_host:async_task_log(Pid, TaskId) of - {ok, Ref} -> - case iot_host:await_reply(Ref, 5000) of - {ok, Result} -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - {ok, 200, iot_util:json_error(400, Reason)} - end; - {error, Reason} when is_binary(Reason) -> - {ok, 200, iot_util:json_error(400, Reason)} - end - end; - %% 处理主机的授权的激活 handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := true}) when is_binary(UUID) -> case iot_host_sup:ensured_host_started(UUID) of diff --git a/apps/iot/src/http/http_server.erl b/apps/iot/src/http/http_server.erl index f45413e..0f1c535 100644 --- a/apps/iot/src/http/http_server.erl +++ b/apps/iot/src/http/http_server.erl @@ -24,6 +24,7 @@ start() -> Dispatcher = cowboy_router:compile([ {'_', [ {"/host/[...]", ?MODULE, [host_handler]}, + {"/service/[...]", ?MODULE, [service_handler]}, {"/device/[...]", ?MODULE, [device_handler]} ]} ]), diff --git a/apps/iot/src/http/service_handler.erl b/apps/iot/src/http/service_handler.erl new file mode 100644 index 0000000..da1ce50 --- /dev/null +++ b/apps/iot/src/http/service_handler.erl @@ -0,0 +1,141 @@ +%%%------------------------------------------------------------------- +%%% @author licheng5 +%%% @copyright (C) 2020, +%%% @doc +%%% +%%% @end +%%% Created : 26. 4月 2020 3:36 下午 +%%%------------------------------------------------------------------- +-module(service_handler). +-author("licheng5"). +-include("iot.hrl"). + +%% API +-export([handle_request/4]). + +%% 保存服务配置 +handle_request("POST", "/service/set_config", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"last_edit_user">> := LastEditUser}) + when is_binary(ServiceId), is_binary(ConfigJson), is_integer(LastEditUser) -> + lager:debug("[service_handler] set_service_config service_id: ~p, config_json: ~p, last_edit_user:~p", [ServiceId, ConfigJson, LastEditUser]), + + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + case service_config_model:update(ServiceId, UUID, ConfigJson, LastEditUser) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:debug("[service_handler] set_config service_id: ~p, get error: ~p", [ServiceId, Reason]), + {ok, 200, iot_util:json_error(-1, <<"set service config failed">>)} + end + end; + +%% 下发config.json +handle_request("POST", "/service/push_config", _, + PostParams = #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0}) + when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) -> + + lager:debug("[http_host_handler] async_service_config body is: ~p", [PostParams]), + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + Timeout = Timeout0 * 1000, + case iot_host:async_service_config(Pid, ServiceId, ConfigJson, Timeout) of + {ok, Ref} -> + case iot_host:await_reply(Ref, Timeout) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +handle_request("GET", "/service/get_config", #{<<"service_id">> := ServiceId}, _) when is_binary(ServiceId) -> + case service_config_model:get_config(ServiceId) of + error -> + {ok, 200, iot_util:json_error(-1, <<"service config not found">>)}; + {ok, Config} -> + {ok, 200, iot_util:json_data(service_config_model:as_map(Config))} + end; + +%% 删除对应的主机信息 +handle_request("POST", "/service/delete_config", _, #{<<"service_id">> := ServiceId}) when is_binary(ServiceId) -> + case service_config_model:delete(ServiceId) of + ok -> + {ok, 200, iot_util:json_data(<<"success">>)}; + {error, Reason} -> + lager:debug("[service_handler] delete config of service_id: ~p, error: ~p", [ServiceId, Reason]), + {ok, 200, iot_util:json_error(-1, <<"delete service config errror">>)} + end; + +handle_request("POST", "/service/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl}) + when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> + + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + case iot_host:async_deploy(Pid, TaskId, ServiceId, TarUrl) of + {ok, Ref} -> + case iot_host:await_reply(Ref, 5000) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +handle_request("POST", "/service/invoke", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"payload">> := Payload, <<"timeout">> := Timeout0}) + when is_binary(UUID), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout0) -> + + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + Timeout = Timeout0 * 1000, + case iot_host:async_invoke(Pid, ServiceId, Payload, Timeout) of + {ok, Ref} -> + case iot_host:await_reply(Ref, Timeout) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +handle_request("POST", "/service/task_log", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId}) when is_binary(UUID), is_integer(TaskId) -> + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + case iot_host:async_task_log(Pid, TaskId) of + {ok, Ref} -> + case iot_host:await_reply(Ref, 5000) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +handle_request(_, Path, _, _) -> + Path1 = list_to_binary(Path), + {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. + +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% +%% helper methods +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index f1776da..0f32755 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -30,10 +30,27 @@ stop(_State) -> %% 启动内存数据库 start_mnesia() -> + ok = ensure_mnesia_schema(), %% 启动数据库 ok = mnesia:start(), Tables = mnesia:system_info(tables), lager:debug("[iot_app] tables: ~p", [Tables]), %% 创建数据库表 not lists:member(service_config, Tables) andalso service_config_model:create_table(), - ok. \ No newline at end of file + ok. + +-spec ensure_mnesia_schema() -> any(). +ensure_mnesia_schema() -> + case mnesia:system_info(use_dir) of + true -> + ok; + false -> + mnesia:stop(), + case mnesia:create_schema([node()]) of + ok -> ok; + {error, {_, {already_exists, _}}} -> ok; + Error -> + lager:debug("[iot_app] create mnesia schema failed with error: ~p", [Error]), + throw({init_schema, Error}) + end + end. \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 3cb8c64..f124829 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -453,7 +453,8 @@ report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) -> <<"name">> => <<"主机状态"/utf8>>, <<"timestamp">> => Timestamp }], - iot_router:route_uuid(UUID, FieldsList, Timestamp), + %% todo 这里需要实现新的机制 + % iot_router:route_uuid(UUID, FieldsList, Timestamp), lager:debug("[iot_host] host_uuid: ~p, route fields: ~p", [UUID, FieldsList]). %% 将当前的state转换成map diff --git a/apps/iot/src/redis/redis_client.erl b/apps/iot/src/redis/redis_client.erl index ea29396..cfbddac 100755 --- a/apps/iot/src/redis/redis_client.erl +++ b/apps/iot/src/redis/redis_client.erl @@ -31,7 +31,6 @@ hgetall(Key) when is_binary(Key) -> end end). - to_map(Items) when is_list(Items), length(Items) rem 2 == 0 -> to_map(Items, #{}). to_map([], Target) -> diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index be2544c..f859ae6 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -112,20 +112,20 @@ handle_info({tcp, Socket, <>}), - Transport:send(Socket, <>), + Transport:send(Socket, <>), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {denied, Reason} when is_binary(Reason) -> erlang:monitor(process, HostPid), AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 1, message = Reason}), - Transport:send(Socket, <>), + Transport:send(Socket, <>), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {error, Reason} when is_binary(Reason) -> AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 2, message = Reason}), - Transport:send(Socket, <>), + Transport:send(Socket, <>), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), {stop, State} @@ -174,6 +174,10 @@ handle_info({tcp, Socket, <>}, State = #state{socket = Socket}) -> + {noreply, State}; + handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) -> lager:notice("[sdlan_channel] tcp_error: ~p", [Reason]), {stop, normal, State}; diff --git a/config/sys-dev.config b/config/sys-dev.config index 07da233..29bdc1c 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -54,8 +54,7 @@ {connect_mode, synchronous}, {keep_alive, true}, {password, "r3a-7Qrh#3Q"}, - {database, "nannong"}, - {queries, [<<"set names utf8">>]} + {database, "nannong_demo"} ] }, @@ -63,7 +62,7 @@ {redis_pool, [{size, 10}, {max_overflow, 20}, {worker_module, eredis}], [ - {host, "39.98.184.67"}, + {host, "127.0.0.1"}, {port, 26379}, {database, 1} ]