diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 74f18c9..633a746 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -60,9 +60,12 @@ %%%% 主动推送的消息类型子分类, 需要返回值 -define(PUSH_DEPLOY, 16#01). --define(PUSH_SERVICE_CONFIG, 16#02). --define(PUSH_INVOKE, 16#03). --define(PUSH_TASK_LOG, 16#04). +-define(PUSH_START_SERVICE, 16#02). +-define(PUSH_STOP_SERVICE, 16#03). + +-define(PUSH_SERVICE_CONFIG, 16#04). +-define(PUSH_INVOKE, 16#05). +-define(PUSH_TASK_LOG, 16#05). %% 缓存数据库表 -record(kv, { diff --git a/apps/iot/src/http/service_handler.erl b/apps/iot/src/http/service_handler.erl index da1ce50..295533c 100644 --- a/apps/iot/src/http/service_handler.erl +++ b/apps/iot/src/http/service_handler.erl @@ -73,6 +73,7 @@ handle_request("POST", "/service/delete_config", _, #{<<"service_id">> := Servic {ok, 200, iot_util:json_error(-1, <<"delete service config errror">>)} end; +%% 部署微服务 handle_request("POST", "/service/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl}) when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> @@ -80,7 +81,26 @@ handle_request("POST", "/service/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> undefined -> {ok, 200, iot_util:json_error(404, <<"host not found">>)}; Pid when is_pid(Pid) -> - case iot_host:async_deploy(Pid, TaskId, ServiceId, TarUrl) of + case iot_host:deploy_service(Pid, TaskId, ServiceId, TarUrl) of + {ok, Ref} -> + case iot_host:await_reply(Ref, 5000) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +%% 启动服务 +handle_request("POST", "/service/start", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) -> + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + case iot_host:deploy_service(Pid, TaskId, ServiceId, TarUrl) of {ok, Ref} -> case iot_host:await_reply(Ref, 5000) of {ok, Result} -> @@ -101,7 +121,7 @@ handle_request("POST", "/service/invoke", _, #{<<"uuid">> := UUID, <<"service_id {ok, 200, iot_util:json_error(404, <<"host not found">>)}; Pid when is_pid(Pid) -> Timeout = Timeout0 * 1000, - case iot_host:async_invoke(Pid, ServiceId, Payload, Timeout) of + case iot_host:invoke_service(Pid, ServiceId, Payload, Timeout) of {ok, Ref} -> case iot_host:await_reply(Ref, Timeout) of {ok, Result} -> @@ -119,7 +139,7 @@ handle_request("POST", "/service/task_log", _, #{<<"uuid">> := UUID, <<"task_id" undefined -> {ok, 200, iot_util:json_error(404, <<"host not found">>)}; Pid when is_pid(Pid) -> - case iot_host:async_task_log(Pid, TaskId) of + case iot_host:task_log(Pid, TaskId) of {ok, Ref} -> case iot_host:await_reply(Ref, 5000) of {ok, Result} -> diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index f124829..d631030 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -25,7 +25,7 @@ -export([get_metric/1, get_status/1]). %% 通讯相关 -export([pub/3, attach_channel/2, command/3]). --export([async_deploy/4, async_invoke/4, async_service_config/4, async_task_log/2, await_reply/2]). +-export([deploy_service/4, start_service/2, stop_service/2, invoke_service/4, async_service_config/4, task_log/2, await_reply/2]). %% 设备管理 -export([reload_device/2, delete_device/2, activate_device/3]). -export([heartbeat/1]). @@ -94,18 +94,26 @@ async_service_config(Pid, ServiceId, ConfigJson, Timeout) when is_pid(Pid), is_b ConfigBin = message_pb:encode_msg(#push_service_config{service_id = ServiceId, config_json = ConfigJson, timeout = Timeout}), gen_statem:call(Pid, {async_call, self(), ?PUSH_SERVICE_CONFIG, ConfigBin}). --spec async_deploy(Pid :: pid(), TaskId :: integer(), ServiceId :: binary(), TarUrl :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -async_deploy(Pid, TaskId, ServiceId, TarUrl) when is_pid(Pid), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> +-spec deploy_service(Pid :: pid(), TaskId :: integer(), ServiceId :: binary(), TarUrl :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. +deploy_service(Pid, TaskId, ServiceId, TarUrl) when is_pid(Pid), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> PushBin = message_pb:encode_msg(#deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl}), gen_statem:call(Pid, {async_call, self(), ?PUSH_DEPLOY, PushBin}). --spec async_invoke(Pid :: pid(), ServiceId :: binary(), Payload :: binary(), Timeout :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -async_invoke(Pid, ServiceId, Payload, Timeout) when is_pid(Pid), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout) -> +-spec start_service(Pid :: pid(), ServiceId :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. +start_service(Pid, ServiceId) when is_pid(Pid), is_binary(ServiceId) -> + gen_statem:call(Pid, {async_call, self(), ?PUSH_START_SERVICE, ServiceId}). + +-spec stop_service(Pid :: pid(), ServiceId :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. +stop_service(Pid, ServiceId) when is_pid(Pid), is_binary(ServiceId) -> + gen_statem:call(Pid, {async_call, self(), ?PUSH_STOP_SERVICE, ServiceId}). + +-spec invoke_service(Pid :: pid(), ServiceId :: binary(), Payload :: binary(), Timeout :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. +invoke_service(Pid, ServiceId, Payload, Timeout) when is_pid(Pid), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout) -> InvokeBin = message_pb:encode_msg(#invoke{service_id = ServiceId, payload = Payload, timeout = Timeout}), gen_statem:call(Pid, {async_call, self(), ?PUSH_INVOKE, InvokeBin}). --spec async_task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. -async_task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) -> +-spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. +task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) -> TaskLogBin = message_pb:encode_msg(#fetch_task_log{task_id = TaskId}), gen_statem:call(Pid, {async_call, self(), ?PUSH_TASK_LOG, TaskLogBin}).