diff --git a/apps/iot/src/endpoint/endpoint.erl b/apps/iot/src/endpoint/endpoint.erl index 250f013..b946010 100644 --- a/apps/iot/src/endpoint/endpoint.erl +++ b/apps/iot/src/endpoint/endpoint.erl @@ -14,6 +14,7 @@ -export([start_link/1]). -export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]). -export([get_alias_pid/1]). +-export([endpoint_record/1]). %%%=================================================================== %%% API @@ -58,4 +59,74 @@ reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) -> -spec clean_up(Pid :: pid()) -> ok. clean_up(Pid) when is_pid(Pid) -> - gen_server:call(Pid, clean_up, 5000). \ No newline at end of file + gen_server:call(Pid, clean_up, 5000). + + +-spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}. +endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson, + <<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) -> + try + Config = parse_config(Type, catch jiffy:decode(ConfigJson, [return_maps])), + {ok, #endpoint { + id = Id, + name = Name, + title = Title, + config = Config, + status = Status, + updated_at = UpdatedAt, + created_at = CreatedAt + }} + catch throw:_ -> + error + end. + +parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) -> + #mqtt_endpoint{ + host = Host, + port = Port, + client_id = ClientId, + username = Username, + password = Password, + topic = Topic, + qos = Qos + }; +parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) -> + #http_endpoint{ + url = Url, + pool_size = PoolSize + }; +parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0}, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> + Mechanism = case Mechanism0 of + <<"sha_256">> -> + scram_sha_256; + <<"sha_512">> -> + scram_sha_512; + <<"plain">> -> + plain; + _ -> + plain + end, + + #kafka_endpoint{ + sasl_config = {Mechanism, Username, Password}, + bootstrap_servers = parse_bootstrap_servers(BootstrapServers), + topic = Topic + }; +parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> + #kafka_endpoint{ + sasl_config = undefined, + bootstrap_servers = parse_bootstrap_servers(BootstrapServers), + topic = Topic + }; +parse_config(_, _) -> + throw(invalid_config). + +parse_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) -> + lists:filtermap(fun(S) -> + case binary:split(S, <<":">>) of + [Host0, Port0] -> + {true, {binary_to_list(Host0), binary_to_integer(Port0)}}; + _ -> + false + end + end, BootstrapServers). \ No newline at end of file diff --git a/apps/iot/src/endpoint/endpoint_sup.erl b/apps/iot/src/endpoint/endpoint_sup.erl index c8bd065..271f31a 100644 --- a/apps/iot/src/endpoint/endpoint_sup.erl +++ b/apps/iot/src/endpoint/endpoint_sup.erl @@ -30,9 +30,9 @@ start_link() -> %% modules => modules()} % optional init([]) -> SupFlags = #{strategy => one_for_one, intensity => 1000, period => 3600}, - Endpoints = endpoint_bo:get_all_endpoints(), + Endpoints = iot_api:get_all_endpoints(), ChildSpecs = lists:flatmap(fun(EndpointInfo) -> - case endpoint_bo:endpoint_record(EndpointInfo) of + case endpoint:endpoint_record(EndpointInfo) of error -> []; {ok, Endpoint} -> diff --git a/apps/iot/src/http_handlers/endpoint_handler.erl b/apps/iot/src/http_handlers/endpoint_handler.erl index 0e14e16..6a59576 100644 --- a/apps/iot/src/http_handlers/endpoint_handler.erl +++ b/apps/iot/src/http_handlers/endpoint_handler.erl @@ -30,11 +30,11 @@ handle_request("POST", "/endpoint/run_statuses", _, Ids) when is_list(Ids) -> {ok, 200, iot_util:json_data(Statuses)}; handle_request("POST", "/endpoint/start", _, #{<<"id">> := Id}) when is_integer(Id) -> - case endpoint_bo:get_endpoint(Id) of + case iot_api:get_endpoint(Id) of undefined -> {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; {ok, EndpointInfo} -> - case endpoint_bo:endpoint_record(EndpointInfo) of + case endpoint:endpoint_record(EndpointInfo) of {ok, Endpoint = #endpoint{name = Name}} -> case endpoint_sup:ensured_endpoint_started(Endpoint) of {ok, Pid} when is_pid(Pid) -> @@ -49,7 +49,7 @@ handle_request("POST", "/endpoint/start", _, #{<<"id">> := Id}) when is_integer( end; handle_request("POST", "/endpoint/stop", _, #{<<"id">> := Id}) when is_integer(Id) -> - case endpoint_bo:get_endpoint(Id) of + case iot_api:get_endpoint(Id) of undefined -> {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; {ok, _} -> @@ -63,11 +63,11 @@ handle_request("POST", "/endpoint/stop", _, #{<<"id">> := Id}) when is_integer(I end; handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_integer(Id) -> - case endpoint_bo:get_endpoint(Id) of + case iot_api:get_endpoint(Id) of undefined -> {ok, 200, iot_util:json_error(404, <<"endpoint not found">>)}; {ok, EndpointInfo} -> - case endpoint_bo:endpoint_record(EndpointInfo) of + case endpoint:endpoint_record(EndpointInfo) of {ok, Endpoint = #endpoint{name = Name}} -> case endpoint:get_pid(Id) of undefined -> diff --git a/apps/iot/src/http_handlers/host_handler.erl b/apps/iot/src/http_handlers/host_handler.erl index 2701736..23930ba 100644 --- a/apps/iot/src/http_handlers/host_handler.erl +++ b/apps/iot/src/http_handlers/host_handler.erl @@ -47,7 +47,7 @@ handle_request("POST", "/host/reload", _, #{<<"uuid">> := UUID}) when is_binary( lager:debug("[host_handler] will reload host uuid: ~p", [UUID]), case iot_host_sup:ensured_host_started(UUID) of {ok, Pid} when is_pid(Pid) -> - {ok, #{<<"authorize_status">> := AuthorizeStatus}} = host_bo:get_host_by_uuid(UUID), + {ok, #{<<"authorize_status">> := AuthorizeStatus}} = iot_api:get_host_by_uuid(UUID), ok = iot_host:activate(Pid, AuthorizeStatus =:= 1), lager:debug("[host_handler] already_started reload host uuid: ~p, success", [UUID]), {ok, 200, iot_util:json_data(<<"success">>)}; diff --git a/apps/iot/src/iot_api.erl b/apps/iot/src/iot_api.erl index b5a20e3..9dc495d 100644 --- a/apps/iot/src/iot_api.erl +++ b/apps/iot/src/iot_api.erl @@ -8,7 +8,6 @@ %%%------------------------------------------------------------------- -module(iot_api). -author("anlicheng"). --include("endpoint.hrl"). %% API -export([ai_event/1]). @@ -17,7 +16,7 @@ -export([get_all_hosts/0, get_host_by_id/1, get_host_by_uuid/1, change_host_status/2]). -export([get_host_devices/1, get_device_by_uuid/1, change_device_status/2]). --export([get_all_endpoints/0, get_endpoint/1, endpoint_record/1]). +-export([get_all_endpoints/0, get_endpoint/1]). %%%=================================================================== %%% API @@ -51,7 +50,7 @@ get_host_by_id(HostId) when is_integer(HostId) -> end. %% 修改主机的状态 --spec change_host_status(UUID :: binary(), Status :: integer()) -> {ok, AffectedRows :: integer()} | {error, Reason :: any()}. +-spec change_host_status(UUID :: binary(), Status :: integer()) -> {ok, Result :: any()} | {error, Reason :: any()}. change_host_status(UUID, NStatus) when is_binary(UUID), is_integer(NStatus) -> do_post("/change_host_status", [{<<"uuid">>, UUID}, {<<"new_status">>, integer_to_binary(NStatus)}]). @@ -96,75 +95,6 @@ get_endpoint(Id) when is_integer(Id) -> undefined end. --spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}. -endpoint_record(#{<<"id">> := Id, <<"name">> := Name, <<"title">> := Title, <<"type">> := Type, <<"config_json">> := ConfigJson, - <<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) -> - try - Config = parse_config(Type, catch jiffy:decode(ConfigJson, [return_maps])), - {ok, #endpoint { - id = Id, - name = Name, - title = Title, - config = Config, - status = Status, - updated_at = UpdatedAt, - created_at = CreatedAt - }} - catch throw:_ -> - error - end. - -parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) -> - #mqtt_endpoint{ - host = Host, - port = Port, - client_id = ClientId, - username = Username, - password = Password, - topic = Topic, - qos = Qos - }; -parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) -> - #http_endpoint{ - url = Url, - pool_size = PoolSize - }; -parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0}, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> - Mechanism = case Mechanism0 of - <<"sha_256">> -> - scram_sha_256; - <<"sha_512">> -> - scram_sha_512; - <<"plain">> -> - plain; - _ -> - plain - end, - - #kafka_endpoint{ - sasl_config = {Mechanism, Username, Password}, - bootstrap_servers = parse_bootstrap_servers(BootstrapServers), - topic = Topic - }; -parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) -> - #kafka_endpoint{ - sasl_config = undefined, - bootstrap_servers = parse_bootstrap_servers(BootstrapServers), - topic = Topic - }; -parse_config(_, _) -> - throw(invalid_config). - -parse_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) -> - lists:filtermap(fun(S) -> - case binary:split(S, <<":">>) of - [Host0, Port0] -> - {true, {binary_to_list(Host0), binary_to_integer(Port0)}}; - _ -> - false - end - end, BootstrapServers). - ai_event(Id) when is_integer(Id) -> Token = iot_util:md5(<>), {ok, Url} = application:get_env(iot, api_url), diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 42096ce..36a088f 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -32,7 +32,7 @@ -spec new(DeviceInfo :: binary() | map()) -> error | {ok, Device :: #device{}}. new(DeviceUUID) when is_binary(DeviceUUID) -> - case device_bo:get_device_by_uuid(DeviceUUID) of + case iot_api:get_device_by_uuid(DeviceUUID) of {ok, #{<<"device_uuid">> := DeviceUUID, <<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} -> {ok, #device{device_uuid = DeviceUUID, status = Status, auth_state = auth_state(AuthorizeStatus)}}; undefined -> @@ -50,11 +50,11 @@ is_activated(#device{auth_state = AuthState}) -> change_status(Device = #device{status = Status}, NewStatus) when is_integer(NewStatus), Status =:= NewStatus -> Device; change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_ONLINE) -> - {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_ONLINE), + iot_api:change_device_status(DeviceUUID, ?DEVICE_ONLINE), report_event(DeviceUUID, ?DEVICE_ONLINE), Device#device{status = ?DEVICE_ONLINE}; change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_OFFLINE) -> - {ok, #{<<"status">> := Status}} = device_bo:get_device_by_uuid(DeviceUUID), + {ok, #{<<"status">> := Status}} = iot_api:get_device_by_uuid(DeviceUUID), case Status of ?DEVICE_NOT_JOINED -> lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]), @@ -63,7 +63,7 @@ change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_OFFLINE) -> lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]), Device#device{status = ?DEVICE_OFFLINE}; ?DEVICE_ONLINE -> - {ok, _} = device_bo:change_status(DeviceUUID, ?DEVICE_OFFLINE), + iot_api:change_device_status(DeviceUUID, ?DEVICE_OFFLINE), report_event(DeviceUUID, ?DEVICE_OFFLINE), Device#device{status = ?DEVICE_OFFLINE} end. @@ -71,7 +71,7 @@ change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_OFFLINE) -> -spec reload(Device :: #device{}) -> error | {ok, NDevice :: #device{}}. reload(Device = #device{device_uuid = DeviceUUID}) -> lager:debug("[iot_device] will reload: ~p", [DeviceUUID]), - case device_bo:get_device_by_uuid(DeviceUUID) of + case iot_api:get_device_by_uuid(DeviceUUID) of {ok, #{<<"authorize_status">> := AuthorizeStatus, <<"status">> := Status}} -> {ok, Device#device{device_uuid = DeviceUUID, status = Status, auth_state = auth_state(AuthorizeStatus)}}; undefined -> diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 957955e..c3949d8 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -189,7 +189,7 @@ start_link(Name, UUID) when is_atom(Name), is_binary(UUID) -> %% gen_statem:start_link/[3,4], this function is called by the new %% process to initialize. init([UUID]) -> - case host_bo:get_host_by_uuid(UUID) of + case iot_api:get_host_by_uuid(UUID) of {ok, #{<<"id">> := HostId, <<"authorize_status">> := AuthorizeStatus}} -> %% 通过host_id注册别名, 可以避免通过查询数据库获取HostPid AliasName = get_alias_name(HostId), @@ -204,7 +204,7 @@ init([UUID]) -> end, %% 加载设备信息 - {ok, DeviceInfos} = device_bo:get_host_devices(HostId), + {ok, DeviceInfos} = iot_api:get_host_devices(HostId), Devices = lists:filtermap(fun(DeviceInfo = #{<<"device_uuid">> := DeviceUUID}) -> case iot_device:new(DeviceInfo) of error -> @@ -313,9 +313,9 @@ handle_event({call, From}, {attach_channel, ChannelPid}, StateName, State = #sta ?STATE_ACTIVATED -> erlang:monitor(process, ChannelPid), %% 更新主机为在线状态 - {ok, AffectedRow} = host_bo:change_status(UUID, ?HOST_ONLINE), + ChangeResult = iot_api:change_host_status(UUID, ?HOST_ONLINE), report_event(UUID, ?HOST_ONLINE), - lager:debug("[iot_host] host_id(attach_channel) uuid: ~p, will change status, affected_row: ~p", [UUID, AffectedRow]), + lager:debug("[iot_host] host_id(attach_channel) uuid: ~p, will change status, result: ~p", [UUID, ChangeResult]), {keep_state, State#state{channel_pid = ChannelPid, has_session = true}, [{reply, From, ok}]}; %% 主机未激活 ?STATE_DENIED -> @@ -405,14 +405,14 @@ handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCou %% 没有收到心跳包,主机下线, 设备状态不变 handle_event(info, {timeout, _, heartbeat_ticker}, _, State = #state{uuid = UUID, heartbeat_counter = 0, channel_pid = ChannelPid}) -> lager:warning("[iot_host] uuid: ~p, heartbeat lost, devices will unknown", [UUID]), - {ok, #{<<"status">> := Status}} = host_bo:get_host_by_uuid(UUID), + {ok, #{<<"status">> := Status}} = iot_api:get_host_by_uuid(UUID), case Status of ?HOST_NOT_JOINED -> lager:debug("[iot_host] host: ~p, host_maybe_offline, host not joined, can not change to offline", [UUID]); ?HOST_OFFLINE -> lager:debug("[iot_host] host: ~p, host_maybe_offline, host now is offline, do nothing", [UUID]); ?HOST_ONLINE -> - {ok, _} = host_bo:change_status(UUID, ?HOST_OFFLINE), + iot_api:change_host_status(UUID, ?HOST_OFFLINE), report_event(UUID, ?HOST_OFFLINE) end, diff --git a/apps/iot/src/iot_host_sup.erl b/apps/iot/src/iot_host_sup.erl index 76c28bc..26d875e 100644 --- a/apps/iot/src/iot_host_sup.erl +++ b/apps/iot/src/iot_host_sup.erl @@ -15,7 +15,7 @@ start_link() -> supervisor:start_link({local, ?MODULE}, ?MODULE, []). init([]) -> - Specs = lists:map(fun child_spec/1, host_bo:get_all_hosts()), + Specs = lists:map(fun child_spec/1, iot_api:get_all_hosts()), {ok, {#{strategy => one_for_one, intensity => 1000, period => 3600}, Specs}}. diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index de9cdfc..9ceb712 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -96,7 +96,7 @@ handle_info({tcp, Socket, <>}, lager:debug("[ws_channel] auth uuid: ~p", [UUID]), case iot_auth:check(Username, Token, UUID, Salt, Timestamp) of true -> - case host_bo:get_host_by_uuid(UUID) of + case iot_api:get_host_by_uuid(UUID) of undefined -> lager:warning("[ws_channel] uuid: ~p, user: ~p, host not found", [UUID, Username]), {stop, State}; diff --git a/docs/iot_api.md b/docs/iot_api.md new file mode 100644 index 0000000..74d9659 --- /dev/null +++ b/docs/iot_api.md @@ -0,0 +1,395 @@ +--- + +```markdown +# 📘 IoT API 接口文档 + +> 模块:`iot_api` +> 作者:**anlicheng** +> 创建时间:2023-12-24 +> 数据格式:`application/json` +> 认证方式:内置 `API_TOKEN = "wv6fGyBhl*7@AsD9"` + +--- + +## 🔐 通用请求头 + +| Header | 值 | 说明 | +|--------|----|------| +| Content-Type | application/json | 请求体格式 | +| Accept | application/json | 响应体格式 | + +--- + +## 🧩 主机(Host)相关接口 + +### 1. 获取所有主机列表 + +**接口:** +``` + +GET /get_all_hosts + +```` + +**参数:** +无 + +**返回示例:** +```json +{"result": + [ + "uuid-1", + "uuid-2", + "uuid-3" + ] +} +```` +--- + +### 2. 通过 UUID 获取主机信息 + +**接口:** + +``` +GET /get_host_by_uuid?uuid= +``` + +**参数:** + +| 参数名 | 类型 | 必填 | 说明 | +| ---- | ------ | -- | ------- | +| uuid | string | ✅ | 主机 UUID | + +**返回示例:** + +```json +{ + "result": { + "id": 1, + "uuid": "uuid-1", + "name": "HostA", + "status": 1, + "created_at": "2024-01-01T00:00:00Z" + } +} +``` + +--- + +### 3. 通过主机 ID 获取主机信息 + +**接口:** + +``` +GET /get_host_by_id?host_id= +``` + +**参数:** + +| 参数名 | 类型 | 必填 | 说明 | +| ------- | ------- | -- | ----- | +| host_id | integer | ✅ | 主机 ID | + +**返回示例:** + +```json +{ + "result": { + "id": 1, + "uuid": "uuid-1", + "name": "HostA" + } +} +``` + +--- + +### 4. 修改主机状态 + +**接口:** + +``` +POST /change_host_status +``` + +**请求体:** + +```json +{ + "uuid": "uuid-1", + "new_status": 1 +} +``` + +**返回示例:** + +```json +{ + "result": 1 +} +``` + +--- + +### 5. 获取主机下的设备列表 + +**接口:** + +``` +GET /get_host_devices?host_id= +``` + +**参数:** + +| 参数名 | 类型 | 必填 | 说明 | +| ------- | ------- | -- | ----- | +| host_id | integer | ✅ | 主机 ID | + +**返回示例:** + +```json +{ + "result": [ + { "device_uuid": "dev-1", "status": 1 }, + { "device_uuid": "dev-2", "status": 0 } + ] +} +``` + +--- + +## 🔧 设备(Device)相关接口 + +### 6. 获取设备详情 + +**接口:** + +``` +GET /get_device_by_uuid?device_uuid= +``` + +**参数:** + +| 参数名 | 类型 | 必填 | 说明 | +| ----------- | ------ | -- | ------- | +| device_uuid | string | ✅ | 设备 UUID | + +**返回示例:** + +```json +{ + "result": { + "device_uuid": "dev-1", + "type": "sensor", + "status": 1 + } +} +``` + +--- + +### 7. 修改设备状态 + +**接口:** + +``` +POST /change_device_status +``` + +**请求体:** + +```json +{ + "device_uuid": "dev-1", + "new_status": 1 +} +``` + +**返回示例:** + +```json +{ + "result": 1 +} +``` + +--- + +## 🌐 Endpoint(数据终端)相关接口 + +### 8. 获取所有 Endpoint + +**接口:** + +``` +GET /get_all_endpoints +``` + +**参数:** 无 + +**返回示例:** + +```json +{ + "result": [ + { + "id": 1, + "name": "mqtt_endpoint", + "type": "mqtt", + "status": 1 + } + ] +} +``` + +--- + +### 9. 获取指定 Endpoint 信息 + +**接口:** + +``` +GET /get_endpoint?id= +``` + +**参数:** + +| 参数名 | 类型 | 必填 | 说明 | +| --- | ------- | -- | ----------- | +| id | integer | ✅ | Endpoint ID | + +**返回示例:** + +```json +{ + "result": { + "id": 1, + "name": "mqtt_endpoint", + "title": "MQTT接口", + "type": "mqtt", + "config_json": "{\"host\":\"mqtt.example.com\",\"port\":1883}" + } +} +``` + +--- + +## 🤖 AI 事件接口 + +### 10. 触发 AI 事件 + +**接口:** + +``` +POST {api_url} +``` + +**请求体:** + +```json +{ + "id": 123, + "token": "" +} +``` + +**返回:** + +* 成功:HTTP 200 +* 失败:返回错误日志(`lager` 记录) + +**说明:** +`token` 通过以下 Erlang 表达式生成: + +```erlang +iot_util:md5(<>) +``` + +--- + +## 🧱 Endpoint 配置结构说明 + +根据 `type` 不同,`config_json` 结构如下: + +### MQTT + +```json +{ + "host": "broker.example.com", + "port": 1883, + "client_id": "client-1", + "username": "user", + "password": "pass", + "topic": "iot/topic", + "qos": 1 +} +``` + +### HTTP + +```json +{ + "url": "https://api.example.com", + "pool_size": 5 +} +``` + +### Kafka(带认证) + +```json +{ + "bootstrap_servers": ["kafka1:9092", "kafka2:9092"], + "topic": "iot_topic", + "sasl_config": { + "username": "user", + "password": "pass", + "mechanism": "sha_512" + } +} +``` + +### Kafka(无认证) + +```json +{ + "bootstrap_servers": ["kafka1:9092"], + "topic": "iot_topic" +} +``` + +--- + +## ⚠️ 错误响应格式 + +统一错误返回结构: + +```json +{ + "error": { + "code": 400, + "message": "Invalid parameter" + } +} +``` + +--- + +## 📄 附录:内部函数说明(开发者参考) + +| 函数名 | 作用 | 说明 | +| ------------------- | ----------------------------- | ------------------------------------- | +| `do_get/2` | 执行 HTTP GET 请求 | 返回 `{ok, Result}` 或 `{error, Reason}` | +| `do_post/2` | 执行 HTTP POST 请求 | 自动编码 JSON,解析返回值 | +| `endpoint_record/1` | 将 JSON 转换为内部 `#endpoint{}` 结构 | 内部使用 | +| `parse_config/2` | 解析不同类型 endpoint 配置 | 支持 mqtt/http/kafka | + +--- + +📅 **最后更新:2025-11-07** + +``` + +--- + +是否希望我帮你把这份 `.md` 文件转成一个 **可浏览的 HTML 文档**(带目录与搜索功能)? +```