diff --git a/HTTP_API_README.md b/HTTP_API_README.md index a987558..0b71f10 100644 --- a/HTTP_API_README.md +++ b/HTTP_API_README.md @@ -654,6 +654,208 @@ json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) } ``` +### 5. 向endpoint发送消息 + +**URL**:`/endpoint/publish_metric` +**Method**:`POST` + +###### **示例请求** + metric字段支持字符串,map或者数组,其他为非法数据; 如何要发送数字,可以采用数字的字符串格式, 比如: "5" + +```json +{ + "route_key": "/dhlr/warning", + "metric": { + "xyz": "test" + } +} +``` + +#### 示例响应 +```json +{ + "result": "ok" +} +``` + +#### 错误响应 +```json +{ + "error": { + "code": 404, + "message": "restart endpoint error" + } +} +``` + +### **POST /endpoint/test + 用于测试指定协议(HTTP / MQTT / Kafka)配置是否可用** + +#### **请求方式** + +``` +POST /endpoint/test +Content-Type: application/json +``` + +--- + +#### **通用参数** + +| 字段 | 类型 | 必填 | 说明 | +| -------- | ------ | -- | ------------------------------- | +| protocol | string | 是 | 选择测试类型,可为:`http`、`mqtt`、`kafka` | +| config | object | 是 | 各协议对应的配置对象 | + +--- + +#### **一、测试 HTTP Endpoint** + +###### **请求参数** + +| 字段 | 类型 | 必填 | 说明 | +| --------- | ------- | -- | ------------- | +| url | string | 是 | 要请求的 HTTP 地址 | +| pool_size | integer | 是 | 连接池大小(必须 > 0) | + +###### **示例请求** + +```json +{ + "protocol": "http", + "config": { + "url": "http://example.com/ping", + "pool_size": 5 + } +} +``` + +###### **返回结果** + +| 状态 | 内容 | +| ------- |--------------------------------------| +| success | `{"data":"ok"}` | +| failed | `{"error": -1, "msg": "url failed"}` | + +--- + +##### **二、测试 MQTT Endpoint** + +###### **请求参数** + +| 字段 | 类型 | 必填 | 说明 | +| --------- | ------ | -- | --------------- | +| host | string | 是 | MQTT 服务器地址 | +| port | int | 是 | MQTT 端口(>0) | +| client_id | string | 是 | 客户端 ID(仅用于配置校验) | +| username | string | 是 | 用户名 | +| password | string | 是 | 密码 | +| topic | string | 是 | topic 名称 | +| qos | int | 是 | 0/1/2 | + +###### **参数限制** + +* host ≠ 空 +* username/password/topic ≠ 空 +* qos ∈ {0,1,2} + +###### **示例请求** + +```json +{ + "protocol": "mqtt", + "config": { + "host": "test.mqttserver.com", + "port": 1883, + "client_id": "test-client-01", + "username": "user1", + "password": "pass123", + "topic": "device/event", + "qos": 1 + } +} +``` + +###### **返回结果** + +| 状态 | 内容 | +| ------- |------------------------------------------------------| +| success | `{"data": "ok"}` | +| failed | `{"error": -1, "msg": "connect mqtt server failed"}` | + +--- + +##### **三、测试 Kafka Endpoint** + +###### **请求参数(无认证)** + +| 字段 | 类型 | 必填 | 说明 | +| ----------------- | ------------ | -- | ---------------------- | +| bootstrap_servers | list(string) | 是 | 服务器列表,格式 `"host:port"` | +| topic | string | 是 | Topic 名称 | + +###### **请求参数(SASL 认证)** + +当需要 SASL 认证时,增加: + +| 字段 | 类型 | 必填 | 说明 | +| --------------------- | ------ | -- | --------------------------------- | +| sasl_config.username | string | 是 | 用户名 | +| sasl_config.password | string | 是 | 密码 | +| sasl_config.mechanism | string | 是 | `"plain"` `"sha_256"` `"sha_512"` | + +###### **合法 mechanism 值** + +* `plain` → SASL/PLAIN +* `sha_256` → SCRAM-SHA-256 +* `sha_512` → SCRAM-SHA-512 + +###### **bootstrap_servers 校验规则** + +* 必须是 list(binary()) +* 每项格式必须为 `"host:port"` +* port 必须为整数并 > 0 + +--- + +###### **示例 1:无认证 Kafka** + +```json +{ + "protocol": "kafka", + "config": { + "bootstrap_servers": ["broker1:9092", "broker2:9092"], + "topic": "device-log" + } +} +``` + +###### **示例 2:带 SASL 认证 Kafka** + +```json +{ + "protocol": "kafka", + "config": { + "sasl_config": { + "username": "kafka_user", + "password": "kafka_pass", + "mechanism": "sha_256" + }, + "bootstrap_servers": ["kafka.example.com:9094"], + "topic": "iot-event" + } +} +``` + +--- + +## **返回结果** + +| 状态 | 内容 | +| ------- | ---------------------------------------------------- | +| success | `{"data": "ok"}` | +| failed | `{"error": -1, "msg": "config kafka server failed"}` | + --- ### 5️⃣ 未知路径处理 @@ -902,15 +1104,23 @@ json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) **Method**:`POST` #### 请求参数 -| 参数名 | 类型 | 必填 | 说明 | -|--------|------|------|------| -| uuid | binary (string) | ✅ | 主机唯一标识符 | -| topic | binary (string) | ✅ | 事件主题 | -| content | binary (string) | ✅ | 发布内容 | +| 参数名 | 类型 | 必填 | 说明 | +|---------|-----------------|------|------------------| +| uuid | binary (string) | ✅ | 主机唯一标识符 | +| topic | binary (string) | ✅ | 事件主题 | +| qos | integer | ✅ | 消息的qos,qos = 0,1 | +| content | binary (string) | ✅ | 发布内容 | ### 请求示例 + ```json -{"uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", "topic": "/device/1234/all", "content": "this is a topic payload", "timeout": 10} +{ + "uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", + "topic": "/device/1234/all", + "qos": 1, + "content": "this is a topic payload", + "timeout": 10 +} ``` #### 响应参数 diff --git a/apps/iot/include/endpoint.hrl b/apps/iot/include/endpoint.hrl index 49a33aa..f051fb9 100644 --- a/apps/iot/include/endpoint.hrl +++ b/apps/iot/include/endpoint.hrl @@ -10,6 +10,7 @@ -record(http_endpoint, { url = <<>> :: binary(), + token = <<>> :: binary(), pool_size = 10 :: integer() }). diff --git a/apps/iot/include/message.hrl b/apps/iot/include/message.hrl index 804148c..6673fa3 100644 --- a/apps/iot/include/message.hrl +++ b/apps/iot/include/message.hrl @@ -55,6 +55,7 @@ -record(pub, { topic :: binary(), + qos = 0 :: integer(), content :: binary() }). @@ -74,18 +75,10 @@ }). -record(data, { - service_id :: binary(), - device_uuid :: binary(), route_key :: binary(), metric :: binary() }). --record(event, { - service_id :: binary(), - event_type :: integer(), - params :: binary() -}). - -record(task_event_stream, { task_id :: integer(), type :: binary(), diff --git a/apps/iot/src/endpoint/endpoint.erl b/apps/iot/src/endpoint/endpoint.erl index 89d1a1e..c1984e9 100644 --- a/apps/iot/src/endpoint/endpoint.erl +++ b/apps/iot/src/endpoint/endpoint.erl @@ -12,9 +12,9 @@ %% API -export([start_link/1]). --export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]). +-export([get_name/1, get_pid/1, forward/2, reload/2, clean_up/1]). -export([get_alias_pid/1, is_support/1, get_protocol/1]). --export([endpoint_record/1]). +-export([endpoint_record/1, parse_config/2]). %%%=================================================================== %%% API @@ -47,9 +47,9 @@ get_alias_name(Name) when is_binary(Name) -> get_alias_pid(Name) when is_binary(Name) -> gproc:whereis_name({n, l, get_alias_name(Name)}). --spec forward(Pid :: pid(), ServiceId :: binary(), Metric :: binary()) -> no_return(). -forward(Pid, ServiceId, Metric) when is_pid(Pid), is_binary(ServiceId), is_binary(Metric) -> - gen_server:cast(Pid, {forward, ServiceId, Metric}). +-spec forward(Pid :: pid(), Metric :: binary()) -> no_return(). +forward(Pid, Metric) when is_pid(Pid), is_binary(Metric) -> + gen_server:cast(Pid, {forward, Metric}). reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> gen_statem:cast(Pid, {reload, NEndpoint}). @@ -75,69 +75,214 @@ is_support(Protocol) when is_atom(Protocol) -> -spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}. endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title, <<"type">> := Type, <<"config">> := ConfigJson, <<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) -> - try - Config = parse_config(Type, ConfigJson), - {ok, #endpoint { - id = Id, - matcher = Matcher, - title = Title, - config = Config, - status = Status, - updated_at = UpdatedAt, - created_at = CreatedAt - }} - catch throw:_ -> - error + case parse_config(Type, ConfigJson) of + {ok, Config} -> + {ok, #endpoint { + id = Id, + matcher = Matcher, + title = Title, + config = Config, + status = Status, + updated_at = UpdatedAt, + created_at = CreatedAt + }}; + {error, _Reason} -> + error end. +-spec parse_config(Protocol :: binary(), Config :: map()) -> {ok, #mqtt_endpoint{} | #kafka_endpoint{} | #http_endpoint{}} | {error, Errors :: [Error :: binary()]}. parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port0, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) -> Port = if is_binary(Port0) -> binary_to_integer(Port0); is_integer(Port0) -> Port0 end, - #mqtt_endpoint{ - host = Host, - port = Port, - client_id = ClientId, - username = Username, - password = Password, - topic = Topic, - qos = Qos - }; -parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) -> - #http_endpoint{ - url = Url, - pool_size = PoolSize - }; -parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0}, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> - Mechanism = case Mechanism0 of - <<"sha_256">> -> - scram_sha_256; - <<"sha_512">> -> - scram_sha_512; - <<"plain">> -> - plain; - _ -> - plain - end, - #kafka_endpoint{ - sasl_config = {Mechanism, Username, Password}, - bootstrap_servers = parse_bootstrap_servers(BootstrapServers), - topic = Topic - }; -parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> - #kafka_endpoint{ - sasl_config = undefined, - bootstrap_servers = parse_bootstrap_servers(BootstrapServers), - topic = Topic - }; -parse_config(_, _) -> - throw(invalid_config). - -parse_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) -> - lists:filtermap(fun(S) -> - case binary:split(S, <<":">>) of - [Host0, Port0] -> - {true, {binary_to_list(Host0), binary_to_integer(Port0)}}; - _ -> - false + Errors = lists:filtermap(fun(Term) -> + case check_mqtt_argument(Term) of + ok -> + false; + {error, Error} -> + {true, Error} end - end, BootstrapServers). \ No newline at end of file + end, [{host, Host}, {port, Port}, {username, Username}, {password, Password}, {topic, Topic}, {qos, Qos}]), + case Errors =:= [] of + true -> + {ok, #mqtt_endpoint{ + host = Host, + port = Port, + client_id = ClientId, + username = Username, + password = Password, + topic = Topic, + qos = Qos + }}; + false -> + {error, Errors} + end; +parse_config(<<"http">>, C = #{<<"url">> := Url, <<"pool_size">> := PoolSize}) -> + Token = maps:get(<<"token">>, C, <<>>), + {ok, #http_endpoint{ + url = Url, + token = Token, + pool_size = PoolSize + }}; +parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0}, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> + Errors = lists:filtermap(fun(Term) -> + case check_kafka_argument(Term) of + ok -> + false; + {error, Error} -> + {true, Error} + end + end, [{username, Username}, {password, Password}, {topic, Topic}, {mechanism, Mechanism0}, {bootstrap_servers, BootstrapServers}]), + case Errors =:= [] of + true -> + Mechanism = parse_kafka_mechanism(Mechanism0), + {ok, #kafka_endpoint{ + sasl_config = {Mechanism, Username, Password}, + bootstrap_servers = parse_kafka_bootstrap_servers(BootstrapServers), + topic = Topic + }}; + false -> + {error, Errors} + end; +parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> + Errors = lists:filtermap(fun(Term) -> + case check_kafka_argument(Term) of + ok -> + false; + {error, Error} -> + {true, Error} + end + end, [{topic, Topic}, {bootstrap_servers, BootstrapServers}]), + case Errors =:= [] of + true -> + {ok, #kafka_endpoint{ + sasl_config = undefined, + bootstrap_servers = parse_kafka_bootstrap_servers(BootstrapServers), + topic = Topic + }}; + false -> + {error, Errors} + end; +parse_config(_, _) -> + {error, invalid_config}. + +-spec parse_kafka_bootstrap_servers(BootstrapServers :: [binary()]) -> Servers :: [{Host :: string(), Port :: integer()}]. +parse_kafka_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) -> + lists:map(fun(S) -> + [Host0, Port0] = binary:split(S, <<":">>), + {binary_to_list(Host0), binary_to_integer(Port0)} + end, BootstrapServers). + +-spec parse_kafka_mechanism(Mechanism0 :: binary()) -> atom(). +parse_kafka_mechanism(Mechanism0) when is_binary(Mechanism0) -> + case Mechanism0 of + <<"sha_256">> -> + scram_sha_256; + <<"sha_512">> -> + scram_sha_512; + <<"plain">> -> + plain; + _ -> + plain + end. + +-spec check_kafka_argument(tuple()) -> ok | {error, Reason :: binary()}. +check_kafka_argument({mechanism, Mechanism}) -> + case lists:member(Mechanism, [<<"sha_256">>, <<"sha_512">>, <<"plain">>]) of + true -> + ok; + false -> + {error, <<"mechanism invalid">>} + end; +check_kafka_argument({username, Username}) -> + case Username /= <<>> of + true -> + ok; + false -> + {error, <<"username is empty">>} + end; +check_kafka_argument({password, Password}) -> + case Password /= <<>> of + true -> + ok; + false -> + {error, <<"password is empty">>} + end; +check_kafka_argument({topic, Topic}) -> + case Topic /= <<>> of + true -> + ok; + false -> + {error, <<"topic is empty">>} + end; +check_kafka_argument({bootstrap_servers, BootstrapServers}) -> + case is_list(BootstrapServers) andalso length(BootstrapServers) > 0 of + true -> + InvalidServers = lists:filtermap(fun(S) -> + case binary:split(S, <<":">>) of + [Host0, Port0] -> + Host = binary_to_list(Host0), + Port = binary_to_integer(Port0), + case not string:is_empty(Host) andalso (is_integer(Port) andalso Port > 0) of + true -> + false; + false -> + {true, S} + end; + _ -> + {true, S} + end + end, BootstrapServers), + + case InvalidServers =:= [] of + true -> + ok; + false -> + {error, iolist_to_binary([<<"bootstrap_servers: ">>, lists:join(<<",">>, InvalidServers), <<" format is error">>])} + end; + false -> + {error, <<"bootstrap_servers is empty">>} + end. + +-spec check_mqtt_argument(tuple()) -> ok | {error, Reason :: binary()}. +check_mqtt_argument({host, Host}) -> + case Host /= <<>> of + true -> + ok; + false -> + {error, <<"host is empty">>} + end; +check_mqtt_argument({port, Port}) -> + case is_integer(Port) andalso Port > 0 of + true -> + ok; + false -> + {error, <<"port invalid">>} + end; +check_mqtt_argument({username, Username}) -> + case Username /= <<>> of + true -> + ok; + false -> + {error, <<"username is empty">>} + end; +check_mqtt_argument({password, Password}) -> + case Password /= <<>> of + true -> + ok; + false -> + {error, <<"password is empty">>} + end; +check_mqtt_argument({topic, Topic}) -> + case Topic /= <<>> of + true -> + ok; + false -> + {error, <<"topic is empty">>} + end; +check_mqtt_argument({qos, Qos}) -> + case is_integer(Qos) andalso lists:member(Qos, [0, 1, 2]) of + true -> + ok; + false -> + {error, <<"qos invalid">>} + end. \ No newline at end of file diff --git a/apps/iot/src/endpoint/endpoint_http.erl b/apps/iot/src/endpoint/endpoint_http.erl index 585af1e..752b274 100644 --- a/apps/iot/src/endpoint/endpoint_http.erl +++ b/apps/iot/src/endpoint/endpoint_http.erl @@ -69,8 +69,15 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> - NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), +handle_cast({forward, Metric}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{token = Token}}}) -> + Tuple = case is_binary(Token) andalso Token /= <<>> of + true -> + Sign = iot_util:sha256(erlang:iolist_to_binary([Token, Metric, Token])), + {Metric, Sign}; + false -> + {Metric, <<>>} + end, + NBuffer = endpoint_buffer:append(Tuple, Buffer), {noreply, State#state{buffer = NBuffer}}; handle_cast(cleanup, State = #state{buffer = Buffer}) -> @@ -83,11 +90,14 @@ handle_cast(cleanup, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) -> - Headers = [ - {<<"Content-Type">>, <<"application/octet-stream">>}, - {<<"Service-Id">>, ServiceId} - ], +handle_info({next_data, Id, {Metric, Sign}}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) -> + BaseHeaders = [{<<"Content-Type">>, <<"application/json">>}], + ExtraHeaders = if + Sign =:= <<>> -> []; + true -> [{<<"X-Signature">>, Sign}] + end, + Headers = BaseHeaders ++ ExtraHeaders, + case hackney:request(post, Url, Headers, Metric) of {ok, 200, _, ClientRef} -> {ok, RespBody} = hackney:body(ClientRef), @@ -100,7 +110,8 @@ handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{buffer = Buffer {ok, RespBody} = hackney:body(ClientRef), hackney:close(ClientRef), lager:debug("[endpoint_http] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]), - {noreply, State}; + NBuffer = endpoint_buffer:ack(Id, Buffer), + {noreply, State#state{buffer = NBuffer}}; {error, Reason} -> lager:warning("[endpoint_http] url: ~p, get error: ~p", [Url, Reason]), {noreply, State} diff --git a/apps/iot/src/endpoint/endpoint_kafka.erl b/apps/iot/src/endpoint/endpoint_kafka.erl index 7c16a8e..5f451fe 100644 --- a/apps/iot/src/endpoint/endpoint_kafka.erl +++ b/apps/iot/src/endpoint/endpoint_kafka.erl @@ -82,8 +82,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> - NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), +handle_cast({forward, Metric}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append(Metric, Buffer), {noreply, State#state{buffer = NBuffer}}. %% @private @@ -116,6 +116,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS {noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}}; {error, Reason} -> lager:debug("[endpoint_kafka] start_producer: ~p, get error: ~p", [ClientId, Reason]), + brod:stop_client(ClientId), retry_connect(), {noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}} end; @@ -129,7 +130,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> {noreply, State}; %% 发送数据到mqtt服务器 -handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONNECTED, client_pid = ClientPid, +handle_info({next_data, Id, Metric}, State = #state{status = ?CONNECTED, client_pid = ClientPid, endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) -> ReceiverPid = self(), diff --git a/apps/iot/src/endpoint/endpoint_mqtt.erl b/apps/iot/src/endpoint/endpoint_mqtt.erl index 87eabec..727b1ab 100644 --- a/apps/iot/src/endpoint/endpoint_mqtt.erl +++ b/apps/iot/src/endpoint/endpoint_mqtt.erl @@ -19,7 +19,7 @@ -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). %% 消息重发间隔 --define(RETRY_INTERVAL, 5000). +-define(RETRY_INTERVAL, 15000). -define(DISCONNECTED, disconnected). -define(CONNECTED, connected). @@ -83,8 +83,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) -> - NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer), +handle_cast({forward, Metric}, State = #state{buffer = Buffer}) -> + NBuffer = endpoint_buffer:append(Metric, Buffer), {noreply, State#state{buffer = NBuffer}}. %% @private @@ -111,31 +111,32 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status {retry_interval, 5000} ], - {ok, ConnPid} = emqtt:start_link(Opts), - lager:debug("[endpoint_mqtt] start connect, options: ~p", [Opts]), - case catch emqtt:connect(ConnPid, 5000) of - {ok, _} -> - lager:debug("[endpoint_mqtt] connect success, pid: ~p", [ConnPid]), - NBuffer = endpoint_buffer:trigger_n(Buffer), - {noreply, State#state{conn_pid = ConnPid, buffer = NBuffer, status = ?CONNECTED}}; - {error, Reason} -> - lager:warning("[endpoint_mqtt] connect get error: ~p", [Reason]), - erlang:start_timer(5000, self(), create_postman), - {noreply, State}; - Error -> - lager:warning("[endpoint_mqtt] connect get error: ~p", [Error]), - erlang:start_timer(5000, self(), create_postman), - {noreply, State} + try + {ok, ConnPid} = emqtt:start_link(Opts), + lager:debug("[endpoint_mqtt] start connect, options: ~p", [Opts]), + case emqtt:connect(ConnPid, 5000) of + {ok, _} -> + lager:debug("[endpoint_mqtt] connect success, pid: ~p", [ConnPid]), + NBuffer = endpoint_buffer:trigger_n(Buffer), + {noreply, State#state{conn_pid = ConnPid, buffer = NBuffer, status = ?CONNECTED}}; + {error, Reason} -> + lager:warning("[endpoint_mqtt] connect get error: ~p", [Reason]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + {noreply, State} + end + catch _:Error-> + lager:warning("[endpoint_mqtt] connect get error: ~p", [Error]), + erlang:start_timer(?RETRY_INTERVAL, self(), create_postman), + {noreply, State} end; %% 离线时,忽略数据发送逻辑 handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> {keep_state, State}; %% 发送数据到mqtt服务器 -handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, - endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) -> +handle_info({next_data, Id, Metric}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight, + endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic, qos = Qos}}}) -> - Topic = re:replace(Topic0, <<"\\${service_id}">>, ServiceId, [global, {return, binary}]), lager:debug("[endpoint_mqtt] will publish topic: ~p, metric: ~p, qos: ~p", [Topic, Metric, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Metric, [{qos, Qos}, {retain, true}]) of ok -> diff --git a/apps/iot/src/endpoint/endpoint_subscription.erl b/apps/iot/src/endpoint/endpoint_subscription.erl index d9d0f05..4bcd702 100644 --- a/apps/iot/src/endpoint/endpoint_subscription.erl +++ b/apps/iot/src/endpoint/endpoint_subscription.erl @@ -13,7 +13,7 @@ %% API -export([start_link/0]). --export([subscribe/2, publish/3]). +-export([subscribe/2, publish/2, get_subscribers/0]). -export([match_components/2, is_valid_components/1, of_components/1]). %% gen_server callbacks @@ -45,9 +45,13 @@ subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) -> gen_server:call(?SERVER, {subscribe, Topic, SubscriberPid}). --spec publish(RouteKey :: binary(), ServiceId :: binary(), Content :: binary()) -> no_return(). -publish(RouteKey, ServiceId, Content) when is_binary(RouteKey), is_binary(Content) -> - gen_server:cast(?SERVER, {publish, RouteKey, ServiceId, Content}). +-spec get_subscribers() -> {ok, Subscribers :: map()}. +get_subscribers() -> + gen_server:call(?SERVER, get_subscribers). + +-spec publish(RouteKey :: binary(), Content :: binary()) -> no_return(). +publish(RouteKey, Content) when is_binary(RouteKey), is_binary(Content) -> + gen_server:cast(?SERVER, {publish, RouteKey, Content}). %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> @@ -78,6 +82,8 @@ init([]) -> {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). %% 同一个SubscriberPid只能订阅同一个topic一次 +handle_call(get_subscribers, _From, State = #state{subscribers = Subscribers}) -> + {reply, {ok, Subscribers}, State}; handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers = Subscribers}) -> Components = of_components(Topic), case is_valid_components(Components) of @@ -98,10 +104,10 @@ handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 发布消息 -handle_cast({publish, RouteKey, ServiceId, Metric}, State = #state{subscribers = Subscribers}) -> +handle_cast({publish, RouteKey, Metric}, State = #state{subscribers = Subscribers}) -> MatchedSubscribers = match_subscribers(Subscribers, RouteKey), lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) -> - endpoint:forward(SubscriberPid, ServiceId, Metric) + endpoint:forward(SubscriberPid, Metric) end, MatchedSubscribers), lager:debug("[efka_subscription] route_key: ~p, metric: ~p, match subscribers: ~p", [RouteKey, Metric, MatchedSubscribers]), {noreply, State}. diff --git a/apps/iot/src/http_handlers/device_handler.erl b/apps/iot/src/http_handlers/device_handler.erl deleted file mode 100644 index 1707e23..0000000 --- a/apps/iot/src/http_handlers/device_handler.erl +++ /dev/null @@ -1,50 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author licheng5 -%%% @copyright (C) 2020, -%%% @doc -%%% -%%% @end -%%% Created : 26. 4月 2020 3:36 下午 -%%%------------------------------------------------------------------- --module(device_handler). --author("licheng5"). --include("iot.hrl"). - -%% API --export([handle_request/4]). - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% helper methods -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% - -%% 重新加载对应的主机信息 -handle_request("POST", "/device/reload", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID}) when is_integer(HostId), is_binary(DeviceUUID) -> - lager:debug("[device_handler] host_id: ~p, will reload device uuid: ~p", [HostId, DeviceUUID]), - AliasName = iot_host:get_alias_name(HostId), - case global:whereis_name(AliasName) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"reload device failed">>)}; - HostPid when is_pid(HostPid) -> - case iot_host:reload_device(HostPid, DeviceUUID) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:debug("[device_handler] reload device: ~p, get error: ~p", [DeviceUUID, Reason]), - {ok, 200, iot_util:json_error(404, <<"reload device failed">>)} - end - end; - -%% 删除对应的主机信息 -handle_request("POST", "/device/delete", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID}) when is_integer(HostId), is_binary(DeviceUUID) -> - AliasName = iot_host:get_alias_name(HostId), - case global:whereis_name(AliasName) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"delete device failed">>)}; - HostPid when is_pid(HostPid) -> - ok = iot_host:delete_device(HostPid, DeviceUUID), - {ok, 200, iot_util:json_data(<<"success">>)} - end; - -handle_request(_, Path, _, _) -> - Path1 = list_to_binary(Path), - {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. \ No newline at end of file diff --git a/apps/iot/src/http_handlers/endpoint_handler.erl b/apps/iot/src/http_handlers/endpoint_handler.erl index cee9c93..b390b62 100644 --- a/apps/iot/src/http_handlers/endpoint_handler.erl +++ b/apps/iot/src/http_handlers/endpoint_handler.erl @@ -78,11 +78,11 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Title, Reason]), {ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)} end; - Pid -> + Pid when is_pid(Pid) -> case endpoint_sup:delete_endpoint(Id) of ok -> case endpoint_sup:ensured_endpoint_started(Endpoint) of - {ok, Pid} when is_pid(Pid) -> + {ok, Pid0} when is_pid(Pid0) -> {ok, 200, iot_util:json_data(<<"success">>)}; {error, Reason} -> lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Title, Reason]), @@ -98,6 +98,113 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege end end; +%% http接口的测试 +handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"http">>, <<"config">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_integer(PoolSize), PoolSize > 0 -> + Body = <<"">>, + ContentType = "application/json", + case httpc:request(post, {Url, [], ContentType, Body}, [], []) of + {ok, _} -> + {ok, 200, iot_util:json_data(<<"ok">>)}; + {error, Reason} -> + lager:debug("[endpint_handler] test http: ~p, error: ~p", [Url, Reason]), + {ok, 200, iot_util:json_error(-1, <<"url failed">>)} + end; + +%% 测试mqtt +handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"mqtt">>, <<"config">> := Config}) -> + case endpoint:parse_config(<<"mqtt">>, Config) of + {ok, #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password}} -> + %% 用户测试的client + ClientId = "mqtt_client_test:" ++ iot_util:rand_bytes(16), + Opts = [ + {owner, self()}, + {clientid, ClientId}, + {host, binary_to_list(Host)}, + {port, Port}, + {tcp_opts, []}, + {username, binary_to_list(Username)}, + {password, binary_to_list(Password)}, + {keepalive, 86400}, + {auto_ack, true}, + {connect_timeout, 5000}, + {proto_ver, v5}, + {retry_interval, 5000} + ], + + case emqtt:start_link(Opts) of + {ok, ConnPid} -> + lager:debug("[endpint_handler] start connect, options: ~p", [Opts]), + case catch emqtt:connect(ConnPid, 5000) of + {ok, _} -> + lager:debug("[endpint_handler] connect success, pid: ~p", [ConnPid]), + emqtt:stop(ConnPid), + {ok, 200, iot_util:json_data(<<"ok">>)}; + {error, Reason} -> + lager:warning("[endpint_handler] connect get error: ~p", [Reason]), + emqtt:stop(ConnPid), + {ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)}; + Error -> + lager:warning("[endpint_handler] connect get error: ~p", [Error]), + emqtt:stop(ConnPid), + {ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)} + end; + Other -> + lager:warning("[endpint_handler] test connect mqtt with options: ~p, get error: ~p", [Opts, Other]), + {ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)} + end; + {error, Errors} -> + {ok, 200, iot_util:json_error(-1, Errors)} + end; + +%% 测试kafka +handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"kafka">>, <<"config">> := Config}) -> + case endpoint:parse_config(<<"kafka">>, Config) of + {ok, #kafka_endpoint{sasl_config = SaslConfig, bootstrap_servers = BootstrapServers, topic = Topic}} -> + BaseConfig = [ + {reconnect_cool_down_seconds, 5}, + {socket_options, [{keepalive, true}]} + ], + + ClientConfig = case SaslConfig of + {Mechanism, Username, Password} -> + [{sasl, {Mechanism, Username, Password}}|BaseConfig]; + undefined -> + BaseConfig + end, + ClientId = list_to_atom("brod_client_test:" ++ iot_util:rand_bytes(16)), + + case catch brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of + {ok, _ClientPid} -> + case brod:start_producer(ClientId, Topic, _ProducerConfig = []) of + ok -> + ok = brod:stop_client(ClientId), + {ok, 200, iot_util:json_data(<<"ok">>)}; + {error, Reason} -> + lager:debug("[endpint_handler] start_producer: ~p, get error: ~p", [ClientId, Reason]), + {ok, 200, iot_util:json_error(-1, <<"config kafka server failed">>)} + end; + Error -> + lager:debug("[endpint_handler] start_client: ~p, get error: ~p", [ClientId, Error]), + {ok, 200, iot_util:json_error(-1, <<"config kafka server failed">>)} + end; + {error, Errors} -> + {ok, 200, iot_util:json_error(-1, Errors)} + end; + +%% 发布消息 +handle_request("POST", "/endpoint/publish_metric", _, #{<<"route_key">> := RouteKey, <<"metric">> := Metric0}) when is_binary(RouteKey) -> + if + is_map(Metric0) orelse is_list(Metric0) -> + Metric = jiffy:encode(Metric0, [force_utf8]), + endpoint_subscription:publish(RouteKey, Metric), + {ok, 200, iot_util:json_data(<<"ok">>)}; + is_binary(Metric0) -> + endpoint_subscription:publish(RouteKey, Metric0), + {ok, 200, iot_util:json_data(<<"ok">>)}; + true -> + {ok, 200, iot_util:json_error(-1, <<"invalid metric">>)} + end; + handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. \ No newline at end of file diff --git a/apps/iot/src/http_handlers/host_handler.erl b/apps/iot/src/http_handlers/host_handler.erl index 3dde9c6..5c67102 100644 --- a/apps/iot/src/http_handlers/host_handler.erl +++ b/apps/iot/src/http_handlers/host_handler.erl @@ -44,12 +44,11 @@ handle_request("GET", "/host/status", #{<<"uuid">> := UUID}, _) when is_binary(U %% 删除对应的主机信息 handle_request("POST", "/host/delete", _, #{<<"uuid">> := UUID}) when is_binary(UUID) -> - case iot_host_sup:delete_host(UUID) of - ok -> - lager:debug("[host_handler] will delete host uuid: ~p", [UUID]), + case iot_host:get_pid(UUID) of + Pid when is_pid(Pid) -> + ok = iot_host_sup:delete_host(UUID), {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:debug("[host_handler] delete host uuid: ~p, get error is: ~p", [UUID, Reason]), + undefined -> {ok, 200, iot_util:json_error(404, <<"error">>)} end; @@ -80,15 +79,16 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := end; %% 主机事件发布 -handle_request("POST", "/host/pub", _, #{<<"uuid">> := UUID, <<"topic">> := Topic, <<"content">> := Content}) - when is_binary(UUID), is_binary(Topic), is_binary(Content) -> +handle_request("POST", "/host/pub", _, #{<<"uuid">> := UUID, <<"topic">> := Topic, <<"qos">> := Qos0, <<"content">> := Content}) + when is_binary(UUID), is_binary(Topic), is_binary(Content), is_integer(Qos0) -> + Qos = case Qos0 > 0 of true -> 1; false -> 0 end, case iot_host_sup:ensured_host_started(UUID) of {error, Reason} -> lager:debug("[host_handler] pub host_id: ~p, topic: ~p, failed with reason: ~p", [UUID, Topic, Reason]), {ok, 200, iot_util:json_error(400, <<"host not found">>)}; {ok, Pid} when is_pid(Pid) -> - ok = iot_host:pub(Pid, Topic, Content), + ok = iot_host:pub(Pid, Topic, Qos, Content), {ok, 200, iot_util:json_data(<<"success">>)} end; diff --git a/apps/iot/src/iot.app.src b/apps/iot/src/iot.app.src index 0073eb7..09b7e59 100644 --- a/apps/iot/src/iot.app.src +++ b/apps/iot/src/iot.app.src @@ -26,6 +26,7 @@ erts, runtime_tools, observer, + inets, kernel, stdlib ]}, diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl index 20969ab..a986f7a 100644 --- a/apps/iot/src/iot_api.erl +++ b/apps/iot/src/iot_api.erl @@ -136,7 +136,7 @@ do_post(Path, Params) when is_list(Path), is_map(Params) -> case hackney:request(post, Url, Headers, Body, [{pool, false}]) of {ok, 200, _, ClientRef} -> {ok, RespBody} = hackney:body(ClientRef), - lager:debug("[iot_api] request url: ~p, send body: ~p, get error is: ~p", [Url, Body, RespBody]), + lager:debug("[iot_api] request url: ~p, send body: ~p, get response is: ~p", [Url, Body, RespBody]), hackney:close(ClientRef), case catch jiffy:decode(RespBody, [return_maps]) of #{<<"result">> := Result} -> diff --git a/apps/iot/src/iot_app.erl b/apps/iot/src/iot_app.erl index 3534e95..02c7f93 100644 --- a/apps/iot/src/iot_app.erl +++ b/apps/iot/src/iot_app.erl @@ -49,7 +49,7 @@ start_http_server() -> {'_', [ {"/host/[...]", http_protocol, [host_handler]}, {"/container/[...]", http_protocol, [container_handler]}, - {"/device/[...]", http_protocol, [device_handler]}, + {"/endpoint/[...]", http_protocol, [endpoint_handler]}, {"/event_stream", event_stream_handler, []} ]} ]), diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl deleted file mode 100644 index cf7cfcb..0000000 --- a/apps/iot/src/iot_device.erl +++ /dev/null @@ -1,73 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 14. 8月 2023 11:40 -%%%------------------------------------------------------------------- --module(iot_device). --author("aresei"). --include("iot.hrl"). - -%% API --export([new/1, change_status/2, reload/1]). - -%% 终端是否授权 --define(DEVICE_AUTH_DENIED, 0). --define(DEVICE_AUTH_AUTHED, 1). - -%% 状态 --define(STATE_DENIED, denied). --define(STATE_ACTIVATED, activated). - --record(device, { - device_uuid :: binary(), - status = ?DEVICE_OFFLINE -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - --spec new(DeviceInfo :: binary() | map()) -> error | {ok, Device :: #device{}}. -new(DeviceUUID) when is_binary(DeviceUUID) -> - case iot_api:get_device_by_uuid(DeviceUUID) of - {ok, #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} -> - {ok, #device{device_uuid = DeviceUUID, status = Status}}; - undefined -> - lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]), - error - end; -new(#{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}) -> - {ok, #device{device_uuid = DeviceUUID, status = Status}}. - --spec change_status(Device :: #device{}, NewStatus :: integer()) -> NDevice :: #device{}. -change_status(Device = #device{status = Status}, NewStatus) when is_integer(NewStatus), Status =:= NewStatus -> - Device; -change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_ONLINE) -> - iot_api:change_device_status(DeviceUUID, ?DEVICE_ONLINE), - Device#device{status = ?DEVICE_ONLINE}; -change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_OFFLINE) -> - {ok, #{<<"status">> := Status}} = iot_api:get_device_by_uuid(DeviceUUID), - case Status of - ?DEVICE_NOT_JOINED -> - lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]), - Device#device{status = ?DEVICE_NOT_JOINED}; - ?DEVICE_OFFLINE -> - lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]), - Device#device{status = ?DEVICE_OFFLINE}; - ?DEVICE_ONLINE -> - iot_api:change_device_status(DeviceUUID, ?DEVICE_OFFLINE), - Device#device{status = ?DEVICE_OFFLINE} - end. - --spec reload(Device :: #device{}) -> error | {ok, NDevice :: #device{}}. -reload(Device = #device{device_uuid = DeviceUUID}) -> - lager:debug("[iot_device] will reload: ~p", [DeviceUUID]), - case iot_api:get_device_by_uuid(DeviceUUID) of - {ok, #{<<"status">> := Status}} -> - {ok, Device#device{device_uuid = DeviceUUID, status = Status}}; - undefined -> - lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]), - error - end. \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index eb42f61..ebd6f14 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -22,12 +22,10 @@ %% API -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). --export([get_metric/1, get_status/1]). +-export([get_metric/1, get_status/1, kill/1]). %% 通讯相关 --export([pub/3, attach_channel/2, command/3]). +-export([pub/4, attach_channel/2, command/3]). -export([deploy_container/3, start_container/2, stop_container/2, remove_container/2, kill_container/2, config_container/3, get_containers/1, await_reply/2]). -%% 设备管理 --export([reload_device/2, delete_device/2]). -export([heartbeat/1]). %% gen_statem callbacks @@ -42,10 +40,6 @@ heartbeat_counter = 0 :: integer(), %% websocket相关 channel_pid :: undefined | pid(), - - %% 设备的关系, #{device_uuid => Device} - device_map = #{}, - %% 主机的相关信息 metrics = #{} :: map() }). @@ -67,6 +61,15 @@ get_alias_name(HostId0) when is_integer(HostId0) -> HostId = integer_to_binary(HostId0), binary_to_atom(<<"iot_host_id:", HostId/binary>>). +-spec kill(UUID :: binary()) -> no_return(). +kill(UUID) when is_binary(UUID) -> + case whereis(get_name(UUID)) of + undefined -> + ok; + Pid -> + exit(Pid, kill) + end. + %% 处理消息 -spec handle(Pid :: pid(), Packet :: {atom(), any()}) -> no_return(). handle(Pid, Packet) when is_pid(Pid) -> @@ -142,23 +145,14 @@ await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) -> {error, <<"timeout">>} end. --spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> ok | {error, Reason :: any()}. -pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) -> - gen_statem:call(Pid, {pub, Topic, Content}). +-spec pub(Pid :: pid(), Topic :: binary(), Qos :: integer(), Content :: binary()) -> ok | {error, Reason :: any()}. +pub(Pid, Topic, Qos, Content) when is_pid(Pid), is_binary(Topic), is_integer(Qos), is_binary(Content) -> + gen_statem:call(Pid, {pub, Topic, Qos, Content}). -spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> ok | {error, Reason :: any()}. command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is_binary(Command) -> gen_statem:call(Pid, {command, CommandType, Command}). -%% 设备管理相关 --spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}. -reload_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> - gen_statem:call(Pid, {reload_device, DeviceUUID}). - --spec delete_device(Pid :: pid(), DeviceUUID :: binary()) -> ok. -delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) -> - gen_statem:call(Pid, {delete_device, DeviceUUID}). - -spec heartbeat(Pid :: pid()) -> no_return(). heartbeat(undefined) -> ok; @@ -193,19 +187,7 @@ init([UUID]) -> true -> ?STATE_ACTIVATED; false -> ?STATE_DENIED end, - - %% 加载设备信息 - {ok, DeviceInfos} = iot_api:get_host_devices(HostId), - Devices = lists:filtermap(fun(DeviceInfo = #{<<"device_uuid">> := DeviceUUID}) -> - case iot_device:new(DeviceInfo) of - error -> - false; - {ok, Device} -> - {true, {DeviceUUID, Device}} - end - end, DeviceInfos), - - {ok, StateName, #state{host_id = HostId, uuid = UUID, device_map = maps:from_list(Devices), has_session = false}}; + {ok, StateName, #state{host_id = HostId, uuid = UUID, has_session = false}}; undefined -> lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]), ignore @@ -249,12 +231,12 @@ handle_event({call, From}, {jsonrpc_call, ReceiverPid, RpcCall}, _, State = #sta end; %% 发送指令时, pub/sub -handle_event({call, From}, {pub, Topic, Content}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> +handle_event({call, From}, {pub, Topic, Qos, Content}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) -> case HasSession andalso is_pid(ChannelPid) of true -> lager:debug("[iot_host] host: ~p, publish to topic: ~p, content: ~p", [UUID, Topic, Content]), %% 通过websocket发送请求 - tcp_channel:pub(ChannelPid, Topic, Content), + tcp_channel:pub(ChannelPid, Topic, Qos, Content), {keep_state, State, [{reply, From, ok}]}; false -> @@ -318,52 +300,19 @@ handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, c lager:notice("[iot_host] attach_channel host_id uuid: ~p, old channel exists: ~p", [UUID, OldChannelPid]), {keep_state, State, [{reply, From, {error, <<"channel existed">>}}]}; -%% 重新加载设备信息 -handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) -> - case maps:find(DeviceUUID, DeviceMap) of - error -> - {keep_state, State, [{reply, From, {error, <<"device not found">>}}]}; - {ok, Device} -> - case iot_device:reload(Device) of - error -> - {keep_state, State#state{device_map = maps:remove(Device, DeviceMap)}, [{reply, From, {error, <<"reload device error">>}}]}; - {ok, NDevice} -> - {keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}, [{reply, From, ok}]} - end - end; - -%% 删除设备 -handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) -> - {keep_state, State#state{device_map = maps:remove(DeviceUUID, DeviceMap)}, [{reply, From, ok}]}; - -%% todo -handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED, - State = #state{uuid = UUID, has_session = true, device_map = DeviceMap}) -> - - lager:debug("[iot_host] metric_data host: ~p, service_id: ~p, device_uuid: ~p, route_key: ~p, metric: ~p", [UUID, ServiceId, DeviceUUID, RouteKey0, Metric]), - case maps:find(DeviceUUID, DeviceMap) of - error -> - lager:warning("[iot_host] host uuid: ~p, device uuid: ~p not found, metric: ~p", [UUID, DeviceUUID, Metric]), - {keep_state, State}; - {ok, Device} -> - RouteKey = get_route_key(RouteKey0), - endpoint_subscription:publish(RouteKey, ServiceId, Metric), - NDevice = iot_device:change_status(Device, ?DEVICE_ONLINE), - {keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}} - end; +%% 数据分发 +handle_event(cast, {handle, {data, #data{route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED, + State = #state{uuid = UUID, has_session = true}) -> + lager:debug("[iot_host] metric_data host: ~p, route_key: ~p, metric: ~p", [UUID, RouteKey0, Metric]), + RouteKey = get_route_key(RouteKey0), + endpoint_subscription:publish(RouteKey, Metric), + {keep_state, State}; %% ping的数据是通过aes加密后的,因此需要在有会话的情况下才行 handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> lager:debug("[iot_host] ping host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), {keep_state, State#state{metrics = Metrics}}; -handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> - lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]), - %DevicePid = iot_device:get_pid(DeviceUUID), - %iot_device:change_status(DevicePid, Status), - - {keep_state, State}; - %% 心跳机制 handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) -> {keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}}; diff --git a/apps/iot/src/iot_host_sup.erl b/apps/iot/src/iot_host_sup.erl index 26d875e..4762613 100644 --- a/apps/iot/src/iot_host_sup.erl +++ b/apps/iot/src/iot_host_sup.erl @@ -23,6 +23,8 @@ init([]) -> ensured_host_started(UUID) when is_binary(UUID) -> case iot_host:get_pid(UUID) of undefined -> + %% 尝试删除下host对应的信息 + delete_host(UUID), case supervisor:start_child(?MODULE, child_spec(UUID)) of {ok, Pid} when is_pid(Pid) -> {ok, Pid}; @@ -35,10 +37,17 @@ ensured_host_started(UUID) when is_binary(UUID) -> {ok, Pid} end. -delete_host(UUID) when is_binary(UUID) -> +delete_host(UUID) -> Id = iot_host:get_name(UUID), ok = supervisor:terminate_child(?MODULE, Id), - supervisor:delete_child(?MODULE, Id). + case supervisor:delete_child(?MODULE, Id) of + {error, running} -> + %% ensure killed then delete again + iot_host:kill(UUID), + supervisor:delete_child(?MODULE, Id); + _ -> + ok + end. child_spec(UUID) -> Id = iot_host:get_name(UUID), diff --git a/apps/iot/src/iot_util.erl b/apps/iot/src/iot_util.erl index 5267216..2c97506 100644 --- a/apps/iot/src/iot_util.erl +++ b/apps/iot/src/iot_util.erl @@ -14,6 +14,7 @@ -export([step/3, chunks/2, rand_bytes/1, uuid/0, md5/1, parse_mapper/1]). -export([json_data/1, json_error/2, is_json/1]). -export([queue_limited_in/3, assert_call/2, assert/2]). +-export([sha256/1]). -spec is_json(Json :: term()) -> boolean(). is_json(Json) when is_binary(Json) -> @@ -80,7 +81,7 @@ json_data(Data) -> <<"result">> => Data }, [force_utf8]). -json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) -> +json_error(ErrCode, ErrMessage) when is_integer(ErrCode) -> jiffy:encode(#{ <<"error">> => #{ <<"code">> => ErrCode, @@ -112,6 +113,14 @@ assert_call(true, Fun) -> assert_call(false, _) -> ok. +-spec sha256(Str :: string() | binary()) -> binary(). +sha256(Str) when is_list(Str) -> + sha256(unicode:characters_to_binary(Str)); +sha256(Bin) when is_binary(Bin) -> + HashBin = crypto:hash(sha256, Bin), + HexStr = lists:flatten([io_lib:format("~2.16.0B", [B]) || B <- binary:bin_to_list(HashBin)]), + list_to_binary(string:lowercase(HexStr)). + -spec md5(Str :: binary()) -> binary(). md5(Str) when is_binary(Str) -> list_to_binary(lists:flatten([hex(X) || <> <= erlang:md5(Str)])). diff --git a/apps/iot/src/message/message_codec.erl b/apps/iot/src/message/message_codec.erl index 085d63e..769c11e 100644 --- a/apps/iot/src/message/message_codec.erl +++ b/apps/iot/src/message/message_codec.erl @@ -10,8 +10,10 @@ -author("anlicheng"). -include("message.hrl"). --define(I32, 1). --define(Bytes, 2). +-define(I8, 1). +-define(I16, 2). +-define(I32, 3). +-define(Bytes, 4). %% API -export([encode/2, decode/1]). @@ -35,13 +37,18 @@ encode0(#auth_reply{code = Code, payload = Payload}) -> ]); encode0(#jsonrpc_reply{result = Result, error = undefined}) -> ResultBin = erlang:term_to_binary(#{<<"result">> => Result}), - iolist_to_binary([marshal(?Bytes, ResultBin)]); + iolist_to_binary([ + marshal(?Bytes, ResultBin) + ]); encode0(#jsonrpc_reply{result = undefined, error = Error}) -> ResultBin = erlang:term_to_binary(#{<<"error">> => Error}), - iolist_to_binary([marshal(?Bytes, ResultBin)]); -encode0(#pub{topic = Topic, content = Content}) -> + iolist_to_binary([ + marshal(?Bytes, ResultBin) + ]); +encode0(#pub{topic = Topic, qos = Qos, content = Content}) -> iolist_to_binary([ marshal(?Bytes, Topic), + marshal(?I8, Qos), marshal(?Bytes, Content) ]); encode0(#command{command_type = CommandType, command = Command}) -> @@ -52,20 +59,14 @@ encode0(#command{command_type = CommandType, command = Command}) -> encode0(#jsonrpc_request{method = Method, params = Params}) -> ReqBody = erlang:term_to_binary(#{<<"method">> => Method, <<"params">> => Params}), - marshal(?Bytes, ReqBody); -encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) -> iolist_to_binary([ - marshal(?Bytes, ServiceId), - marshal(?Bytes, DeviceUUID), + marshal(?Bytes, ReqBody) + ]); +encode0(#data{route_key = RouteKey, metric = Metric}) -> + iolist_to_binary([ marshal(?Bytes, RouteKey), marshal(?Bytes, Metric) ]); -encode0(#event{service_id = ServiceId, event_type = EventType, params = Params}) -> - iolist_to_binary([ - marshal(?Bytes, ServiceId), - marshal(?I32, EventType), - marshal(?Bytes, Params) - ]); encode0(#task_event_stream{task_id = TaskId, type = Type, stream = Stream}) -> iolist_to_binary([ marshal(?I32, TaskId), @@ -92,8 +93,8 @@ decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) -> _ -> error end; -decode0(?MESSAGE_PUB, [Topic, Content]) -> - {ok, #pub{topic = Topic, content = Content}}; +decode0(?MESSAGE_PUB, [Topic, Qos, Content]) -> + {ok, #pub{topic = Topic, qos = Qos, content = Content}}; decode0(?MESSAGE_COMMAND, [CommandType, Command]) -> {ok, #command{command_type = CommandType, command = Command}}; decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) -> @@ -101,10 +102,8 @@ decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) -> decode0(?MESSAGE_JSONRPC_REQUEST, [ReqBody]) -> #{<<"method">> := Method, <<"params">> := Params} = erlang:binary_to_term(ReqBody), {ok, #jsonrpc_request{method = Method, params = Params}}; -decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) -> - {ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}}; -decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) -> - {ok, #event{service_id = ServiceId, event_type = EventType, params = Params}}; +decode0(?MESSAGE_DATA, [RouteKey, Metric]) -> + {ok, #data{route_key = RouteKey, metric = Metric}}; decode0(?MESSAGE_EVENT_STREAM, [TaskId, Type, Stream]) -> {ok, #task_event_stream{task_id = TaskId, type = Type, stream = Stream}}; decode0(_, _) -> @@ -114,7 +113,11 @@ decode0(_, _) -> %%% helper methods %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% --spec marshal(Type :: integer(), Field :: any()) -> binary(). +-spec marshal(Type :: ?I8 | ?I16 | ?I32 | ?Bytes, Field :: integer() | binary()) -> binary(). +marshal(?I8, Field) when is_integer(Field) -> + <>; +marshal(?I16, Field) when is_integer(Field) -> + <>; marshal(?I32, Field) when is_integer(Field) -> <>; marshal(?Bytes, Field) when is_binary(Field) -> @@ -126,6 +129,10 @@ unmarshal(Bin) when is_binary(Bin) -> unmarshal(Bin, []). unmarshal(<<>>, Acc) -> {ok, lists:reverse(Acc)}; +unmarshal(<>, Acc) -> + unmarshal(Rest, [F|Acc]); +unmarshal(<>, Acc) -> + unmarshal(Rest, [F|Acc]); unmarshal(<>, Acc) -> unmarshal(Rest, [F|Acc]); unmarshal(<>, Acc) -> diff --git a/apps/iot/src/mocker/endpoint_kafka_test.erl b/apps/iot/src/mocker/endpoint_kafka_test.erl new file mode 100644 index 0000000..7196ed8 --- /dev/null +++ b/apps/iot/src/mocker/endpoint_kafka_test.erl @@ -0,0 +1,60 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 14. 11月 2025 23:29 +%%%------------------------------------------------------------------- +-module(endpoint_kafka_test). +-author("anlicheng"). +-include("endpoint.hrl"). + +%% API +-export([start_test/0, test_consumer/0]). + +start_test() -> + Name = endpoint:get_name(100), + {ok, Pid} = endpoint_kafka:start_link(Name, #endpoint{ + id = 100, + %% 全局唯一,在路由规则中通过名称来指定 + matcher = <<"/dhlr/device/*/*">>, + %% 标题描述 + title = <<"test_kafka_title">>, + %% 配置项, 格式: #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}} + config = #kafka_endpoint{ + sasl_config = {plain, <<"test">>, <<"test123">>}, + bootstrap_servers = [{"118.178.229.213", 9092}], + topic = <<"dhlr_data">> + } + }), + + Json = jiffy:encode(#{ + <<"name">> => <<"anlicheng">>, + <<"age">> => 30 + }, [force_utf8]), + + endpoint:forward(Pid, Json), + ok. + +test_consumer() -> + KafkaBootstrapEndpoints = [{"118.178.229.213", 9092}], + Topic = <<"dhlr_data">>, + + ClientConfig = [ + {sasl, {plain, <<"test">>, <<"test123">>}}, + {reconnect_cool_down_seconds, 5}, + {socket_options, [{keepalive, true}]} + ], + + ok = brod:start_client(KafkaBootstrapEndpoints, client1, ClientConfig), + SubscriberCallbackFun = fun(_Partition, Msg, ShellPid = CallbackState) -> + lager:debug("call here msg: ~p", [Msg]), + ShellPid ! Msg, {ok, ack, CallbackState} + end, + Res = brod_topic_subscriber:start_link(client1, Topic, all, + _ConsumerConfig=[{begin_offset, 0}], + _CommittedOffsets=[], message, SubscriberCallbackFun, + _CallbackState=self()), + + lager:debug("start subscriber res: ~p", [Res]). \ No newline at end of file diff --git a/apps/iot/src/mocker/endpoint_mqtt_subscriber.erl b/apps/iot/src/mocker/endpoint_mqtt_subscriber.erl new file mode 100644 index 0000000..2bce1ab --- /dev/null +++ b/apps/iot/src/mocker/endpoint_mqtt_subscriber.erl @@ -0,0 +1,163 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 17. 11月 2025 16:48 +%%%------------------------------------------------------------------- +-module(endpoint_mqtt_subscriber). +-author("anlicheng"). + +-author("aresei"). +-include("iot.hrl"). + +-behaviour(gen_server). + +%% API +-export([start_link/0]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +%% 需要订阅的主题信息 +-define(Topics,[ + {<<"/dhlr/data">>, 0} +]). + +-record(state, { + conn_pid :: pid() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_link() -> + {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). +start_link() -> + gen_server:start_link({local, ?MODULE}, ?MODULE, [], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([]) -> + %% 建立到emqx服务器的连接 + ClientId = <<"mqtt-client-test-host-subscriber">>, + Opts = [ + {clientid, ClientId}, + {host, "118.178.229.213"}, + {port, 1883}, + {owner, self()}, + {tcp_opts, []}, + {username, "admin"}, + {password, "admin"}, + {keepalive, 86400}, + {auto_ack, true}, + {proto_ver, v5}, + {retry_interval, 5} + ], + + lager:debug("[opts] is: ~p", [Opts]), + case emqtt:start_link(Opts) of + {ok, ConnPid} -> + %% 监听和host相关的全部事件 + lager:debug("[iot_mqtt_subscriber] start conntecting, pid: ~p", [ConnPid]), + {ok, _} = emqtt:connect(ConnPid), + lager:debug("[iot_mqtt_subscriber] connect success, pid: ~p", [ConnPid]), + SubscribeResult = emqtt:subscribe(ConnPid, ?Topics), + + lager:debug("[iot_mqtt_subscriber] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]), + + {ok, #state{conn_pid = ConnPid}}; + ignore -> + lager:debug("[iot_mqtt_subscriber] connect emqx get ignore"), + {stop, ignore}; + {error, Reason} -> + lager:debug("[iot_mqtt_subscriber] connect emqx get error: ~p", [Reason]), + {stop, Reason} + end. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call(_Info, _From, State = #state{conn_pid = _ConnPid}) -> + {reply, ok, State}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast(_Request, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({disconnect, ReasonCode, Properties}, State = #state{}) -> + lager:debug("[iot_mqtt_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]), + {stop, disconnected, State}; +%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 +handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State = #state{conn_pid = _ConnPid}) -> + lager:debug("[iot_mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]), + %% 将消息分发到对应的host进程去处理 + {noreply, State}; +handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> + lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]), + {noreply, State}; + +handle_info(Info, State = #state{}) -> + lager:debug("[iot_mqtt_subscriber] get info: ~p", [Info]), + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) -> + %% 取消topic的订阅 + TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics), + {ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames), + + ok = emqtt:disconnect(ConnPid), + lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]), + ok; +terminate(Reason, _State) -> + lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]), + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== \ No newline at end of file diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index 9ceb712..cf344a6 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -12,7 +12,7 @@ -behaviour(ranch_protocol). %% API --export([pub/3, jsonrpc_call/3, command/3]). +-export([pub/4, jsonrpc_call/3, command/3]). -export([start_link/3, stop/2]). %% gen_server callbacks @@ -33,9 +33,9 @@ }). %% 向通道中写入消息 --spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> no_return(). -pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) -> - gen_server:cast(Pid, {pub, Topic, Content}). +-spec pub(Pid :: pid(), Topic :: binary(), Qos :: integer(), Content :: binary()) -> no_return(). +pub(Pid, Topic, Qos, Content) when is_pid(Pid), is_binary(Topic), is_integer(Qos), is_binary(Content) -> + gen_server:cast(Pid, {pub, Topic, Qos, Content}). %% 向通道中写入消息 -spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> no_return(). @@ -74,8 +74,8 @@ handle_call(_Request, _From, State) -> {reply, ok, State}. %% 发送消息, 基于pub/sub机制 -handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) -> - EncPub = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, content = Content}), +handle_cast({pub, Topic, Qos, Content}, State = #state{transport = Transport, socket = Socket}) -> + EncPub = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, qos = Qos, content = Content}), Transport:send(Socket, <>), {noreply, State}; @@ -137,8 +137,6 @@ handle_info({tcp, Socket, <>}, State = #state{sock case CastMessage of #data{} = Data -> iot_host:handle(HostPid, {data, Data}); - #event{} = Event -> - iot_host:handle(HostPid, {event, Event}); #task_event_stream{task_id = TaskId, type = <<"close">>, stream = Reason} -> iot_event_stream_observer:stream_close(TaskId, Reason); #task_event_stream{task_id = TaskId, type = Type, stream = Stream} -> @@ -153,7 +151,7 @@ handle_info({tcp, Socket, <>}, State = #state{sock % {noreply, State}; %% 主机端的消息响应 -handle_info({tcp, Socket, <>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 -> +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) when PacketId > 0 -> {ok, RpcReply} = message_codec:decode(ResponseBin), case maps:take(PacketId, Inflight) of error -> diff --git a/docs/endpoint.md b/docs/endpoint.md index 68116bb..403da9f 100644 --- a/docs/endpoint.md +++ b/docs/endpoint.md @@ -23,10 +23,13 @@ CREATE TABLE `endpoint` ( ### config_json中的数据配置 #### http方式: type=http + token是可选的,如果接收服务器端需要提供校验;可以添加token字段,校验算法采用的: $Sign = sha256(token + "post_body" + token), 有token的情况下 + 通过http推送的数据会增加一个header, 格式为 X-Signature: $Sign ```json { "url": "http(s)://www.test.com/api", + "token": "ngyehngcohezqyabnlhollsfcnyqobmwfcfyvbzdbhuubucojcpeefpczruiccfw", "pool_size": 10 }