增加消息的qos

This commit is contained in:
anlicheng 2025-11-27 16:25:33 +08:00
parent 5d143be3ef
commit e1256552b9
6 changed files with 47 additions and 26 deletions

View File

@ -1070,15 +1070,23 @@ json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage)
**Method**`POST` **Method**`POST`
#### 请求参数 #### 请求参数
| 参数名 | 类型 | 必填 | 说明 | | 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------| |---------|-----------------|------|------------------|
| uuid | binary (string) | ✅ | 主机唯一标识符 | | uuid | binary (string) | ✅ | 主机唯一标识符 |
| topic | binary (string) | ✅ | 事件主题 | | topic | binary (string) | ✅ | 事件主题 |
| content | binary (string) | ✅ | 发布内容 | | qos | integer | ✅ | 消息的qosqos = 0,1 |
| content | binary (string) | ✅ | 发布内容 |
### 请求示例 ### 请求示例
```json ```json
{"uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", "topic": "/device/1234/all", "content": "this is a topic payload", "timeout": 10} {
"uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm",
"topic": "/device/1234/all",
"qos": 1,
"content": "this is a topic payload",
"timeout": 10
}
``` ```
#### 响应参数 #### 响应参数

View File

@ -55,6 +55,7 @@
-record(pub, { -record(pub, {
topic :: binary(), topic :: binary(),
qos = 0 :: integer(),
content :: binary() content :: binary()
}). }).

View File

