Compare commits

..

No commits in common. "d0c468c97da53a08c551b127f4428d480a2da6d9" and "9d91c0ef1ea132c0860faee66def664b098554ff" have entirely different histories.

11 changed files with 85 additions and 1188 deletions

View File

@ -1,878 +0,0 @@
# 🧩 IoT 容器管理接口文档
**模块**`container_handler`
**作者**licheng5
**创建时间**2020-04-26
**说明**:提供容器的部署、配置、启动、停止、查询等管理 API 接口。
---
## 📦 模块结构
| 模块 | 说明 |
|------|------|
| `container_handler` | 提供容器管理的 HTTP 接口处理 |
| `iot_util` | 工具模块,用于生成标准化 JSON 响应 |
---
## ⚙️ `iot_util` 模块函数声明
```erlang
%%--------------------------------------------------------------------
%% @doc
%% 将数据封装为标准 JSON 响应:
%% {"result": Data}
%%--------------------------------------------------------------------
json_data(Data) ->
jiffy:encode(#{
<<"result">> => Data
}, [force_utf8]).
%%--------------------------------------------------------------------
%% @doc
%% 生成错误响应 JSON
%% {
%% "error": {
%% "code": ErrCode,
%% "message": ErrMessage
%% }
%% }
%%--------------------------------------------------------------------
json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) ->
jiffy:encode(#{
<<"error">> => #{
<<"code">> => ErrCode,
<<"message">> => ErrMessage
}
}, [force_utf8]).
```
### 📘 返回格式说明
| 字段 | 说明 |
|------|------|
| `result` | 正常返回数据 |
| `error.code` | 错误代码 |
| `error.message` | 错误信息 |
---
## 🌐 HTTP API 接口列表
以下为 `container_handler` 模块导出的全部 HTTP 接口。
---
### 1⃣ 获取容器列表
**URL**`/container/get_all`
**Method**`GET`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | list | 容器信息列表 |
#### 示例响应
```json
{
"result": [
{"name": "container_1", "status": "running"},
{"name": "container_2", "status": "stopped"}
]
}
```
#### 错误响应
```json
{
"error": {
"code": -1,
"message": "host not found"
}
}
```
---
### 2⃣ 下发配置文件
**URL**`/container/push_config`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
| container_name | binary (string) | ✅ | 容器名称 |
| config | binary (string, JSON) | ✅ | 容器配置内容JSON 字符串) |
| timeout | integer | ✅ | 超时时间(秒) |
### 请求示例
```json
{"uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", "container_name": "my_nginx_new", "config": "{\"application\":{\"namexyz\":\"RandomConfigApp\",\"version\":\"1.2.7\",\"environment\":{\"debug_mode\":true,\"log_level\":\"info\",\"max_log_files\":10}},\"server\":{\"host\":\"127.0.0.1\",\"port\":8080,\"ssl_enabled\":false,\"allowed_origins\":[\"https:\\/\\/example.com\",\"http:\\/\\/localhost:3000\"]},\"database\":{\"type\":\"postgresql\",\"host\":\"db.example.com\",\"port\":5432,\"username\":\"admin_7xq9f\",\"password\":\"p@ssw0rd!r4nd\",\"connection_pool\":15,\"timeout_seconds\":30},\"features\":{\"enable_analytics\":true,\"enable_cache\":false,\"experimental_features\":[\"ai_enhancement\",\"realtime_sync\"]},\"third_party\":{\"api_key\":\"a3b8c2d4e5f6g7h8i9j0k1l2m3n4o5p\",\"weather_service_url\":\"https:\\/\\/api.weather.example\\/v3\",\"payment_gateway\":{\"endpoint\":\"https:\\/\\/pay.example.com\",\"merchant_id\":\"M123456789\"}},\"scheduled_tasks\":[{\"name\":\"nightly_backup\",\"cron\":\"0 3 * * *\",\"enabled\":true},{\"name\":\"log_cleanup\",\"interval_minutes\":1440,\"retention_days\":7}],\"admins\":[{\"username\":\"alice_dev\",\"email\":\"alice@example.com\",\"permissions\":[\"read\",\"write\",\"admin\"]},{\"username\":\"bob_ops\",\"email\":\"bob@example.org\",\"permissions\":[\"read\",\"audit\"]}]}", "timeout": 10}
```
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | map | 配置结果 |
#### 示例响应
```json
{
"result": {
"container_name": "sensor_service",
"status": "config pushed"
}
}
```
#### 错误响应
```json
{
"error": {
"code": -1,
"message": "host not found"
}
}
```
---
### 3⃣ 部署容器服务
**URL**`/container/deploy`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
| task_id | integer | ✅ | 任务 ID |
| config | map | ✅ | 部署配置内容 |
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | map | 部署结果 |
### 请求示例
```json
{
"uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm",
"task_id": 1,
"timeout": 10,
"config": {
"image": "docker.1ms.run/library/nginx:latest",
"container_name": "my_nginx_new",
"command": [
"nginx",
"-g",
"daemon off;"
],
"entrypoint": [
"/docker-entrypoint.sh"
],
"envs": [
"ENV1=val1",
"ENV2=val2"
],
"env_file": [
"./env.list"
],
"ports": [
"8080:80",
"443:443"
],
"expose": [
"80",
"443"
],
"volumes": [
"/host/data:/data",
"/host/log:/var/log"
],
"networks": [
"mynet"
],
"labels": {
"role": "web",
"env": "prod"
},
"restart": "always",
"user": "www-data",
"working_dir": "/app",
"hostname": "myhost",
"privileged": true,
"cap_add": [
"NET_ADMIN"
],
"cap_drop": [
"MKNOD"
],
"devices": [
"/dev/snd:/dev/snd"
],
"mem_limit": "512m",
"mem_reservation": "256m",
"cpu_shares": 512,
"cpus": 1.5,
"ulimits": {
"nofile": "1024:2048"
},
"sysctls": {
"net.ipv4.ip_forward": "1"
},
"tmpfs": [
"/tmp"
],
"extra_hosts": [
"host1:192.168.0.1"
],
"healthcheck": {
"test": [
"CMD-SHELL",
"curl -f http://localhost || exit 1"
],
"interval": "30s",
"timeout": "10s",
"retries": 3
}
}
}
```
#### 示例响应
```json
{
"result": {
"task_id": 1001,
"status": "deployed"
}
}
```
#### 错误响应
```json
{
"error": {
"code": 404,
"message": "host not found"
}
}
```
---
### 4⃣ 启动容器服务
**URL**`/container/start`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
| container_name | binary (string) | ✅ | 容器名称 |
### 请求示例
```json
{"uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", "container_name": "my_nginx_new"}
```
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | map | 启动结果 |
#### 示例响应
```json
{
"result": {
"container_name": "sensor_service",
"status": "started"
}
}
```
#### 错误响应
```json
{
"error": {
"code": 404,
"message": "host not found"
}
}
```
---
### 5⃣ 停止容器服务
**URL**`/container/stop`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
| container_name | binary (string) | ✅ | 容器名称 |
### 请求示例
```json
{"uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", "container_name": "my_nginx_new"}
```
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | map | 停止结果 |
#### 示例响应
```json
{
"result": {
"container_name": "sensor_service",
"status": "stopped"
}
}
```
#### 错误响应
```json
{
"error": {
"code": 404,
"message": "host not found"
}
}
```
---
### 6⃣ 未知路径处理
**说明**:如果请求路径未匹配任何定义接口,将返回错误信息。
#### 示例响应
```json
{
"error": {
"code": -1,
"message": "url: /unknown/path not found"
}
}
```
---
## 🧾 返回约定总结
| 类型 | 说明 |
|------|------|
| `{"result": Data}` | 表示请求成功 |
| `{"error": {"code": N, "message": Text}}` | 表示请求失败 |
# 🧩 IoT Endpoint 管理接口文档
**模块**`endpoint_handler`
**作者**licheng5
**创建时间**2020-04-26
**说明**:用于管理 IoT Endpoint 的运行状态,包括启动、停止、重启、状态查询等。
---
## 📦 模块结构
| 模块 | 说明 |
|------|------|
| `endpoint_handler` | 提供 Endpoint 管理的 HTTP 接口处理 |
| `iot_util` | 工具模块,用于生成标准化 JSON 响应 |
---
## ⚙️ `iot_util` 模块函数声明
```erlang
%%--------------------------------------------------------------------
%% @doc
%% 将数据封装为标准 JSON 响应:
%% {"result": Data}
%%--------------------------------------------------------------------
json_data(Data) ->
jiffy:encode(#{
<<"result">> => Data
}, [force_utf8]).
%%--------------------------------------------------------------------
%% @doc
%% 生成错误响应 JSON
%% {
%% "error": {
%% "code": ErrCode,
%% "message": ErrMessage
%% }
%% }
%%--------------------------------------------------------------------
json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) ->
jiffy:encode(#{
<<"error">> => #{
<<"code">> => ErrCode,
<<"message">> => ErrMessage
}
}, [force_utf8]).
```
### 📘 返回格式说明
| 字段 | 说明 |
|------|------|
| `result` | 正常返回数据 |
| `error.code` | 错误代码 |
| `error.message` | 错误信息 |
---
## 🌐 HTTP API 接口列表
以下为 `endpoint_handler` 模块导出的全部 HTTP 接口。
---
### 1⃣ 获取 Endpoint 运行状态
**URL**`/endpoint/run_statuses`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| Ids | list | ✅ | Endpoint ID 列表 |
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | list | 每个 ID 对应状态:`0` 未运行,`1` 运行中 |
#### 示例请求
```json
[1, 2, 3]
```
#### 示例响应
```json
{
"result": [1, 0, 1]
}
```
---
### 2⃣ 启动 Endpoint
**URL**`/endpoint/start`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| id | integer | ✅ | Endpoint 唯一 ID |
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | string | 启动结果,如 `"success"` |
#### 示例响应
```json
{
"result": "success"
}
```
#### 错误响应
```json
{
"error": {
"code": 404,
"message": "endpoint not found"
}
}
```
---
### 3⃣ 停止 Endpoint
**URL**`/endpoint/stop`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| id | integer | ✅ | Endpoint 唯一 ID |
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | string | 停止结果,如 `"success"` |
#### 示例响应
```json
{
"result": "success"
}
```
#### 错误响应
```json
{
"error": {
"code": 404,
"message": "stop endpoint error"
}
}
```
---
### 4⃣ 重启 Endpoint
**URL**`/endpoint/restart`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| id | integer | ✅ | Endpoint 唯一 ID |
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | string | 重启结果,如 `"success"` |
#### 示例响应
```json
{
"result": "success"
}
```
#### 错误响应
```json
{
"error": {
"code": 404,
"message": "restart endpoint error"
}
}
```
---
### 5⃣ 未知路径处理
**说明**:如果请求路径未匹配任何定义接口,将返回错误信息。
#### 示例响应
```json
{
"error": {
"code": -1,
"message": "url: /unknown/path not found"
}
}
```
---
## 🧾 返回约定总结
| 类型 | 说明 |
|------|------|
| `{"result": Data}` | 表示请求成功 |
| `{"error": {"code": N, "message": Text}}` | 表示请求失败 |
# 🧩 IoT Host 管理接口文档
**模块**`host_handler`
**作者**licheng5
**创建时间**2020-04-26
**说明**:用于管理 IoT 主机,包括主机指标、状态查询、激活、删除、发布事件等接口。
---
## 📦 模块结构
| 模块 | 说明 |
|------|------|
| `host_handler` | 提供 Host 管理的 HTTP 接口处理 |
| `iot_util` | 工具模块,用于生成标准化 JSON 响应 |
---
## ⚙️ `iot_util` 模块函数声明
```erlang
%%--------------------------------------------------------------------
%% @doc
%% 将数据封装为标准 JSON 响应:
%% {"result": Data}
%%--------------------------------------------------------------------
json_data(Data) ->
jiffy:encode(#{
<<"result">> => Data
}, [force_utf8]).
%%--------------------------------------------------------------------
%% @doc
%% 生成错误响应 JSON
%% {
%% "error": {
%% "code": ErrCode,
%% "message": ErrMessage
%% }
%% }
%%--------------------------------------------------------------------
json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) ->
jiffy:encode(#{
<<"error">> => #{
<<"code">> => ErrCode,
<<"message">> => ErrMessage
}
}, [force_utf8]).
```
### 📘 返回格式说明
| 字段 | 说明 |
|------|------|
| `result` | 正常返回数据 |
| `error.code` | 错误代码 |
| `error.message` | 错误信息 |
---
## 🌐 HTTP API 接口列表
以下为 `host_handler` 模块导出的全部 HTTP 接口。
---
### 1⃣ 获取主机指标
**URL**`/host/metric`
**Method**`GET`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | map | 主机指标信息 |
#### 示例响应
```json
{
"result": {"cpu": 20, "memory": 1024, "disk": 51200}
}
```
#### 错误响应
```json
{
"error": {
"code": 404,
"message": "host not found"
}
}
```
#### 无指标信息响应
```json
{
"error": {
"code": 404,
"message": "no metric info"
}
}
```
---
### 2⃣ 查询主机状态
**URL**`/host/status`
**Method**`GET`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | map | 主机状态信息 |
#### 示例响应
```json
{
"result": {"authorize_status": 1, "active": true}
}
```
---
### 3⃣ 重新加载主机信息
**URL**`/host/reload`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | string | 重新加载结果,如 `"success"` |
#### 错误响应
```json
{
"error": {
"code": 404,
"message": "reload error"
}
}
```
---
### 4⃣ 删除主机
**URL**`/host/delete`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | string | 删除结果,如 `"success"` |
#### 错误响应
```json
{
"error": {
"code": 404,
"message": "error"
}
}
```
---
### 5⃣ 激活主机
**URL**`/host/activate`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
| auth | boolean | ✅ | `true` 激活, `false` 取消激活 |
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | string | 激活结果,如 `"success"` |
#### 错误响应
```json
{
"error": {
"code": 400,
"message": "host not found"
}
}
```
---
### 6⃣ 发布主机事件
**URL**`/host/pub`
**Method**`POST`
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
| topic | binary (string) | ✅ | 事件主题 |
| content | binary (string) | ✅ | 发布内容 |
### 请求示例
```json
{"uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", "topic": "/device/1234/all", "content": "this is a topic payload", "timeout": 10}
```
#### 响应参数
| 字段 | 类型 | 说明 |
|------|------|------|
| result | string | 发布结果,如 `"success"` |
#### 错误响应
```json
{
"error": {
"code": 400,
"message": "host not found"
}
}
```
---
### 7⃣ 未知路径处理
**说明**:如果请求路径未匹配任何定义接口,将返回错误信息。
#### 示例响应
```json
{
"error": {
"code": -1,
"message": "url: /unknown/path not found"
}
}
```
---
## 🧾 返回约定总结
| 类型 | 说明 |
|------|------|
| `{"result": Data}` | 表示请求成功 |
| `{"error": {"code": N, "message": Text}}` | 表示请求失败 |

