From 069e4cac027a9d7d1f7ac7bb557f6efbed3504eb Mon Sep 17 00:00:00 2001 From: anlicheng Date: Thu, 10 Aug 2023 10:12:26 +0800 Subject: [PATCH] =?UTF-8?q?=E8=B0=83=E6=95=B4command=20reply?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/src/http_handler/host_handler.erl | 58 +++++++--------------- apps/iot/src/iot_host.erl | 23 +++++++-- apps/iot/src/websocket/ws_channel.erl | 4 +- 3 files changed, 39 insertions(+), 46 deletions(-) diff --git a/apps/iot/src/http_handler/host_handler.erl b/apps/iot/src/http_handler/host_handler.erl index ebd02bc..dd38dea 100644 --- a/apps/iot/src/http_handler/host_handler.erl +++ b/apps/iot/src/http_handler/host_handler.erl @@ -83,28 +83,25 @@ handle_request("POST", "/host/publish_command", _, lager:debug("[http_host_handler] publish message is: ~p", [Reply1]), BinReply = iolist_to_binary(jiffy:encode(Reply1, [force_utf8])), - case iot_host:publish_message(Pid, CommandType, {aes, BinReply}) of + case iot_host:publish_message(Pid, CommandType, {aes, BinReply}, Timeout * 1000) of + {error, timeout} -> + lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, but get ack timeout", [UUID]), + {ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED), + {ok, 200, iot_util:json_error(401, <<"命令执行超时, 请重试"/utf8>>)}; {error, Reason} when is_binary(Reason) -> task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED), {ok, 200, iot_util:json_error(400, Reason)}; - {ok, Ref} -> - receive - {response, Ref} -> + ok -> + {ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_OK), + {ok, 200, iot_util:json_data(<<"success">>)}; + {ok, Response} -> + case jiffy:decode(Response, [return_maps]) of + #{<<"code">> := 1} -> {ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_OK), {ok, 200, iot_util:json_data(<<"success">>)}; - {response, Ref, Response} -> - case jiffy:decode(Response, [return_maps]) of - #{<<"code">> := 1} -> - {ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_OK), - {ok, 200, iot_util:json_data(<<"success">>)}; - #{<<"code">> := 0, <<"message">> := Message} when is_binary(Message) -> - {ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED), - {ok, 200, iot_util:json_error(401, <<"操作失败: "/utf8, Message/binary>>)} - end - after Timeout * 1000 -> - lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, but get ack timeout", [UUID]), - {ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED), - {ok, 200, iot_util:json_error(401, <<"命令执行超时, 请重试"/utf8>>)} + #{<<"code">> := 0, <<"message">> := Message} when is_binary(Message) -> + {ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED), + {ok, 200, iot_util:json_error(401, <<"操作失败: "/utf8, Message/binary>>)} end end end; @@ -121,17 +118,8 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]), ok = iot_host:activate(Pid, true), - case iot_host:publish_message(Pid, 8, BinReply) of - {ok, Ref} -> - receive - {response, Ref, Response} -> - lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, get response: ~p", [UUID, Response]) - after Timeout * 1000 -> - lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, but get ack timeout", [UUID]) - end; - {error, Reason} -> - lager:debug("[iot_host] host_id uuid: ~p, publish command get error: ~p", [UUID, Reason]) - end, + CmdResult = iot_host:publish_message(Pid, 8, BinReply, Timeout * 1000), + lager:debug("[iot_host_handler] host_id uuid: ~p, activate result is: ~p", [UUID, CmdResult]), {ok, 200, iot_util:json_data(<<"success">>)} end; @@ -147,17 +135,9 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := lager:debug("[host_handler] activate host_id: ~p, start", [UUID]), BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]), - case iot_host:publish_message(Pid, 8, BinReply) of - {ok, Ref} -> - receive - {response, Ref, Response} -> - lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, get response: ~p", [UUID, Response]) - after Timeout * 1000 -> - lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, but get ack timeout", [UUID]) - end; - {error, Reason} -> - lager:debug("[iot_host] host_id uuid: ~p, publish command get error: ~p", [UUID, Reason]) - end, + CmdResult = iot_host:publish_message(Pid, 8, BinReply, Timeout * 1000), + lager:debug("[iot_host_handler] host_id uuid: ~p, inactivate result is: ~p", [UUID, CmdResult]), + {ok, 200, iot_util:json_data(<<"success">>)} end; diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 8cc5ef6..c7964c1 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -14,7 +14,7 @@ %% API -export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]). --export([get_metric/1, publish_message/3, get_aes/1, rsa_encode/3]). +-export([get_metric/1, publish_message/4, get_aes/1, rsa_encode/3]). -export([has_session/1, create_session/2, attach_channel/2]). %% gen_statem callbacks @@ -93,10 +93,23 @@ create_session(Pid, PubKey) when is_pid(Pid), is_binary(PubKey) -> rsa_encode(Pid, CommandType, PlainText) when is_pid(Pid), is_integer(CommandType), is_binary(PlainText) -> gen_statem:call(Pid, {rsa_encode, CommandType, PlainText}). --spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}) -> - {ok, Command :: binary()} | {error, Reason :: any()}. -publish_message(Pid, CommandType, Params) when is_pid(Pid), is_integer(CommandType) -> - gen_statem:call(Pid, {publish_message, self(), CommandType, Params}). +%% 这里占用的的调用进程的时间 +-spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) -> + ok | {ok, Response :: binary()} | {error, Reason :: any()}. +publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(CommandType), is_integer(Timeout) -> + case gen_statem:call(Pid, {publish_message, self(), CommandType, Params}) of + {ok, Ref} -> + receive + {ws_response, Ref} -> + ok; + {ws_response, Ref, Response} -> + {ok, Response} + after Timeout -> + {error, timeout} + end; + {error, Reason} -> + {error, Reason} + end. %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this diff --git a/apps/iot/src/websocket/ws_channel.erl b/apps/iot/src/websocket/ws_channel.erl index 351fe17..20ab89f 100644 --- a/apps/iot/src/websocket/ws_channel.erl +++ b/apps/iot/src/websocket/ws_channel.erl @@ -122,9 +122,9 @@ websocket_handle({binary, <> {{ReceiverPid, Ref}, NInflight} -> case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of true when Body == <<>> -> - ReceiverPid ! {response, Ref}; + ReceiverPid ! {ws_response, Ref}; true -> - ReceiverPid ! {response, Ref, Body}; + ReceiverPid ! {ws_response, Ref, Body}; false -> lager:warning("[ws_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is dead", [Body, PacketId]) end,