@ -80,15 +80,16 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> :=
end; end;
%% %%
handle_request("POST", "/host/pub", _, #{<<"uuid">> := UUID, <<"topic">> := Topic, <<"content">> := Content}) handle_request("POST", "/host/pub", _, #{<<"uuid">> := UUID, <<"topic">> := Topic, <<"qos">> := Qos0, <<"content">> := Content})
when is_binary(UUID), is_binary(Topic), is_binary(Content) -> when is_binary(UUID), is_binary(Topic), is_binary(Content), is_integer(Qos0) ->
Qos = case Qos0 > 0 of true -> 1; false -> 0 end,
case iot_host_sup:ensured_host_started(UUID) of case iot_host_sup:ensured_host_started(UUID) of
{error, Reason} -> {error, Reason} ->
lager:debug("[host_handler] pub host_id: ~p, topic: ~p, failed with reason: ~p", [UUID, Topic, Reason]), lager:debug("[host_handler] pub host_id: ~p, topic: ~p, failed with reason: ~p", [UUID, Topic, Reason]),
{ok, 200, iot_util:json_error(400, <<"host not found">>)}; {ok, 200, iot_util:json_error(400, <<"host not found">>)};
{ok, Pid} when is_pid(Pid) -> {ok, Pid} when is_pid(Pid) ->
ok = iot_host:pub(Pid, Topic, Content), ok = iot_host:pub(Pid, Topic, Qos, Content),
{ok, 200, iot_util:json_data(<<"success">>)} {ok, 200, iot_util:json_data(<<"success">>)}
end; end;

View File

@ -24,7 +24,7 @@
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
-export([get_metric/1, get_status/1, kill/1]). -export([get_metric/1, get_status/1, kill/1]).
%% %%
-export([pub/3, attach_channel/2, command/3]). -export([pub/4, attach_channel/2, command/3]).
-export([deploy_container/3, start_container/2, stop_container/2, remove_container/2, kill_container/2, config_container/3, get_containers/1, await_reply/2]). -export([deploy_container/3, start_container/2, stop_container/2, remove_container/2, kill_container/2, config_container/3, get_containers/1, await_reply/2]).
-export([heartbeat/1]). -export([heartbeat/1]).
@ -145,9 +145,9 @@ await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
{error, <<"timeout">>} {error, <<"timeout">>}
end. end.
-spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> ok | {error, Reason :: any()}. -spec pub(Pid :: pid(), Topic :: binary(), Qos :: integer(), Content :: binary()) -> ok | {error, Reason :: any()}.
pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) -> pub(Pid, Topic, Qos, Content) when is_pid(Pid), is_binary(Topic), is_integer(Qos), is_binary(Content) ->
gen_statem:call(Pid, {pub, Topic, Content}). gen_statem:call(Pid, {pub, Topic, Qos, Content}).
-spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> ok | {error, Reason :: any()}. -spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> ok | {error, Reason :: any()}.
command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is_binary(Command) -> command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is_binary(Command) ->
@ -231,12 +231,12 @@ handle_event({call, From}, {jsonrpc_call, ReceiverPid, RpcCall}, _, State = #sta
end; end;
%% , pub/sub %% , pub/sub
handle_event({call, From}, {pub, Topic, Content}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> handle_event({call, From}, {pub, Topic, Qos, Content}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
case HasSession andalso is_pid(ChannelPid) of case HasSession andalso is_pid(ChannelPid) of
true -> true ->
lager:debug("[iot_host] host: ~p, publish to topic: ~p, content: ~p", [UUID, Topic, Content]), lager:debug("[iot_host] host: ~p, publish to topic: ~p, content: ~p", [UUID, Topic, Content]),
%% websocket发送请求 %% websocket发送请求
tcp_channel:pub(ChannelPid, Topic, Content), tcp_channel:pub(ChannelPid, Topic, Qos, Content),
{keep_state, State, [{reply, From, ok}]}; {keep_state, State, [{reply, From, ok}]};
false -> false ->

View File

@ -10,8 +10,10 @@
-author("anlicheng"). -author("anlicheng").
-include("message.hrl"). -include("message.hrl").
-define(I32, 1). -define(I8, 1).
-define(Bytes, 2). -define(I16, 2).
-define(I32, 3).
-define(Bytes, 4).
%% API %% API
-export([encode/2, decode/1]). -export([encode/2, decode/1]).
@ -43,9 +45,10 @@ encode0(#jsonrpc_reply{result = undefined, error = Error}) ->
iolist_to_binary([ iolist_to_binary([
marshal(?Bytes, ResultBin) marshal(?Bytes, ResultBin)
]); ]);
encode0(#pub{topic = Topic, content = Content}) -> encode0(#pub{topic = Topic, qos = Qos, content = Content}) ->
iolist_to_binary([ iolist_to_binary([
marshal(?Bytes, Topic), marshal(?Bytes, Topic),
marshal(?I8, Qos),
marshal(?Bytes, Content) marshal(?Bytes, Content)
]); ]);
encode0(#command{command_type = CommandType, command = Command}) -> encode0(#command{command_type = CommandType, command = Command}) ->
@ -90,8 +93,8 @@ decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) ->
_ -> _ ->
error error
end; end;
decode0(?MESSAGE_PUB, [Topic, Content]) -> decode0(?MESSAGE_PUB, [Topic, Qos, Content]) ->
{ok, #pub{topic = Topic, content = Content}}; {ok, #pub{topic = Topic, qos = Qos, content = Content}};
decode0(?MESSAGE_COMMAND, [CommandType, Command]) -> decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->
{ok, #command{command_type = CommandType, command = Command}}; {ok, #command{command_type = CommandType, command = Command}};
decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) -> decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) ->
@ -111,6 +114,10 @@ decode0(_, _) ->
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec marshal(Type :: integer(), Field :: any()) -> binary(). -spec marshal(Type :: integer(), Field :: any()) -> binary().
marshal(?I8, Field) when is_integer(Field) ->
<<?I8, Field:8>>;
marshal(?I16, Field) when is_integer(Field) ->
<<?I16, Field:16>>;
marshal(?I32, Field) when is_integer(Field) -> marshal(?I32, Field) when is_integer(Field) ->
<<?I32, Field:32>>; <<?I32, Field:32>>;
marshal(?Bytes, Field) when is_binary(Field) -> marshal(?Bytes, Field) when is_binary(Field) ->
@ -122,6 +129,10 @@ unmarshal(Bin) when is_binary(Bin) ->
unmarshal(Bin, []). unmarshal(Bin, []).
unmarshal(<<>>, Acc) -> unmarshal(<<>>, Acc) ->
{ok, lists:reverse(Acc)}; {ok, lists:reverse(Acc)};
unmarshal(<<?I8, F:8, Rest/binary>>, Acc) ->
unmarshal(Rest, [F|Acc]);
unmarshal(<<?I16, F:16, Rest/binary>>, Acc) ->
unmarshal(Rest, [F|Acc]);
unmarshal(<<?I32, F:32, Rest/binary>>, Acc) -> unmarshal(<<?I32, F:32, Rest/binary>>, Acc) ->
unmarshal(Rest, [F|Acc]); unmarshal(Rest, [F|Acc]);
unmarshal(<<?Bytes, Len:16, F:Len/binary, Rest/binary>>, Acc) -> unmarshal(<<?Bytes, Len:16, F:Len/binary, Rest/binary>>, Acc) ->

View File

@ -12,7 +12,7 @@
-behaviour(ranch_protocol). -behaviour(ranch_protocol).
%% API %% API
-export([pub/3, jsonrpc_call/3, command/3]). -export([pub/4, jsonrpc_call/3, command/3]).
-export([start_link/3, stop/2]). -export([start_link/3, stop/2]).
%% gen_server callbacks %% gen_server callbacks
@ -33,9 +33,9 @@
}). }).
%% %%
-spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> no_return(). -spec pub(Pid :: pid(), Topic :: binary(), Qos :: integer(), Content :: binary()) -> no_return().
pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) -> pub(Pid, Topic, Qos, Content) when is_pid(Pid), is_binary(Topic), is_integer(Qos), is_binary(Content) ->
gen_server:cast(Pid, {pub, Topic, Content}). gen_server:cast(Pid, {pub, Topic, Qos, Content}).
%% %%
-spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> no_return(). -spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> no_return().
@ -74,8 +74,8 @@ handle_call(_Request, _From, State) ->
{reply, ok, State}. {reply, ok, State}.
%% , pub/sub机制 %% , pub/sub机制
handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) -> handle_cast({pub, Topic, Qos, Content}, State = #state{transport = Transport, socket = Socket}) ->
EncPub = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, content = Content}), EncPub = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, qos = Qos, content = Content}),
Transport:send(Socket, <<?PACKET_CAST, EncPub/binary>>), Transport:send(Socket, <<?PACKET_CAST, EncPub/binary>>),
{noreply, State}; {noreply, State};