diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index ccd9e99..11e3c2d 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -178,8 +178,6 @@ retry_connect() -> check_produce_result(ok) -> true; -check_produce_result({ok, _}) -> - true; check_produce_result({ok, _}) -> false. diff --git a/apps/iot/src/http_handlers/container_handler.erl b/apps/iot/src/http_handlers/container_handler.erl index d4e91ec..fa1439e 100644 --- a/apps/iot/src/http_handlers/container_handler.erl +++ b/apps/iot/src/http_handlers/container_handler.erl @@ -15,17 +15,17 @@ %% 下发config.json, 微服务接受后,保存服务配置 handle_request("POST", "/container/push_config", _, - #{<<"uuid">> := UUID, <<"container_name">> := ContainerName, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0}) - when is_binary(UUID), is_binary(ContainerName), is_binary(ConfigJson), is_integer(Timeout0) -> + #{<<"uuid">> := UUID, <<"container_name">> := ContainerName, <<"config">> := Config, <<"timeout">> := Timeout0}) + when is_binary(UUID), is_binary(ContainerName), is_map(Config), is_integer(Timeout0) -> %% 检查ConfigJson是否是合法的json字符串 - true = iot_util:is_json(ConfigJson), + true = iot_util:is_json(Config), case iot_host:get_pid(UUID) of undefined -> {ok, 200, iot_util:json_error(-1, <<"host not found">>)}; Pid when is_pid(Pid) -> Timeout = Timeout0 * 1000, - case iot_host:config_container(Pid, ContainerName, ConfigJson) of + case iot_host:config_container(Pid, ContainerName, Config) of {ok, Ref} -> case iot_host:await_reply(Ref, Timeout) of {ok, Result} -> @@ -40,13 +40,14 @@ handle_request("POST", "/container/push_config", _, %% 部署微服务 handle_request("POST", "/container/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"config">> := Config}) - when is_binary(UUID), is_integer(TaskId), is_binary(Config) -> + when is_binary(UUID), is_integer(TaskId), is_map(Config) -> case iot_host:get_pid(UUID) of undefined -> {ok, 200, iot_util:json_error(404, <<"host not found">>)}; Pid when is_pid(Pid) -> - case iot_host:deploy_container(Pid, TaskId, Config) of + ConfigBin = jiffy:encode(Config, [force_utf8]), + case iot_host:deploy_container(Pid, TaskId, ConfigBin) of {ok, Ref} -> case iot_host:await_reply(Ref, 5000) of {ok, Result} -> diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index c0f0cfe..3689a34 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -19,7 +19,7 @@ start(_StartType, _StartArgs) -> start_http_server(), %% 启动tcp服务 - tcp_server:start(), + start_tcp_server(), iot_sup:start_link(). @@ -63,6 +63,30 @@ start_http_server() -> lager:debug("[http_server] the http server start at: ~p, pid is: ~p", [Port, Pid]). +%% 启动tcp服务 +start_tcp_server() -> + {ok, Props} = application:get_env(iot, tcp_server), + Acceptors = proplists:get_value(acceptors, Props, 50), + MaxConnections = proplists:get_value(max_connections, Props, 10240), + Backlog = proplists:get_value(backlog, Props, 1024), + Port = proplists:get_value(port, Props), + + TransOpts = [ + {tcp_options, [ + binary, + {reuseaddr, true}, + {active, false}, + {packet, 4}, + {nodelay, false}, + {backlog, Backlog} + ]}, + {acceptors, Acceptors}, + {max_connections, MaxConnections} + ], + {ok, _} = esockd:open('iot/tcp_server', Port, TransOpts, {tcp_channel, start_link, []}), + + lager:debug("[iot_app] the tcp server start at: ~p", [Port]). + -spec ensure_mnesia_schema() -> any(). ensure_mnesia_schema() -> case mnesia:system_info(use_dir) of diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 36767b7..b683927 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -97,7 +97,7 @@ config_container(Pid, ContainerName, ConfigJson) when is_pid(Pid), is_binary(Con -spec deploy_container(Pid :: pid(), TaskId :: integer(), Config :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. deploy_container(Pid, TaskId, Config) when is_pid(Pid), is_integer(TaskId), is_binary(Config) -> EncDeployBin = message_codec:encode(?MESSAGE_RPC_DEPLOY, #rpc_deploy{task_id = TaskId, config = Config}), - gen_statem:call(Pid, {rpc_call, self(), ?PUSH_DEPLOY, EncDeployBin}). + gen_statem:call(Pid, {rpc_call, self(), EncDeployBin}). -spec start_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. start_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) -> diff --git a/apps/iot/src/tcp/tcp_server.erl b/apps/iot/src/tcp/tcp_server.erl deleted file mode 100644 index 95ec474..0000000 --- a/apps/iot/src/tcp/tcp_server.erl +++ /dev/null @@ -1,37 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 08. 5月 2025 12:58 -%%%------------------------------------------------------------------- --module(tcp_server). --author("anlicheng"). - -%% API --export([start/0]). - -%% 启动tcp服务 -start() -> - {ok, Props} = application:get_env(iot, tcp_server), - Acceptors = proplists:get_value(acceptors, Props, 50), - MaxConnections = proplists:get_value(max_connections, Props, 10240), - Backlog = proplists:get_value(backlog, Props, 1024), - Port = proplists:get_value(port, Props), - - TransOpts = [ - {tcp_options, [ - binary, - {reuseaddr, true}, - {active, false}, - {packet, 4}, - {nodelay, false}, - {backlog, Backlog} - ]}, - {acceptors, Acceptors}, - {max_connections, MaxConnections} - ], - {ok, _} = esockd:open('iot/tcp_server', Port, TransOpts, {tcp_channel, start_link, []}), - - lager:debug("[iot_app] the tcp server start at: ~p", [Port]). \ No newline at end of file