12
README.md Normal file
View File

@ -0,0 +1,12 @@
iot
=====
An OTP application
## erlang client sdk
https://github.com/emqx/emqtt
Build
-----
$ rebar3 compile

View File

@ -21,20 +21,15 @@
%%
-define(MESSAGE_AUTH_REQUEST, 16#01).
-define(MESSAGE_AUTH_REPLY, 16#02).
-define(MESSAGE_PUB, 16#02).
-define(MESSAGE_COMMAND, 16#03).
-define(MESSAGE_DEPLOY, 16#04).
-define(MESSAGE_PUB, 16#05).
-define(MESSAGE_RPC_DEPLOY, 16#04).
-define(MESSAGE_RPC_CONTAINER, 16#05).
-define(MESSAGE_DATA, 16#06).
-define(MESSAGE_EVENT, 16#07).
%% efka主动上报的event-stream流, : docker-create的实时处理逻辑上报
-define(MESSAGE_EVENT_STREAM, 16#08).
-define(MESSAGE_JSONRPC_REQUEST, 16#F0).
-define(MESSAGE_JSONRPC_REPLY, 16#F1).
%%
-define(MESSAGE_RPC_REPLY, 16#FF).
%%%% ,
%%
@ -48,11 +43,6 @@
timestamp :: integer()
}).
-record(auth_reply, {
code :: integer(),
payload :: binary()
}).
-record(pub, {
topic :: binary(),
content :: binary()
@ -63,14 +53,20 @@
command :: binary()
}).
-record(jsonrpc_request, {
method :: binary(),
params = <<>> :: any()
-record(rpc_deploy, {
task_id :: integer(),
config :: binary()
}).
-record(jsonrpc_reply, {
result :: any() | undefined,
error :: any() | undefined
-record(rpc_container, {
method :: binary(),
container_name :: binary(),
params = <<>> :: binary()
}).
-record(rpc_reply, {
code :: integer(),
payload :: binary()
}).
-record(data, {
@ -85,9 +81,3 @@
event_type :: integer(),
params :: binary()
}).
-record(task_event_stream, {
task_id :: integer(),
type :: binary(),
stream :: binary()
}).

View File

@ -13,25 +13,6 @@
%% API
-export([handle_request/4]).
handle_request("GET", "/container/get_all", #{<<"uuid">> := UUID}, _) when is_binary(UUID) ->
%% ConfigJson是否是合法的json字符串
case iot_host:get_pid(UUID) of
undefined ->
{ok, 200, iot_util:json_error(-1, <<"host not found">>)};
Pid when is_pid(Pid) ->
case iot_host:get_containers(Pid) of
{ok, Ref} ->
case iot_host:await_reply(Ref, 10000) of
{ok, Result} ->
{ok, 200, iot_util:json_data(Result)};
{error, Reason} ->
{ok, 200, iot_util:json_error(-1, Reason)}
end;
{error, Reason} when is_binary(Reason) ->
{ok, 200, iot_util:json_error(-1, Reason)}
end
end;
%% config.json,
handle_request("POST", "/container/push_config", _,
#{<<"uuid">> := UUID, <<"container_name">> := ContainerName, <<"config">> := Config, <<"timeout">> := Timeout0})
@ -65,7 +46,8 @@ handle_request("POST", "/container/deploy", _, #{<<"uuid">> := UUID, <<"task_id"
undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
Pid when is_pid(Pid) ->
case iot_host:deploy_container(Pid, TaskId, Config) of
ConfigBin = jiffy:encode(Config, [force_utf8]),
case iot_host:deploy_container(Pid, TaskId, ConfigBin) of
{ok, Ref} ->
case iot_host:await_reply(Ref, 5000) of
{ok, Result} ->

View File

@ -1,46 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 08. 5 2025 13:00
%%%-------------------------------------------------------------------
-module(event_stream_handler).
-author("anlicheng").
%% API
-export([init/2]).
init(Req0, Opts) ->
Method = binary_to_list(cowboy_req:method(Req0)),
Path = binary_to_list(cowboy_req:path(Req0)),
GetParams0 = cowboy_req:parse_qs(Req0),
GetParams = maps:from_list(GetParams0),
#{<<"task_id">> := TaskId0} = GetParams,
TaskId = binary_to_integer(TaskId0),
lager:debug("method: ~p, path: ~p, get: ~p", [Method, Path, GetParams]),
Req1 = cowboy_req:stream_reply(200, #{
<<"content-type">> => <<"text/event-stream">>,
<<"cache-control">> => <<"no-cache">>
}, Req0),
ok = iot_event_stream_observer:add_listener(self(), TaskId),
receiver_events(TaskId, Req1),
{ok, Req1, Opts}.
receiver_events(TaskId, Req) ->
receive
{stream_data, TaskId, Type, Stream} ->
Data = jiffy:encode(#{<<"type">> => Type, <<"stream">> => Stream}, [force_utf8]),
Body = iolist_to_binary([<<"event: message\n">>, <<"data: ", Data/binary, "\n">>, <<"\n">>]),
ok = cowboy_req:stream_body(Body, nofin, Req),
receiver_events(TaskId, Req);
{stream_close, TaskId} ->
CloseFrame = iolist_to_binary([<<"event: close\n">>, <<"data: bye\n">>, <<"\n">>]),
ok = cowboy_req:stream_body(CloseFrame, fin, Req)
end.

View File

@ -49,8 +49,7 @@ start_http_server() ->
{'_', [
{"/host/[...]", http_protocol, [host_handler]},
{"/container/[...]", http_protocol, [container_handler]},
{"/device/[...]", http_protocol, [device_handler]},
{"/event_stream", event_stream_handler, []}
{"/device/[...]", http_protocol, [device_handler]}
]}
]),
@ -59,6 +58,7 @@ start_http_server() ->
num_acceptors => Acceptors,
shutdown => brutal_kill,
socket_opts => [
{reuseaddr, true},
{backlog, Backlog},
{port, Port}
]
@ -80,6 +80,10 @@ start_tcp_server() ->
num_acceptors => Acceptors,
shutdown => brutal_kill,
socket_opts => [
binary,
{packet, 4},
{reuseaddr, true},
{active, false},
{nodelay, false},
{backlog, Backlog},
{port, Port}

View File

@ -1,128 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 26. 9 2025 12:19
%%%-------------------------------------------------------------------
-module(iot_event_stream_observer).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_link/0]).
-export([add_listener/2, stream_data/3, stream_close/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
listeners = #{}
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec add_listener(ListenerPid :: pid(), TaskId :: integer()) -> ok.
add_listener(ListenerPid, TaskId) when is_pid(ListenerPid), is_integer(TaskId) ->
gen_server:call(?SERVER, {add_listener, ListenerPid, TaskId}).
-spec stream_data(TaskId :: integer(), Type :: binary(), Stream :: binary()) -> no_return().
stream_data(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) ->
gen_server:cast(?SERVER, {stream_data, TaskId, Type, Stream}).
-spec stream_close(TaskId :: integer()) -> no_return().
stream_close(TaskId) when is_integer(TaskId) ->
gen_server:cast(?SERVER, {stream_close, TaskId}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?SERVER}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, #state{}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({add_listener, ListenerPid, TaskId}, _From, State = #state{listeners = Listeners}) ->
erlang:monitor(process, ListenerPid),
{reply, ok, State#state{listeners = maps:put(TaskId, ListenerPid, Listeners)}}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({stream_data, TaskId, Type, Stream}, State = #state{listeners = Listeners}) ->
case maps:find(TaskId, Listeners) of
error ->
ok;
{ok, ListenerPid} ->
is_process_alive(ListenerPid) andalso ListenerPid ! {stream_data, TaskId, Type, Stream}
end,
{noreply, State};
handle_cast({stream_close, TaskId}, State = #state{listeners = Listeners}) ->
case maps:find(TaskId, Listeners) of
error ->
ok;
{ok, ListenerPid} ->
is_process_alive(ListenerPid) andalso ListenerPid ! {stream_close, TaskId}
end,
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({'DOWN', _Ref, process, Pid, _Reason}, State = #state{listeners = Listeners}) ->
NListeners = maps:filter(fun(_, ListenerPid) -> ListenerPid /= Pid end, Listeners),
{noreply, State#state{listeners = NListeners}}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -25,7 +25,7 @@
-export([get_metric/1, get_status/1]).
%%
-export([pub/3, attach_channel/2, command/3]).
-export([deploy_container/3, start_container/2, stop_container/2, config_container/3, get_containers/1, await_reply/2]).
-export([deploy_container/3, start_container/2, stop_container/2, config_container/3, await_reply/2]).
%%
-export([reload_device/2, delete_device/2, activate_device/3]).
-export([heartbeat/1]).
@ -68,7 +68,7 @@ get_alias_name(HostId0) when is_integer(HostId0) ->
binary_to_atom(<<"iot_host_id:", HostId/binary>>).
%%
-spec handle(Pid :: pid(), Packet :: {atom(), any()}) -> no_return().
-spec handle(Pid :: pid(), Packet :: {atom(), binary()} | {atom(), {binary(), binary()}}) -> no_return().
handle(Pid, Packet) when is_pid(Pid) ->
gen_statem:cast(Pid, {handle, Packet}).
@ -89,47 +89,37 @@ get_metric(Pid) when is_pid(Pid) ->
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
gen_statem:call(Pid, {attach_channel, ChannelPid}).
-spec get_containers(Pid :: pid()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
get_containers(Pid) when is_pid(Pid) ->
Request = #jsonrpc_request{method = <<"get_containers">>, params = #{}},
EncConfigBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
gen_statem:call(Pid, {jsonrpc_call, self(), EncConfigBin}).
-spec config_container(Pid :: pid(), ContainerName :: binary(), ConfigJson :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
config_container(Pid, ContainerName, ConfigJson) when is_pid(Pid), is_binary(ContainerName), is_binary(ConfigJson) ->
Request = #jsonrpc_request{method = <<"config_container">>, params = #{<<"container_name">> => ContainerName, <<"config">> => ConfigJson}},
EncConfigBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
gen_statem:call(Pid, {jsonrpc_call, self(), EncConfigBin}).
EncConfigBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"config">>, container_name = ContainerName, params = ConfigJson}),
gen_statem:call(Pid, {rpc_call, self(), EncConfigBin}).
-spec deploy_container(Pid :: pid(), TaskId :: integer(), Config :: map()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
deploy_container(Pid, TaskId, Config) when is_pid(Pid), is_integer(TaskId), is_map(Config) ->
Request = #jsonrpc_request{method = <<"deploy">>, params = #{<<"task_id">> => TaskId, <<"config">> => Config}},
EncDeployBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
gen_statem:call(Pid, {jsonrpc_call, self(), EncDeployBin}).
-spec deploy_container(Pid :: pid(), TaskId :: integer(), Config :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
deploy_container(Pid, TaskId, Config) when is_pid(Pid), is_integer(TaskId), is_binary(Config) ->
EncDeployBin = message_codec:encode(?MESSAGE_RPC_DEPLOY, #rpc_deploy{task_id = TaskId, config = Config}),
gen_statem:call(Pid, {rpc_call, self(), EncDeployBin}).
-spec start_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
start_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
Request = #jsonrpc_request{method = <<"start_container">>, params = #{<<"container_name">> => ContainerName}},
EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
gen_statem:call(Pid, {jsonrpc_call, self(), EncCallBin}).
EncCallBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"start">>, container_name = ContainerName}),
gen_statem:call(Pid, {rpc_call, self(), EncCallBin}).
-spec stop_container(Pid :: pid(), ContainerName :: binary()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
stop_container(Pid, ContainerName) when is_pid(Pid), is_binary(ContainerName) ->
Request = #jsonrpc_request{method = <<"stop_container">>, params = #{<<"container_name">> => ContainerName}},
EncCallBin = message_codec:encode(?MESSAGE_JSONRPC_REQUEST, Request),
gen_statem:call(Pid, {jsonrpc_call, self(), EncCallBin}).
EncCallBin = message_codec:encode(?MESSAGE_RPC_CONTAINER, #rpc_container{method = <<"stop">>, container_name = ContainerName}),
gen_statem:call(Pid, {rpc_call, self(), EncCallBin}).
%-spec task_log(Pid :: pid(), TaskId :: integer()) -> {ok, Ref :: reference()} | {error, Reason :: any()}.
%task_log(Pid, TaskId) when is_pid(Pid), is_integer(TaskId) ->
% TaskLogBin = message_pb:encode_msg(#fetch_task_log{task_id = TaskId}),
% gen_statem:call(Pid, {jsonrpc_call, self(), ?PUSH_TASK_LOG, TaskLogBin}).
% gen_statem:call(Pid, {rpc_call, self(), ?PUSH_TASK_LOG, TaskLogBin}).
-spec await_reply(Ref :: reference(), Timeout :: integer()) -> {ok, Result :: binary()} | {error, Reason :: binary()}.
await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
receive
{jsonrpc_reply, Ref, #jsonrpc_reply{result = Result, error = undefined}} ->
{rpc_reply, Ref, #rpc_reply{code = 1, payload = Result}} ->
{ok, Result};
{jsonrpc_reply, Ref, #jsonrpc_reply{result = undefined, error = #{<<"message">> := Message}}} ->
{rpc_reply, Ref, #rpc_reply{code = 0, payload = Message}} ->
{error, Message}
after Timeout ->
{error, <<"timeout">>}
@ -234,11 +224,11 @@ handle_event({call, From}, get_status, _, State = #state{channel_pid = ChannelPi
{keep_state, State, [{reply, From, {ok, Reply}}]};
%% channel存在
handle_event({call, From}, {jsonrpc_call, ReceiverPid, RpcCall}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
handle_event({call, From}, {rpc_call, ReceiverPid, RpcCall}, _, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
case HasSession andalso is_pid(ChannelPid) of
true ->
%% websocket发送请求
Ref = tcp_channel:jsonrpc_call(ChannelPid, ReceiverPid, RpcCall),
Ref = tcp_channel:rpc_call(ChannelPid, ReceiverPid, RpcCall),
{keep_state, State, [{reply, From, {ok, Ref}}]};
false ->
lager:debug("[iot_host] uuid: ~p, invalid state: ~p", [UUID, state_map(State)]),

View File

@ -28,15 +28,6 @@ init([]) ->
SupFlags = #{strategy => one_for_one, intensity => 10, period => 36},
Specs = [
#{
id => 'iot_event_stream_observer',
start => {'iot_event_stream_observer', start_link, []},
restart => permanent,
shutdown => 2000,
type => worker,
modules => ['iot_event_stream_observer']
},
#{
id => 'iot_name_server',
start => {'iot_name_server', start_link, []},

View File

@ -28,17 +28,11 @@ encode0(#auth_request{uuid = UUID, username = Username, salt = Salt, token = Tok
marshal(?Bytes, Token),
marshal(?I32, Timestamp)
]);
encode0(#auth_reply{code = Code, payload = Payload}) ->
encode0(#rpc_reply{code = Code, payload = Message}) ->
iolist_to_binary([
marshal(?I32, Code),
marshal(?Bytes, Payload)
marshal(?Bytes, Message)
]);
encode0(#jsonrpc_reply{result = Result, error = undefined}) ->
ResultBin = jiffy:encode(#{<<"result">> => Result}, [force_utf8]),
iolist_to_binary([marshal(?Bytes, ResultBin)]);
encode0(#jsonrpc_reply{result = undefined, error = Error}) ->
ResultBin = iolist_to_binary(jiffy:encode(#{<<"error">> => Error}, [force_utf8])),
iolist_to_binary([marshal(?Bytes, ResultBin)]);
encode0(#pub{topic = Topic, content = Content}) ->
iolist_to_binary([
marshal(?Bytes, Topic),
@ -49,10 +43,17 @@ encode0(#command{command_type = CommandType, command = Command}) ->
marshal(?I32, CommandType),
marshal(?Bytes, Command)
]);
encode0(#jsonrpc_request{method = Method, params = Params}) ->
ReqBody = iolist_to_binary(jiffy:encode(#{<<"method">> => Method, <<"params">> => Params}, [force_utf8])),
marshal(?Bytes, ReqBody);
encode0(#rpc_deploy{task_id = TaskId, config = Config}) ->
iolist_to_binary([
marshal(?I32, TaskId),
marshal(?Bytes, Config)
]);
encode0(#rpc_container{method = Method, container_name = ContainerName, params = Params}) ->
iolist_to_binary([
marshal(?Bytes, Method),
marshal(?Bytes, ContainerName),
marshal(?Bytes, Params)
]);
encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) ->
iolist_to_binary([
marshal(?Bytes, ServiceId),
@ -65,12 +66,6 @@ encode0(#event{service_id = ServiceId, event_type = EventType, params = Params})
marshal(?Bytes, ServiceId),
marshal(?I32, EventType),
marshal(?Bytes, Params)
]);
encode0(#task_event_stream{task_id = TaskId, type = Type, stream = Stream}) ->
iolist_to_binary([
marshal(?I32, TaskId),
marshal(?Bytes, Type),
marshal(?Bytes, Stream)
]).
-spec decode(Bin :: binary()) -> {ok, Message :: any()} | error.
@ -83,30 +78,20 @@ decode(<<PacketType:8, Packet/binary>>) ->
end.
decode0(?MESSAGE_AUTH_REQUEST, [UUID, Username, Salt, Token, Timestamp]) ->
{ok, #auth_request{uuid = UUID, username = Username, salt = Salt, token = Token, timestamp = Timestamp}};
decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) ->
case jiffy:decode(ReplyBin, [return_maps]) of
#{<<"result">> := Result} ->
{ok, #jsonrpc_reply{result = Result}};
#{<<"error">> := Error} ->
{ok, #jsonrpc_reply{error = Error}};
_ ->
error
end;
decode0(?MESSAGE_RPC_REPLY, [Code, Message]) ->
{ok, #rpc_reply{code = Code, payload = Message}};
decode0(?MESSAGE_PUB, [Topic, Content]) ->
{ok, #pub{topic = Topic, content = Content}};
decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->
{ok, #command{command_type = CommandType, command = Command}};
decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) ->
{ok, #auth_reply{code = Code, payload = Payload}};
decode0(?MESSAGE_JSONRPC_REQUEST, [ReqBody]) ->
#{<<"method">> := Method, <<"params">> := Params} = jiffy:decode(ReqBody, [return_maps]),
{ok, #jsonrpc_request{method = Method, params = Params}};
decode0(?MESSAGE_RPC_DEPLOY, [TaskId, Config]) ->
{ok, #rpc_deploy{task_id = TaskId, config = Config}};
decode0(?MESSAGE_RPC_CONTAINER, [Method, ContainerName, Params]) ->
{ok, #rpc_container{method = Method, container_name = ContainerName, params = Params}};
decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) ->
{ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}};
decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->
{ok, #event{service_id = ServiceId, event_type = EventType, params = Params}};
decode0(?MESSAGE_EVENT_STREAM, [TaskId, Type, Stream]) ->
{ok, #task_event_stream{task_id = TaskId, type = Type, stream = Stream}};
decode0(_, _) ->
error.

View File

@ -12,7 +12,7 @@
-behaviour(ranch_protocol).
%% API
-export([pub/3, jsonrpc_call/3, command/3]).
-export([pub/3, rpc_call/3, command/3]).
-export([start_link/3, stop/2]).
%% gen_server callbacks
@ -43,10 +43,10 @@ command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is
gen_server:cast(Pid, {command, CommandType, Command}).
%%
-spec jsonrpc_call(Pid :: pid(), ReceiverPid :: pid(), CallBin :: binary()) -> Ref :: reference().
jsonrpc_call(Pid, ReceiverPid, CallBin) when is_pid(Pid), is_pid(ReceiverPid), is_binary(CallBin) ->
-spec rpc_call(Pid :: pid(), ReceiverPid :: pid(), CallBin :: binary()) -> Ref :: reference().
rpc_call(Pid, ReceiverPid, CallBin) when is_pid(Pid), is_pid(ReceiverPid), is_binary(CallBin) ->
Ref = make_ref(),
gen_server:cast(Pid, {jsonrpc_call, ReceiverPid, Ref, CallBin}),
gen_server:cast(Pid, {rpc_call, ReceiverPid, Ref, CallBin}),
Ref.
%%
@ -66,7 +66,7 @@ start_link(Ref, Transport, Opts) ->
init(Ref, Transport, _Opts = []) ->
{ok, Socket} = ranch:handshake(Ref),
lager:debug("[sdlan_channel] get a new connection: ~p", [Socket]),
Transport:setopts(Socket, [binary, {active, true}, {packet, 4}]),
Transport:setopts(Socket, [{active, true}]),
% erlang:start_timer(?PING_TICKER, self(), ping_ticker),
gen_server:enter_loop(?MODULE, [], #state{transport = Transport, socket = Socket}).
@ -86,7 +86,7 @@ handle_cast({command, CommandType, Command}, State = #state{transport = Transpor
{noreply, State};
%%
handle_cast({jsonrpc_call, ReceiverPid, Ref, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
handle_cast({rpc_call, ReceiverPid, Ref, CallBin}, State = #state{transport = Transport, socket = Socket, packet_id = PacketId, inflight = Inflight}) ->
Transport:send(Socket, <<?PACKET_REQUEST, PacketId:32, CallBin/binary>>),
{noreply, State#state{packet_id = PacketId + 1, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}.
@ -107,20 +107,20 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST, PacketId:32, RequestBin/binary>>},
ok ->
%% host的monitor
erlang:monitor(process, HostPid),
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 0, payload = <<"ok">>}),
AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 0, payload = <<"ok">>}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
{denied, Reason} when is_binary(Reason) ->
erlang:monitor(process, HostPid),
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 1, payload = Reason}),
AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 1, payload = Reason}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
{error, Reason} when is_binary(Reason) ->
AuthReplyBin = message_codec:encode(?MESSAGE_AUTH_REPLY, #auth_reply{code = 2, payload = Reason}),
AuthReplyBin = message_codec:encode(?MESSAGE_RPC_REPLY, #rpc_reply{code = 2, payload = Reason}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, AuthReplyBin/binary>>),
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
@ -138,12 +138,7 @@ handle_info({tcp, Socket, <<?PACKET_CAST, CastBin/binary>>}, State = #state{sock
#data{} = Data ->
iot_host:handle(HostPid, {data, Data});
#event{} = Event ->
iot_host:handle(HostPid, {event, Event});
#task_event_stream{task_id = TaskId, type = <<"close">>, stream = <<>>} ->
iot_event_stream_observer:stream_close(TaskId);
#task_event_stream{task_id = TaskId, type = Type, stream = Stream} ->
lager:debug("[tcp_channel] get task_id: ~p, type: ~ts, stream: ~ts", [TaskId, Type, Stream]),
iot_event_stream_observer:stream_data(TaskId, Type, Stream)
iot_host:handle(HostPid, {event, Event})
end,
{noreply, State};
@ -161,7 +156,7 @@ handle_info({tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, ResponseBin/binary>>}
{{ReceiverPid, Ref}, NInflight} ->
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
true ->
ReceiverPid ! {jsonrpc_reply, Ref, RpcReply};
ReceiverPid ! {rpc_reply, Ref, RpcReply};
false ->
lager:warning("[ws_channel] get async_call_reply message: ~p, packet_id: ~p, but receiver_pid is deaded", [RpcReply, PacketId])
end,
@ -186,7 +181,7 @@ handle_info({'DOWN', _, process, HostPid, Reason}, State = #state{uuid = UUID, h
{stop, State};
handle_info(Info, State) ->
lager:warning("[sdlan_channel] get a unknown message: ~p, channel will closed, state: ~p", [Info, State]),
lager:warning("[sdlan_channel] get a unknown message: ~p, channel will closed", [Info]),
{noreply, State}.
terminate(Reason, #state{}) ->