调整command reply

This commit is contained in:
anlicheng 2023-08-10 10:12:26 +08:00
parent 38d8167b11
commit 069e4cac02
3 changed files with 39 additions and 46 deletions

View File

@ -83,28 +83,25 @@ handle_request("POST", "/host/publish_command", _,
lager:debug("[http_host_handler] publish message is: ~p", [Reply1]),
BinReply = iolist_to_binary(jiffy:encode(Reply1, [force_utf8])),
case iot_host:publish_message(Pid, CommandType, {aes, BinReply}) of
case iot_host:publish_message(Pid, CommandType, {aes, BinReply}, Timeout * 1000) of
{error, timeout} ->
lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, but get ack timeout", [UUID]),
{ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED),
{ok, 200, iot_util:json_error(401, <<"命令执行超时, 请重试"/utf8>>)};
{error, Reason} when is_binary(Reason) ->
task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED),
{ok, 200, iot_util:json_error(400, Reason)};
{ok, Ref} ->
receive
{response, Ref} ->
ok ->
{ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_OK),
{ok, 200, iot_util:json_data(<<"success">>)};
{ok, Response} ->
case jiffy:decode(Response, [return_maps]) of
#{<<"code">> := 1} ->
{ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_OK),
{ok, 200, iot_util:json_data(<<"success">>)};
{response, Ref, Response} ->
case jiffy:decode(Response, [return_maps]) of
#{<<"code">> := 1} ->
{ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_OK),
{ok, 200, iot_util:json_data(<<"success">>)};
#{<<"code">> := 0, <<"message">> := Message} when is_binary(Message) ->
{ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED),
{ok, 200, iot_util:json_error(401, <<"操作失败: "/utf8, Message/binary>>)}
end
after Timeout * 1000 ->
lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, but get ack timeout", [UUID]),
{ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED),
{ok, 200, iot_util:json_error(401, <<"命令执行超时, 请重试"/utf8>>)}
#{<<"code">> := 0, <<"message">> := Message} when is_binary(Message) ->
{ok, _} = task_logs_bo:change_status(TaskId, ?TASK_STATUS_FAILED),
{ok, 200, iot_util:json_error(401, <<"操作失败: "/utf8, Message/binary>>)}
end
end
end;
@ -121,17 +118,8 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> :=
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
ok = iot_host:activate(Pid, true),
case iot_host:publish_message(Pid, 8, BinReply) of
{ok, Ref} ->
receive
{response, Ref, Response} ->
lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, get response: ~p", [UUID, Response])
after Timeout * 1000 ->
lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, but get ack timeout", [UUID])
end;
{error, Reason} ->
lager:debug("[iot_host] host_id uuid: ~p, publish command get error: ~p", [UUID, Reason])
end,
CmdResult = iot_host:publish_message(Pid, 8, BinReply, Timeout * 1000),
lager:debug("[iot_host_handler] host_id uuid: ~p, activate result is: ~p", [UUID, CmdResult]),
{ok, 200, iot_util:json_data(<<"success">>)}
end;
@ -147,17 +135,9 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> :=
lager:debug("[host_handler] activate host_id: ~p, start", [UUID]),
BinReply = jiffy:encode(#{<<"auth">> => false}, [force_utf8]),
case iot_host:publish_message(Pid, 8, BinReply) of
{ok, Ref} ->
receive
{response, Ref, Response} ->
lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, get response: ~p", [UUID, Response])
after Timeout * 1000 ->
lager:debug("[iot_host_handler] host_id uuid: ~p, publish topic success, but get ack timeout", [UUID])
end;
{error, Reason} ->
lager:debug("[iot_host] host_id uuid: ~p, publish command get error: ~p", [UUID, Reason])
end,
CmdResult = iot_host:publish_message(Pid, 8, BinReply, Timeout * 1000),
lager:debug("[iot_host_handler] host_id uuid: ~p, inactivate result is: ~p", [UUID, CmdResult]),
{ok, 200, iot_util:json_data(<<"success">>)}
end;

View File

@ -14,7 +14,7 @@
%% API
-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]).
-export([get_metric/1, publish_message/3, get_aes/1, rsa_encode/3]).
-export([get_metric/1, publish_message/4, get_aes/1, rsa_encode/3]).
-export([has_session/1, create_session/2, attach_channel/2]).
%% gen_statem callbacks
@ -93,10 +93,23 @@ create_session(Pid, PubKey) when is_pid(Pid), is_binary(PubKey) ->
rsa_encode(Pid, CommandType, PlainText) when is_pid(Pid), is_integer(CommandType), is_binary(PlainText) ->
gen_statem:call(Pid, {rsa_encode, CommandType, PlainText}).
-spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}) ->
{ok, Command :: binary()} | {error, Reason :: any()}.
publish_message(Pid, CommandType, Params) when is_pid(Pid), is_integer(CommandType) ->
gen_statem:call(Pid, {publish_message, self(), CommandType, Params}).
%%
-spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) ->
ok | {ok, Response :: binary()} | {error, Reason :: any()}.
publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(CommandType), is_integer(Timeout) ->
case gen_statem:call(Pid, {publish_message, self(), CommandType, Params}) of
{ok, Ref} ->
receive
{ws_response, Ref} ->
ok;
{ws_response, Ref, Response} ->
{ok, Response}
after Timeout ->
{error, timeout}
end;
{error, Reason} ->
{error, Reason}
end.
%% @doc Creates a gen_statem process which calls Module:init/1 to
%% initialize. To ensure a synchronized start-up procedure, this

View File

@ -122,9 +122,9 @@ websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, PacketId:32, Body/binary>>
{{ReceiverPid, Ref}, NInflight} ->
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
true when Body == <<>> ->
ReceiverPid ! {response, Ref};
ReceiverPid ! {ws_response, Ref};
true ->
ReceiverPid ! {response, Ref, Body};
ReceiverPid ! {ws_response, Ref, Body};
false ->
lager:warning("[ws_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is dead", [Body, PacketId])
end,