diff --git a/apps/iot/src/http/host_handler.erl b/apps/iot/src/http/host_handler.erl index 39bbf4a..02a436a 100644 --- a/apps/iot/src/http/host_handler.erl +++ b/apps/iot/src/http/host_handler.erl @@ -67,47 +67,86 @@ handle_request("POST", "/host/delete", _, #{<<"uuid">> := UUID}) when is_binary( {ok, 200, iot_util:json_error(404, <<"error">>)} end; -%% 下发参数 -handle_request("POST", "/host/publish_command", _, - PostParams = #{<<"uuid">> := UUID, <<"command_type">> := CommandType, <<"task_id">> := TaskId, <<"timeout">> := Timeout, <<"params">> := Params}) - when is_binary(UUID), is_integer(TaskId), is_integer(Timeout), is_integer(CommandType) -> +%% 下发config.json +handle_request("POST", "/host/async_service_config", _, + PostParams = #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0}) + when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) -> - lager:debug("[http_host_handler] publish_command body is: ~p", [PostParams]), + lager:debug("[http_host_handler] async_service_config body is: ~p", [PostParams]), case iot_host:get_pid(UUID) of undefined -> {ok, 200, iot_util:json_error(404, <<"host not found">>)}; Pid when is_pid(Pid) -> - Reply = #{ - <<"t_id">> => integer_to_binary(TaskId), - <<"t">> => Timeout, - <<"ts">> => iot_util:current_time(), - <<"m">> => iolist_to_binary(jiffy:encode(Params, [force_utf8])) - }, - - Reply1 = append_service_name(PostParams, Reply), - lager:debug("[http_host_handler] publish message is: ~p", [Reply1]), - BinReply = iolist_to_binary(jiffy:encode(Reply1, [force_utf8])), - - case iot_host:publish_message(Pid, CommandType, {aes, BinReply}, Timeout * 1000) of - {error, timeout} -> - lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, but get ack timeout", [UUID]), - {ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED), - {ok, 200, iot_util:json_error(401, <<"命令执行超时, 请重试"/utf8>>)}; + Timeout = Timeout0 * 1000, + case iot_host:async_service_config(Pid, ServiceId, ConfigJson, Timeout) of + {ok, Ref} -> + case iot_host:await_reply(Ref, Timeout) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; {error, Reason} when is_binary(Reason) -> - task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED), - {ok, 200, iot_util:json_error(400, Reason)}; - ok -> - {ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_OK), - {ok, 200, iot_util:json_data(<<"success">>)}; - {ok, Response} -> - case jiffy:decode(Response, [return_maps]) of - #{<<"code">> := 1} -> - {ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_OK), - {ok, 200, iot_util:json_data(<<"success">>)}; - #{<<"code">> := 0, <<"message">> := Message} when is_binary(Message) -> - {ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED), - {ok, 200, iot_util:json_error(401, <<"操作失败: "/utf8, Message/binary>>)} - end + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +handle_request("POST", "/host/async_deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl}) + when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> + + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + case iot_host:async_deploy(Pid, TaskId, ServiceId, TarUrl) of + {ok, Ref} -> + case iot_host:await_reply(Ref, 5000) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +handle_request("POST", "/host/async_invoke", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"payload">> := Payload, <<"timeout">> := Timeout0}) + when is_binary(UUID), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout0) -> + + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + Timeout = Timeout0 * 1000, + case iot_host:async_invoke(Pid, ServiceId, Payload, Timeout) of + {ok, Ref} -> + case iot_host:await_reply(Ref, Timeout) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} + end + end; + +handle_request("POST", "/host/async_task_log", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId}) when is_binary(UUID), is_integer(TaskId) -> + case iot_host:get_pid(UUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"host not found">>)}; + Pid when is_pid(Pid) -> + case iot_host:async_task_log(Pid, TaskId) of + {ok, Ref} -> + case iot_host:await_reply(Ref, 5000) of + {ok, Result} -> + {ok, 200, iot_util:json_data(Result)}; + {error, Reason} -> + {ok, 200, iot_util:json_error(400, Reason)} + end; + {error, Reason} when is_binary(Reason) -> + {ok, 200, iot_util:json_error(400, Reason)} end end; @@ -143,10 +182,4 @@ handle_request(_, Path, _, _) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% helper methods -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 追加service_name参数 -append_service_name(#{<<"service_name">> := ServiceName}, Reply) when is_binary(ServiceName), ServiceName =/= <<"">> -> - Reply#{<<"to">> => ServiceName}; -append_service_name(_, Reply) -> - Reply. \ No newline at end of file +%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 64a863b..3cb8c64 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -99,7 +99,7 @@ async_deploy(Pid, TaskId, ServiceId, TarUrl) when is_pid(Pid), is_integer(TaskId PushBin = message_pb:encode_msg(#deploy{task_id = TaskId, service_id = ServiceId, tar_url = TarUrl}), gen_statem:call(Pid, {async_call, self(), ?PUSH_DEPLOY, PushBin}). --spec async_invoke(Pid :: pid(), ServiceId :: integer(), Payload :: binary(), Timeout :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. +-spec async_invoke(Pid :: pid(), ServiceId :: binary(), Payload :: binary(), Timeout :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}. async_invoke(Pid, ServiceId, Payload, Timeout) when is_pid(Pid), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout) -> InvokeBin = message_pb:encode_msg(#invoke{service_id = ServiceId, payload = Payload, timeout = Timeout}), gen_statem:call(Pid, {async_call, self(), ?PUSH_INVOKE, InvokeBin}). @@ -109,13 +109,15 @@ async_task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) -> TaskLogBin = message_pb:encode_msg(#fetch_task_log{task_id = TaskId}), gen_statem:call(Pid, {async_call, self(), ?PUSH_TASK_LOG, TaskLogBin}). --spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Reply :: #async_call_reply{}} | {error, timeout}. +-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}. await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> receive - {async_call_reply, Ref, Reply} -> - {ok, Reply} + {async_call_reply, Ref, #async_call_reply{code = 1, result = Result}} -> + {ok, Result}; + {async_call_reply, Ref, #async_call_reply{code = 0, message = Message}} -> + {error, Message} after Timeout -> - {error, timeout} + {error, <<"timeout">>} end. -spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> ok | {error, Reason :: any()}. @@ -339,8 +341,8 @@ handle_event(cast, {handle, {data, DataBin}}, ?STATE_ACTIVATED, State = #state{u {ok, Device} -> case iot_device:is_activated(Device) of true -> - EndpointPid = endpoint:get_pid(1), - endpoint:forward(EndpointPid, ServiceId, DeviceMap, Metric), + %EndpointPid = endpoint:get_pid(1), + %endpoint:forward(EndpointPid, ServiceId, DeviceMap, Metric), NDevice = iot_device:change_status(Device, ?DEVICE_ONLINE), {keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}}; diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index b45aa0e..be2544c 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -17,7 +17,7 @@ -export([start_link/2]). %% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3, terminate/2]). +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, code_change/3, terminate/2]). -record(state, { transport, @@ -111,20 +111,20 @@ handle_info({tcp, Socket, < %% 建立到host的monitor erlang:monitor(process, HostPid), - AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 1, message = <<"ok">>}), + AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 0, message = <<"ok">>}), Transport:send(Socket, <>), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {denied, Reason} when is_binary(Reason) -> erlang:monitor(process, HostPid), - AuthReplyBin = message_pb:encode_msg(#auth_reply{code = -1, message = Reason}), + AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 1, message = Reason}), Transport:send(Socket, <>), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]), {noreply, State#state{uuid = UUID, host_pid = HostPid}}; {error, Reason} when is_binary(Reason) -> - AuthReplyBin = message_pb:encode_msg(#auth_reply{code = -2, message = Reason}), + AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 2, message = Reason}), Transport:send(Socket, <>), lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),