diff --git a/HTTP_API_README.md b/HTTP_API_README.md index 49a61e5..627a637 100644 --- a/HTTP_API_README.md +++ b/HTTP_API_README.md @@ -1070,15 +1070,23 @@ json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) **Method**:`POST` #### 请求参数 -| 参数名 | 类型 | 必填 | 说明 | -|--------|------|------|------| -| uuid | binary (string) | ✅ | 主机唯一标识符 | -| topic | binary (string) | ✅ | 事件主题 | -| content | binary (string) | ✅ | 发布内容 | +| 参数名 | 类型 | 必填 | 说明 | +|---------|-----------------|------|------------------| +| uuid | binary (string) | ✅ | 主机唯一标识符 | +| topic | binary (string) | ✅ | 事件主题 | +| qos | integer | ✅ | 消息的qos,qos = 0,1 | +| content | binary (string) | ✅ | 发布内容 | ### 请求示例 + ```json -{"uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", "topic": "/device/1234/all", "content": "this is a topic payload", "timeout": 10} +{ + "uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", + "topic": "/device/1234/all", + "qos": 1, + "content": "this is a topic payload", + "timeout": 10 +} ``` #### 响应参数 diff --git a/apps/iot/include/message.hrl b/apps/iot/include/message.hrl index 5bc6e2b..6673fa3 100644 --- a/apps/iot/include/message.hrl +++ b/apps/iot/include/message.hrl @@ -55,6 +55,7 @@ -record(pub, { topic :: binary(), + qos = 0 :: integer(), content :: binary() }). diff --git a/apps/iot/src/http_handlers/host_handler.erl b/apps/iot/src/http_handlers/host_handler.erl index 3dde9c6..3013a1e 100644 --- a/apps/iot/src/http_handlers/host_handler.erl +++ b/apps/iot/src/http_handlers/host_handler.erl @@ -80,15 +80,16 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := end; %% 主机事件发布 -handle_request("POST", "/host/pub", _, #{<<"uuid">> := UUID, <<"topic">> := Topic, <<"content">> := Content}) - when is_binary(UUID), is_binary(Topic), is_binary(Content) -> +handle_request("POST", "/host/pub", _, #{<<"uuid">> := UUID, <<"topic">> := Topic, <<"qos">> := Qos0, <<"content">> := Content}) + when is_binary(UUID), is_binary(Topic), is_binary(Content), is_integer(Qos0) -> + Qos = case Qos0 > 0 of true -> 1; false -> 0 end, case iot_host_sup:ensured_host_started(UUID) of {error, Reason} -> lager:debug("[host_handler] pub host_id: ~p, topic: ~p, failed with reason: ~p", [UUID, Topic, Reason]), {ok, 200, iot_util:json_error(400, <<"host not found">>)}; {ok, Pid} when is_pid(Pid) -> - ok = iot_host:pub(Pid, Topic, Content), + ok = iot_host:pub(Pid, Topic, Qos, Content), {ok, 200, iot_util:json_data(<<"success">>)} end; diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 3eea328..ebd6f14 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -24,7 +24,7 @@ -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). -export([get_metric/1, get_status/1, kill/1]). %% 通讯相关 --export([pub/3, attach_channel/2, command/3]). +-export([pub/4, attach_channel/2, command/3]). -export([deploy_container/3, start_container/2, stop_container/2, remove_container/2, kill_container/2, config_container/3, get_containers/1, await_reply/2]). -export([heartbeat/1]). @@ -145,9 +145,9 @@ await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> {error, <<"timeout">>} end. --spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> ok | {error, Reason :: any()}. -pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) -> - gen_statem:call(Pid, {pub, Topic, Content}). +-spec pub(Pid :: pid(), Topic :: binary(), Qos :: integer(), Content :: binary()) -> ok | {error, Reason :: any()}. +pub(Pid, Topic, Qos, Content) when is_pid(Pid), is_binary(Topic), is_integer(Qos), is_binary(Content) -> + gen_statem:call(Pid, {pub, Topic, Qos, Content}). -spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> ok | {error, Reason :: any()}. command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is_binary(Command) -> @@ -231,12 +231,12 @@ handle_event({call, From}, {jsonrpc_call, ReceiverPid, RpcCall}, _, State = #sta end; %% 发送指令时, pub/sub -handle_event({call, From}, {pub, Topic, Content}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> +handle_event({call, From}, {pub, Topic, Qos, Content}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> case HasSession andalso is_pid(ChannelPid) of true -> lager:debug("[iot_host] host: ~p, publish to topic: ~p, content: ~p", [UUID, Topic, Content]), %% 通过websocket发送请求 - tcp_channel:pub(ChannelPid, Topic, Content), + tcp_channel:pub(ChannelPid, Topic, Qos, Content), {keep_state, State, [{reply, From, ok}]}; false -> diff --git a/apps/iot/src/message/message_codec.erl b/apps/iot/src/message/message_codec.erl index 3de252d..c4c6cc9 100644 --- a/apps/iot/src/message/message_codec.erl +++ b/apps/iot/src/message/message_codec.erl @@ -10,8 +10,10 @@ -author("anlicheng"). -include("message.hrl"). --define(I32, 1). --define(Bytes, 2). +-define(I8, 1). +-define(I16, 2). +-define(I32, 3). +-define(Bytes, 4). %% API -export([encode/2, decode/1]). @@ -43,9 +45,10 @@ encode0(#jsonrpc_reply{result = undefined, error = Error}) -> iolist_to_binary([ marshal(?Bytes, ResultBin) ]); -encode0(#pub{topic = Topic, content = Content}) -> +encode0(#pub{topic = Topic, qos = Qos, content = Content}) -> iolist_to_binary([ marshal(?Bytes, Topic), + marshal(?I8, Qos), marshal(?Bytes, Content) ]); encode0(#command{command_type = CommandType, command = Command}) -> @@ -90,8 +93,8 @@ decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) -> _ -> error end; -decode0(?MESSAGE_PUB, [Topic, Content]) -> - {ok, #pub{topic = Topic, content = Content}}; +decode0(?MESSAGE_PUB, [Topic, Qos, Content]) -> + {ok, #pub{topic = Topic, qos = Qos, content = Content}}; decode0(?MESSAGE_COMMAND, [CommandType, Command]) -> {ok, #command{command_type = CommandType, command = Command}}; decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) -> @@ -111,6 +114,10 @@ decode0(_, _) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -spec marshal(Type :: integer(), Field :: any()) -> binary(). +marshal(?I8, Field) when is_integer(Field) -> + <>; +marshal(?I16, Field) when is_integer(Field) -> + <>; marshal(?I32, Field) when is_integer(Field) -> <>; marshal(?Bytes, Field) when is_binary(Field) -> @@ -122,6 +129,10 @@ unmarshal(Bin) when is_binary(Bin) -> unmarshal(Bin, []). unmarshal(<<>>, Acc) -> {ok, lists:reverse(Acc)}; +unmarshal(<>, Acc) -> + unmarshal(Rest, [F|Acc]); +unmarshal(<>, Acc) -> + unmarshal(Rest, [F|Acc]); unmarshal(<>, Acc) -> unmarshal(Rest, [F|Acc]); unmarshal(<>, Acc) -> diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index 9e2c6ea..cf344a6 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -12,7 +12,7 @@ -behaviour(ranch_protocol). %% API --export([pub/3, jsonrpc_call/3, command/3]). +-export([pub/4, jsonrpc_call/3, command/3]). -export([start_link/3, stop/2]). %% gen_server callbacks @@ -33,9 +33,9 @@ }). %% 向通道中写入消息 --spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> no_return(). -pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) -> - gen_server:cast(Pid, {pub, Topic, Content}). +-spec pub(Pid :: pid(), Topic :: binary(), Qos :: integer(), Content :: binary()) -> no_return(). +pub(Pid, Topic, Qos, Content) when is_pid(Pid), is_binary(Topic), is_integer(Qos), is_binary(Content) -> + gen_server:cast(Pid, {pub, Topic, Qos, Content}). %% 向通道中写入消息 -spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> no_return(). @@ -74,8 +74,8 @@ handle_call(_Request, _From, State) -> {reply, ok, State}. %% 发送消息, 基于pub/sub机制 -handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) -> - EncPub = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, content = Content}), +handle_cast({pub, Topic, Qos, Content}, State = #state{transport = Transport, socket = Socket}) -> + EncPub = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, qos = Qos, content = Content}), Transport:send(Socket, <>), {noreply, State};