fix codec

This commit is contained in:
anlicheng 2026-01-13 11:56:02 +08:00
parent 25bb8b0514
commit f96dd3fea7
23 changed files with 912 additions and 361 deletions

View File

@ -654,6 +654,208 @@ json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage)
}
```
### 5. 向endpoint发送消息
**URL**`/endpoint/publish_metric`
**Method**`POST`
###### **示例请求**
metric字段支持字符串map或者数组其他为非法数据; 如何要发送数字,可以采用数字的字符串格式, 比如: "5"
```json
{
"route_key": "/dhlr/warning",
"metric": {
"xyz": "test"
}
}
```
#### 示例响应
```json
{
"result": "ok"
}
```
#### 错误响应
```json
{
"error": {
"code": 404,
"message": "restart endpoint error"
}
}
```
### **POST /endpoint/test
用于测试指定协议HTTP / MQTT / Kafka配置是否可用**
#### **请求方式**
```
POST /endpoint/test
Content-Type: application/json
```
---
#### **通用参数**
| 字段 | 类型 | 必填 | 说明 |
| -------- | ------ | -- | ------------------------------- |
| protocol | string | 是 | 选择测试类型,可为:`http``mqtt``kafka` |
| config | object | 是 | 各协议对应的配置对象 |
---
#### **一、测试 HTTP Endpoint**
###### **请求参数**
| 字段 | 类型 | 必填 | 说明 |
| --------- | ------- | -- | ------------- |
| url | string | 是 | 要请求的 HTTP 地址 |
| pool_size | integer | 是 | 连接池大小(必须 > 0 |
###### **示例请求**
```json
{
"protocol": "http",
"config": {
"url": "http://example.com/ping",
"pool_size": 5
}
}
```
###### **返回结果**
| 状态 | 内容 |
| ------- |--------------------------------------|
| success | `{"data":"ok"}` |
| failed | `{"error": -1, "msg": "url failed"}` |
---
##### **二、测试 MQTT Endpoint**
###### **请求参数**
| 字段 | 类型 | 必填 | 说明 |
| --------- | ------ | -- | --------------- |
| host | string | 是 | MQTT 服务器地址 |
| port | int | 是 | MQTT 端口(>0 |
| client_id | string | 是 | 客户端 ID仅用于配置校验 |
| username | string | 是 | 用户名 |
| password | string | 是 | 密码 |
| topic | string | 是 | topic 名称 |
| qos | int | 是 | 0/1/2 |
###### **参数限制**
* host ≠ 空
* username/password/topic ≠ 空
* qos ∈ {0,1,2}
###### **示例请求**
```json
{
"protocol": "mqtt",
"config": {
"host": "test.mqttserver.com",
"port": 1883,
"client_id": "test-client-01",
"username": "user1",
"password": "pass123",
"topic": "device/event",
"qos": 1
}
}
```
###### **返回结果**
| 状态 | 内容 |
| ------- |------------------------------------------------------|
| success | `{"data": "ok"}` |
| failed | `{"error": -1, "msg": "connect mqtt server failed"}` |
---
##### **三、测试 Kafka Endpoint**
###### **请求参数(无认证)**
| 字段 | 类型 | 必填 | 说明 |
| ----------------- | ------------ | -- | ---------------------- |
| bootstrap_servers | list(string) | 是 | 服务器列表,格式 `"host:port"` |
| topic | string | 是 | Topic 名称 |
###### **请求参数SASL 认证)**
当需要 SASL 认证时,增加:
| 字段 | 类型 | 必填 | 说明 |
| --------------------- | ------ | -- | --------------------------------- |
| sasl_config.username | string | 是 | 用户名 |
| sasl_config.password | string | 是 | 密码 |
| sasl_config.mechanism | string | 是 | `"plain"` `"sha_256"` `"sha_512"` |
###### **合法 mechanism 值**
* `plain` → SASL/PLAIN
* `sha_256` → SCRAM-SHA-256
* `sha_512` → SCRAM-SHA-512
###### **bootstrap_servers 校验规则**
* 必须是 list(binary())
* 每项格式必须为 `"host:port"`
* port 必须为整数并 > 0
---
###### **示例 1无认证 Kafka**
```json
{
"protocol": "kafka",
"config": {
"bootstrap_servers": ["broker1:9092", "broker2:9092"],
"topic": "device-log"
}
}
```
###### **示例 2带 SASL 认证 Kafka**
```json
{
"protocol": "kafka",
"config": {
"sasl_config": {
"username": "kafka_user",
"password": "kafka_pass",
"mechanism": "sha_256"
},
"bootstrap_servers": ["kafka.example.com:9094"],
"topic": "iot-event"
}
}
```
---
## **返回结果**
| 状态 | 内容 |
| ------- | ---------------------------------------------------- |
| success | `{"data": "ok"}` |
| failed | `{"error": -1, "msg": "config kafka server failed"}` |
---
### 5⃣ 未知路径处理
@ -903,14 +1105,22 @@ json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage)
#### 请求参数
| 参数名 | 类型 | 必填 | 说明 |
|--------|------|------|------|
|---------|-----------------|------|------------------|
| uuid | binary (string) | ✅ | 主机唯一标识符 |
| topic | binary (string) | ✅ | 事件主题 |
| qos | integer | ✅ | 消息的qosqos = 0,1 |
| content | binary (string) | ✅ | 发布内容 |
### 请求示例
```json
{"uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", "topic": "/device/1234/all", "content": "this is a topic payload", "timeout": 10}
{
"uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm",
"topic": "/device/1234/all",
"qos": 1,
"content": "this is a topic payload",
"timeout": 10
}
```
#### 响应参数

View File

@ -10,6 +10,7 @@
-record(http_endpoint, {
url = <<>> :: binary(),
token = <<>> :: binary(),
pool_size = 10 :: integer()
}).

View File

@ -55,6 +55,7 @@
-record(pub, {
topic :: binary(),
qos = 0 :: integer(),
content :: binary()
}).
@ -74,18 +75,10 @@
}).
-record(data, {
service_id :: binary(),
device_uuid :: binary(),
route_key :: binary(),
metric :: binary()
}).
-record(event, {
service_id :: binary(),
event_type :: integer(),
params :: binary()
}).
-record(task_event_stream, {
task_id :: integer(),
type :: binary(),

View File

@ -12,9 +12,9 @@
%% API
-export([start_link/1]).
-export([get_name/1, get_pid/1, forward/3, reload/2, clean_up/1]).
-export([get_name/1, get_pid/1, forward/2, reload/2, clean_up/1]).
-export([get_alias_pid/1, is_support/1, get_protocol/1]).
-export([endpoint_record/1]).
-export([endpoint_record/1, parse_config/2]).
%%%===================================================================
%%% API
@ -47,9 +47,9 @@ get_alias_name(Name) when is_binary(Name) ->
get_alias_pid(Name) when is_binary(Name) ->
gproc:whereis_name({n, l, get_alias_name(Name)}).
-spec forward(Pid :: pid(), ServiceId :: binary(), Metric :: binary()) -> no_return().
forward(Pid, ServiceId, Metric) when is_pid(Pid), is_binary(ServiceId), is_binary(Metric) ->
gen_server:cast(Pid, {forward, ServiceId, Metric}).
-spec forward(Pid :: pid(), Metric :: binary()) -> no_return().
forward(Pid, Metric) when is_pid(Pid), is_binary(Metric) ->
gen_server:cast(Pid, {forward, Metric}).
reload(Pid, NEndpoint = #endpoint{}) when is_pid(Pid) ->
gen_statem:cast(Pid, {reload, NEndpoint}).
@ -75,8 +75,8 @@ is_support(Protocol) when is_atom(Protocol) ->
-spec endpoint_record(EndpointInfo :: #{}) -> error | {ok, Endpoint :: #endpoint{}}.
endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title, <<"type">> := Type, <<"config">> := ConfigJson,
<<"status">> := Status, <<"updated_at">> := UpdatedAt, <<"created_at">> := CreatedAt}) ->
try
Config = parse_config(Type, ConfigJson),
case parse_config(Type, ConfigJson) of
{ok, Config} ->
{ok, #endpoint {
id = Id,
matcher = Matcher,
@ -85,14 +85,26 @@ endpoint_record(#{<<"id">> := Id, <<"matcher">> := Matcher, <<"title">> := Title
status = Status,
updated_at = UpdatedAt,
created_at = CreatedAt
}}
catch throw:_ ->
}};
{error, _Reason} ->
error
end.
-spec parse_config(Protocol :: binary(), Config :: map()) -> {ok, #mqtt_endpoint{} | #kafka_endpoint{} | #http_endpoint{}} | {error, Errors :: [Error :: binary()]}.
parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port0, <<"client_id">> := ClientId, <<"username">> := Username, <<"password">> := Password, <<"topic">> := Topic, <<"qos">> := Qos}) ->
Port = if is_binary(Port0) -> binary_to_integer(Port0); is_integer(Port0) -> Port0 end,
#mqtt_endpoint{
Errors = lists:filtermap(fun(Term) ->
case check_mqtt_argument(Term) of
ok ->
false;
{error, Error} ->
{true, Error}
end
end, [{host, Host}, {port, Port}, {username, Username}, {password, Password}, {topic, Topic}, {qos, Qos}]),
case Errors =:= [] of
true ->
{ok, #mqtt_endpoint{
host = Host,
port = Port,
client_id = ClientId,
@ -100,14 +112,69 @@ parse_config(<<"mqtt">>, #{<<"host">> := Host, <<"port">> := Port0, <<"client_id
password = Password,
topic = Topic,
qos = Qos
};
parse_config(<<"http">>, #{<<"url">> := Url, <<"pool_size">> := PoolSize}) ->
#http_endpoint{
}};
false ->
{error, Errors}
end;
parse_config(<<"http">>, C = #{<<"url">> := Url, <<"pool_size">> := PoolSize}) ->
Token = maps:get(<<"token">>, C, <<>>),
{ok, #http_endpoint{
url = Url,
token = Token,
pool_size = PoolSize
};
}};
parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <<"password">> := Password, <<"mechanism">> := Mechanism0}, <<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
Mechanism = case Mechanism0 of
Errors = lists:filtermap(fun(Term) ->
case check_kafka_argument(Term) of
ok ->
false;
{error, Error} ->
{true, Error}
end
end, [{username, Username}, {password, Password}, {topic, Topic}, {mechanism, Mechanism0}, {bootstrap_servers, BootstrapServers}]),
case Errors =:= [] of
true ->
Mechanism = parse_kafka_mechanism(Mechanism0),
{ok, #kafka_endpoint{
sasl_config = {Mechanism, Username, Password},
bootstrap_servers = parse_kafka_bootstrap_servers(BootstrapServers),
topic = Topic
}};
false ->
{error, Errors}
end;
parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
Errors = lists:filtermap(fun(Term) ->
case check_kafka_argument(Term) of
ok ->
false;
{error, Error} ->
{true, Error}
end
end, [{topic, Topic}, {bootstrap_servers, BootstrapServers}]),
case Errors =:= [] of
true ->
{ok, #kafka_endpoint{
sasl_config = undefined,
bootstrap_servers = parse_kafka_bootstrap_servers(BootstrapServers),
topic = Topic
}};
false ->
{error, Errors}
end;
parse_config(_, _) ->
{error, invalid_config}.
-spec parse_kafka_bootstrap_servers(BootstrapServers :: [binary()]) -> Servers :: [{Host :: string(), Port :: integer()}].
parse_kafka_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) ->
lists:map(fun(S) ->
[Host0, Port0] = binary:split(S, <<":">>),
{binary_to_list(Host0), binary_to_integer(Port0)}
end, BootstrapServers).
-spec parse_kafka_mechanism(Mechanism0 :: binary()) -> atom().
parse_kafka_mechanism(Mechanism0) when is_binary(Mechanism0) ->
case Mechanism0 of
<<"sha_256">> ->
scram_sha_256;
<<"sha_512">> ->
@ -116,28 +183,106 @@ parse_config(<<"kafka">>, #{<<"sasl_config">> := #{<<"username">> := Username, <
plain;
_ ->
plain
end,
end.
#kafka_endpoint{
sasl_config = {Mechanism, Username, Password},
bootstrap_servers = parse_bootstrap_servers(BootstrapServers),
topic = Topic
};
parse_config(<<"kafka">>, #{<<"bootstrap_servers">> := BootstrapServers, <<"topic">> := Topic}) ->
#kafka_endpoint{
sasl_config = undefined,
bootstrap_servers = parse_bootstrap_servers(BootstrapServers),
topic = Topic
};
parse_config(_, _) ->
throw(invalid_config).
parse_bootstrap_servers(BootstrapServers) when is_list(BootstrapServers) ->
lists:filtermap(fun(S) ->
-spec check_kafka_argument(tuple()) -> ok | {error, Reason :: binary()}.
check_kafka_argument({mechanism, Mechanism}) ->
case lists:member(Mechanism, [<<"sha_256">>, <<"sha_512">>, <<"plain">>]) of
true ->
ok;
false ->
{error, <<"mechanism invalid">>}
end;
check_kafka_argument({username, Username}) ->
case Username /= <<>> of
true ->
ok;
false ->
{error, <<"username is empty">>}
end;
check_kafka_argument({password, Password}) ->
case Password /= <<>> of
true ->
ok;
false ->
{error, <<"password is empty">>}
end;
check_kafka_argument({topic, Topic}) ->
case Topic /= <<>> of
true ->
ok;
false ->
{error, <<"topic is empty">>}
end;
check_kafka_argument({bootstrap_servers, BootstrapServers}) ->
case is_list(BootstrapServers) andalso length(BootstrapServers) > 0 of
true ->
InvalidServers = lists:filtermap(fun(S) ->
case binary:split(S, <<":">>) of
[Host0, Port0] ->
{true, {binary_to_list(Host0), binary_to_integer(Port0)}};
Host = binary_to_list(Host0),
Port = binary_to_integer(Port0),
case not string:is_empty(Host) andalso (is_integer(Port) andalso Port > 0) of
true ->
false;
false ->
{true, S}
end;
_ ->
false
{true, S}
end
end, BootstrapServers).
end, BootstrapServers),
case InvalidServers =:= [] of
true ->
ok;
false ->
{error, iolist_to_binary([<<"bootstrap_servers: ">>, lists:join(<<",">>, InvalidServers), <<" format is error">>])}
end;
false ->
{error, <<"bootstrap_servers is empty">>}
end.
-spec check_mqtt_argument(tuple()) -> ok | {error, Reason :: binary()}.
check_mqtt_argument({host, Host}) ->
case Host /= <<>> of
true ->
ok;
false ->
{error, <<"host is empty">>}
end;
check_mqtt_argument({port, Port}) ->
case is_integer(Port) andalso Port > 0 of
true ->
ok;
false ->
{error, <<"port invalid">>}
end;
check_mqtt_argument({username, Username}) ->
case Username /= <<>> of
true ->
ok;
false ->
{error, <<"username is empty">>}
end;
check_mqtt_argument({password, Password}) ->
case Password /= <<>> of
true ->
ok;
false ->
{error, <<"password is empty">>}
end;
check_mqtt_argument({topic, Topic}) ->
case Topic /= <<>> of
true ->
ok;
false ->
{error, <<"topic is empty">>}
end;
check_mqtt_argument({qos, Qos}) ->
case is_integer(Qos) andalso lists:member(Qos, [0, 1, 2]) of
true ->
ok;
false ->
{error, <<"qos invalid">>}
end.

View File

@ -69,8 +69,15 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer),
handle_cast({forward, Metric}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{token = Token}}}) ->
Tuple = case is_binary(Token) andalso Token /= <<>> of
true ->
Sign = iot_util:sha256(erlang:iolist_to_binary([Token, Metric, Token])),
{Metric, Sign};
false ->
{Metric, <<>>}
end,
NBuffer = endpoint_buffer:append(Tuple, Buffer),
{noreply, State#state{buffer = NBuffer}};
handle_cast(cleanup, State = #state{buffer = Buffer}) ->
@ -83,11 +90,14 @@ handle_cast(cleanup, State = #state{buffer = Buffer}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) ->
Headers = [
{<<"Content-Type">>, <<"application/octet-stream">>},
{<<"Service-Id">>, ServiceId}
],
handle_info({next_data, Id, {Metric, Sign}}, State = #state{buffer = Buffer, endpoint = #endpoint{config = #http_endpoint{url = Url}}}) ->
BaseHeaders = [{<<"Content-Type">>, <<"application/json">>}],
ExtraHeaders = if
Sign =:= <<>> -> [];
true -> [{<<"X-Signature">>, Sign}]
end,
Headers = BaseHeaders ++ ExtraHeaders,
case hackney:request(post, Url, Headers, Metric) of
{ok, 200, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
@ -100,7 +110,8 @@ handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{buffer = Buffer
{ok, RespBody} = hackney:body(ClientRef),
hackney:close(ClientRef),
lager:debug("[endpoint_http] url: ~p, http_code: ~p, response is: ~p", [Url, HttpCode, RespBody]),
{noreply, State};
NBuffer = endpoint_buffer:ack(Id, Buffer),
{noreply, State#state{buffer = NBuffer}};
{error, Reason} ->
lager:warning("[endpoint_http] url: ~p, get error: ~p", [Url, Reason]),
{noreply, State}

View File

@ -82,8 +82,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer),
handle_cast({forward, Metric}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append(Metric, Buffer),
{noreply, State#state{buffer = NBuffer}}.
%% @private
@ -116,6 +116,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS
{noreply, State#state{buffer = NBuffer, client_pid = ClientPid, status = ?CONNECTED}};
{error, Reason} ->
lager:debug("[endpoint_kafka] start_producer: ~p, get error: ~p", [ClientId, Reason]),
brod:stop_client(ClientId),
retry_connect(),
{noreply, State#state{status = ?DISCONNECTED, client_pid = undefined}}
end;
@ -129,7 +130,7 @@ handle_info({timeout, _, connect}, State = #state{buffer = Buffer, status = ?DIS
handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
{noreply, State};
%% mqtt服务器
handle_info({next_data, Id, {_ServiceId, Metric}}, State = #state{status = ?CONNECTED, client_pid = ClientPid,
handle_info({next_data, Id, Metric}, State = #state{status = ?CONNECTED, client_pid = ClientPid,
endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) ->
ReceiverPid = self(),

View File

@ -19,7 +19,7 @@
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
%%
-define(RETRY_INTERVAL, 5000).
-define(RETRY_INTERVAL, 15000).
-define(DISCONNECTED, disconnected).
-define(CONNECTED, connected).
@ -83,8 +83,8 @@ handle_call(get_stat, _From, State = #state{buffer = Buffer}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({forward, ServiceId, Metric}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append({ServiceId, Metric}, Buffer),
handle_cast({forward, Metric}, State = #state{buffer = Buffer}) ->
NBuffer = endpoint_buffer:append(Metric, Buffer),
{noreply, State#state{buffer = NBuffer}}.
%% @private
@ -111,20 +111,22 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status
{retry_interval, 5000}
],
try
{ok, ConnPid} = emqtt:start_link(Opts),
lager:debug("[endpoint_mqtt] start connect, options: ~p", [Opts]),
case catch emqtt:connect(ConnPid, 5000) of
case emqtt:connect(ConnPid, 5000) of
{ok, _} ->
lager:debug("[endpoint_mqtt] connect success, pid: ~p", [ConnPid]),
NBuffer = endpoint_buffer:trigger_n(Buffer),
{noreply, State#state{conn_pid = ConnPid, buffer = NBuffer, status = ?CONNECTED}};
{error, Reason} ->
lager:warning("[endpoint_mqtt] connect get error: ~p", [Reason]),
erlang:start_timer(5000, self(), create_postman),
{noreply, State};
Error ->
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{noreply, State}
end
catch _:Error->
lager:warning("[endpoint_mqtt] connect get error: ~p", [Error]),
erlang:start_timer(5000, self(), create_postman),
erlang:start_timer(?RETRY_INTERVAL, self(), create_postman),
{noreply, State}
end;
@ -132,10 +134,9 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status
handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) ->
{keep_state, State};
%% mqtt服务器
handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight,
endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic0, qos = Qos}}}) ->
handle_info({next_data, Id, Metric}, State = #state{status = ?CONNECTED, conn_pid = ConnPid, buffer = Buffer, inflight = InFlight,
endpoint = #endpoint{config = #mqtt_endpoint{topic = Topic, qos = Qos}}}) ->
Topic = re:replace(Topic0, <<"\\${service_id}">>, ServiceId, [global, {return, binary}]),
lager:debug("[endpoint_mqtt] will publish topic: ~p, metric: ~p, qos: ~p", [Topic, Metric, Qos]),
case emqtt:publish(ConnPid, Topic, #{}, Metric, [{qos, Qos}, {retain, true}]) of
ok ->

View File

@ -13,7 +13,7 @@
%% API
-export([start_link/0]).
-export([subscribe/2, publish/3]).
-export([subscribe/2, publish/2, get_subscribers/0]).
-export([match_components/2, is_valid_components/1, of_components/1]).
%% gen_server callbacks
@ -45,9 +45,13 @@
subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) ->
gen_server:call(?SERVER, {subscribe, Topic, SubscriberPid}).
-spec publish(RouteKey :: binary(), ServiceId :: binary(), Content :: binary()) -> no_return().
publish(RouteKey, ServiceId, Content) when is_binary(RouteKey), is_binary(Content) ->
gen_server:cast(?SERVER, {publish, RouteKey, ServiceId, Content}).
-spec get_subscribers() -> {ok, Subscribers :: map()}.
get_subscribers() ->
gen_server:call(?SERVER, get_subscribers).
-spec publish(RouteKey :: binary(), Content :: binary()) -> no_return().
publish(RouteKey, Content) when is_binary(RouteKey), is_binary(Content) ->
gen_server:cast(?SERVER, {publish, RouteKey, Content}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
@ -78,6 +82,8 @@ init([]) ->
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
%% SubscriberPid只能订阅同一个topic一次
handle_call(get_subscribers, _From, State = #state{subscribers = Subscribers}) ->
{reply, {ok, Subscribers}, State};
handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers = Subscribers}) ->
Components = of_components(Topic),
case is_valid_components(Components) of
@ -98,10 +104,10 @@ handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
%%
handle_cast({publish, RouteKey, ServiceId, Metric}, State = #state{subscribers = Subscribers}) ->
handle_cast({publish, RouteKey, Metric}, State = #state{subscribers = Subscribers}) ->
MatchedSubscribers = match_subscribers(Subscribers, RouteKey),
lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) ->
endpoint:forward(SubscriberPid, ServiceId, Metric)
endpoint:forward(SubscriberPid, Metric)
end, MatchedSubscribers),
lager:debug("[efka_subscription] route_key: ~p, metric: ~p, match subscribers: ~p", [RouteKey, Metric, MatchedSubscribers]),
{noreply, State}.

View File

@ -1,50 +0,0 @@
%%%-------------------------------------------------------------------
%%% @author licheng5
%%% @copyright (C) 2020, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 26. 4 2020 3:36
%%%-------------------------------------------------------------------
-module(device_handler).
-author("licheng5").
-include("iot.hrl").
%% API
-export([handle_request/4]).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%
handle_request("POST", "/device/reload", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID}) when is_integer(HostId), is_binary(DeviceUUID) ->
lager:debug("[device_handler] host_id: ~p, will reload device uuid: ~p", [HostId, DeviceUUID]),
AliasName = iot_host:get_alias_name(HostId),
case global:whereis_name(AliasName) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"reload device failed">>)};
HostPid when is_pid(HostPid) ->
case iot_host:reload_device(HostPid, DeviceUUID) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[device_handler] reload device: ~p, get error: ~p", [DeviceUUID, Reason]),
{ok, 200, iot_util:json_error(404, <<"reload device failed">>)}
end
end;
%%
handle_request("POST", "/device/delete", _, #{<<"host_id">> := HostId, <<"device_uuid">> := DeviceUUID}) when is_integer(HostId), is_binary(DeviceUUID) ->
AliasName = iot_host:get_alias_name(HostId),
case global:whereis_name(AliasName) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"delete device failed">>)};
HostPid when is_pid(HostPid) ->
ok = iot_host:delete_device(HostPid, DeviceUUID),
{ok, 200, iot_util:json_data(<<"success">>)}
end;
handle_request(_, Path, _, _) ->
Path1 = list_to_binary(Path),
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.

View File

@ -78,11 +78,11 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Title, Reason]),
{ok, 200, iot_util:json_error(404, <<"restart endpoint error">>)}
end;
Pid ->
Pid when is_pid(Pid) ->
case endpoint_sup:delete_endpoint(Id) of
ok ->
case endpoint_sup:ensured_endpoint_started(Endpoint) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid0} when is_pid(Pid0) ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:warning("[endpoint_handler] start endpoint: ~p, get error: ~p", [Title, Reason]),
@ -98,6 +98,113 @@ handle_request("POST", "/endpoint/restart", _, #{<<"id">> := Id}) when is_intege
end
end;
%% http接口的测试
handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"http">>, <<"config">> := #{<<"url">> := Url, <<"pool_size">> := PoolSize}}) when is_integer(PoolSize), PoolSize > 0 ->
Body = <<"">>,
ContentType = "application/json",
case httpc:request(post, {Url, [], ContentType, Body}, [], []) of
{ok, _} ->
{ok, 200, iot_util:json_data(<<"ok">>)};
{error, Reason} ->
lager:debug("[endpint_handler] test http: ~p, error: ~p", [Url, Reason]),
{ok, 200, iot_util:json_error(-1, <<"url failed">>)}
end;
%% mqtt
handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"mqtt">>, <<"config">> := Config}) ->
case endpoint:parse_config(<<"mqtt">>, Config) of
{ok, #mqtt_endpoint{host = Host, port = Port, username = Username, password = Password}} ->
%% client
ClientId = "mqtt_client_test:" ++ iot_util:rand_bytes(16),
Opts = [
{owner, self()},
{clientid, ClientId},
{host, binary_to_list(Host)},
{port, Port},
{tcp_opts, []},
{username, binary_to_list(Username)},
{password, binary_to_list(Password)},
{keepalive, 86400},
{auto_ack, true},
{connect_timeout, 5000},
{proto_ver, v5},
{retry_interval, 5000}
],
case emqtt:start_link(Opts) of
{ok, ConnPid} ->
lager:debug("[endpint_handler] start connect, options: ~p", [Opts]),
case catch emqtt:connect(ConnPid, 5000) of
{ok, _} ->
lager:debug("[endpint_handler] connect success, pid: ~p", [ConnPid]),
emqtt:stop(ConnPid),
{ok, 200, iot_util:json_data(<<"ok">>)};
{error, Reason} ->
lager:warning("[endpint_handler] connect get error: ~p", [Reason]),
emqtt:stop(ConnPid),
{ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)};
Error ->
lager:warning("[endpint_handler] connect get error: ~p", [Error]),
emqtt:stop(ConnPid),
{ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)}
end;
Other ->
lager:warning("[endpint_handler] test connect mqtt with options: ~p, get error: ~p", [Opts, Other]),
{ok, 200, iot_util:json_error(-1, <<"connect mqtt server failed">>)}
end;
{error, Errors} ->
{ok, 200, iot_util:json_error(-1, Errors)}
end;
%% kafka
handle_request("POST", "/endpoint/test", _, #{<<"protocol">> := <<"kafka">>, <<"config">> := Config}) ->
case endpoint:parse_config(<<"kafka">>, Config) of
{ok, #kafka_endpoint{sasl_config = SaslConfig, bootstrap_servers = BootstrapServers, topic = Topic}} ->
BaseConfig = [
{reconnect_cool_down_seconds, 5},
{socket_options, [{keepalive, true}]}
],
ClientConfig = case SaslConfig of
{Mechanism, Username, Password} ->
[{sasl, {Mechanism, Username, Password}}|BaseConfig];
undefined ->
BaseConfig
end,
ClientId = list_to_atom("brod_client_test:" ++ iot_util:rand_bytes(16)),
case catch brod:start_link_client(BootstrapServers, ClientId, ClientConfig) of
{ok, _ClientPid} ->
case brod:start_producer(ClientId, Topic, _ProducerConfig = []) of
ok ->
ok = brod:stop_client(ClientId),
{ok, 200, iot_util:json_data(<<"ok">>)};
{error, Reason} ->
lager:debug("[endpint_handler] start_producer: ~p, get error: ~p", [ClientId, Reason]),
{ok, 200, iot_util:json_error(-1, <<"config kafka server failed">>)}
end;
Error ->
lager:debug("[endpint_handler] start_client: ~p, get error: ~p", [ClientId, Error]),
{ok, 200, iot_util:json_error(-1, <<"config kafka server failed">>)}
end;
{error, Errors} ->
{ok, 200, iot_util:json_error(-1, Errors)}
end;
%%
handle_request("POST", "/endpoint/publish_metric", _, #{<<"route_key">> := RouteKey, <<"metric">> := Metric0}) when is_binary(RouteKey) ->
if
is_map(Metric0) orelse is_list(Metric0) ->
Metric = jiffy:encode(Metric0, [force_utf8]),
endpoint_subscription:publish(RouteKey, Metric),
{ok, 200, iot_util:json_data(<<"ok">>)};
is_binary(Metric0) ->
endpoint_subscription:publish(RouteKey, Metric0),
{ok, 200, iot_util:json_data(<<"ok">>)};
true ->
{ok, 200, iot_util:json_error(-1, <<"invalid metric">>)}
end;
handle_request(_, Path, _, _) ->
Path1 = list_to_binary(Path),
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.

View File

@ -44,12 +44,11 @@ handle_request("GET", "/host/status", #{<<"uuid">> := UUID}, _) when is_binary(U
%%
handle_request("POST", "/host/delete", _, #{<<"uuid">> := UUID}) when is_binary(UUID) ->
case iot_host_sup:delete_host(UUID) of
ok ->
lager:debug("[host_handler] will delete host uuid: ~p", [UUID]),
case iot_host:get_pid(UUID) of
Pid when is_pid(Pid) ->
ok = iot_host_sup:delete_host(UUID),
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[host_handler] delete host uuid: ~p, get error is: ~p", [UUID, Reason]),
undefined ->
{ok, 200, iot_util:json_error(404, <<"error">>)}
end;
@ -80,15 +79,16 @@ handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> :=
end;
%%
handle_request("POST", "/host/pub", _, #{<<"uuid">> := UUID, <<"topic">> := Topic, <<"content">> := Content})
when is_binary(UUID), is_binary(Topic), is_binary(Content) ->
handle_request("POST", "/host/pub", _, #{<<"uuid">> := UUID, <<"topic">> := Topic, <<"qos">> := Qos0, <<"content">> := Content})
when is_binary(UUID), is_binary(Topic), is_binary(Content), is_integer(Qos0) ->
Qos = case Qos0 > 0 of true -> 1; false -> 0 end,
case iot_host_sup:ensured_host_started(UUID) of
{error, Reason} ->
lager:debug("[host_handler] pub host_id: ~p, topic: ~p, failed with reason: ~p", [UUID, Topic, Reason]),
{ok, 200, iot_util:json_error(400, <<"host not found">>)};
{ok, Pid} when is_pid(Pid) ->
ok = iot_host:pub(Pid, Topic, Content),
ok = iot_host:pub(Pid, Topic, Qos, Content),
{ok, 200, iot_util:json_data(<<"success">>)}
end;

View File

@ -26,6 +26,7 @@
erts,
runtime_tools,
observer,
inets,
kernel,
stdlib
]},

View File

@ -136,7 +136,7 @@ do_post(Path, Params) when is_list(Path), is_map(Params) ->
case hackney:request(post, Url, Headers, Body, [{pool, false}]) of
{ok, 200, _, ClientRef} ->
{ok, RespBody} = hackney:body(ClientRef),
lager:debug("[iot_api] request url: ~p, send body: ~p, get error is: ~p", [Url, Body, RespBody]),
lager:debug("[iot_api] request url: ~p, send body: ~p, get response is: ~p", [Url, Body, RespBody]),
hackney:close(ClientRef),
case catch jiffy:decode(RespBody, [return_maps]) of
#{<<"result">> := Result} ->

View File

@ -49,7 +49,7 @@ start_http_server() ->
{'_', [
{"/host/[...]", http_protocol, [host_handler]},
{"/container/[...]", http_protocol, [container_handler]},
{"/device/[...]", http_protocol, [device_handler]},
{"/endpoint/[...]", http_protocol, [endpoint_handler]},
{"/event_stream", event_stream_handler, []}
]}
]),

View File

@ -1,73 +0,0 @@
%%%-------------------------------------------------------------------
%%% @copyright (C) 2023, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 14. 8 2023 11:40
%%%-------------------------------------------------------------------
-module(iot_device).
-author("aresei").
-include("iot.hrl").
%% API
-export([new/1, change_status/2, reload/1]).
%%
-define(DEVICE_AUTH_DENIED, 0).
-define(DEVICE_AUTH_AUTHED, 1).
%%
-define(STATE_DENIED, denied).
-define(STATE_ACTIVATED, activated).
-record(device, {
device_uuid :: binary(),
status = ?DEVICE_OFFLINE
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec new(DeviceInfo :: binary() | map()) -> error | {ok, Device :: #device{}}.
new(DeviceUUID) when is_binary(DeviceUUID) ->
case iot_api:get_device_by_uuid(DeviceUUID) of
{ok, #{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}} ->
{ok, #device{device_uuid = DeviceUUID, status = Status}};
undefined ->
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
error
end;
new(#{<<"device_uuid">> := DeviceUUID, <<"status">> := Status}) ->
{ok, #device{device_uuid = DeviceUUID, status = Status}}.
-spec change_status(Device :: #device{}, NewStatus :: integer()) -> NDevice :: #device{}.
change_status(Device = #device{status = Status}, NewStatus) when is_integer(NewStatus), Status =:= NewStatus ->
Device;
change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_ONLINE) ->
iot_api:change_device_status(DeviceUUID, ?DEVICE_ONLINE),
Device#device{status = ?DEVICE_ONLINE};
change_status(Device = #device{device_uuid = DeviceUUID}, ?DEVICE_OFFLINE) ->
{ok, #{<<"status">> := Status}} = iot_api:get_device_by_uuid(DeviceUUID),
case Status of
?DEVICE_NOT_JOINED ->
lager:debug("[iot_device] device: ~p, device_maybe_offline, not joined, can not change to offline", [DeviceUUID]),
Device#device{status = ?DEVICE_NOT_JOINED};
?DEVICE_OFFLINE ->
lager:debug("[iot_device] device: ~p, device_maybe_offline, is offline, do nothing", [DeviceUUID]),
Device#device{status = ?DEVICE_OFFLINE};
?DEVICE_ONLINE ->
iot_api:change_device_status(DeviceUUID, ?DEVICE_OFFLINE),
Device#device{status = ?DEVICE_OFFLINE}
end.
-spec reload(Device :: #device{}) -> error | {ok, NDevice :: #device{}}.
reload(Device = #device{device_uuid = DeviceUUID}) ->
lager:debug("[iot_device] will reload: ~p", [DeviceUUID]),
case iot_api:get_device_by_uuid(DeviceUUID) of
{ok, #{<<"status">> := Status}} ->
{ok, Device#device{device_uuid = DeviceUUID, status = Status}};
undefined ->
lager:warning("[iot_device] device uuid: ~p, loaded from mysql failed", [DeviceUUID]),
error
end.

View File

@ -22,12 +22,10 @@
%% API
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
-export([get_metric/1, get_status/1]).
-export([get_metric/1, get_status/1, kill/1]).
%%
-export([pub/3, attach_channel/2, command/3]).
-export([pub/4, attach_channel/2, command/3]).
-export([deploy_container/3, start_container/2, stop_container/2, remove_container/2, kill_container/2, config_container/3, get_containers/1, await_reply/2]).
%%
-export([reload_device/2, delete_device/2]).
-export([heartbeat/1]).
%% gen_statem callbacks
@ -42,10 +40,6 @@
heartbeat_counter = 0 :: integer(),
%% websocket相关
channel_pid :: undefined | pid(),
%% , #{device_uuid => Device}
device_map = #{},
%%
metrics = #{} :: map()
}).
@ -67,6 +61,15 @@ get_alias_name(HostId0) when is_integer(HostId0) ->
HostId = integer_to_binary(HostId0),
binary_to_atom(<<"iot_host_id:", HostId/binary>>).
-spec kill(UUID :: binary()) -> no_return().
kill(UUID) when is_binary(UUID) ->
case whereis(get_name(UUID)) of
undefined ->
ok;
Pid ->
exit(Pid, kill)
end.
%%
-spec handle(Pid :: pid(), Packet :: {atom(), any()}) -> no_return().
handle(Pid, Packet) when is_pid(Pid) ->
@ -142,23 +145,14 @@ await_reply(Ref, Timeout) when is_reference(Ref), is_integer(Timeout) ->
{error, <<"timeout">>}
end.
-spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> ok | {error, Reason :: any()}.
pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) ->
gen_statem:call(Pid, {pub, Topic, Content}).
-spec pub(Pid :: pid(), Topic :: binary(), Qos :: integer(), Content :: binary()) -> ok | {error, Reason :: any()}.
pub(Pid, Topic, Qos, Content) when is_pid(Pid), is_binary(Topic), is_integer(Qos), is_binary(Content) ->
gen_statem:call(Pid, {pub, Topic, Qos, Content}).
-spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> ok | {error, Reason :: any()}.
command(Pid, CommandType, Command) when is_pid(Pid), is_integer(CommandType), is_binary(Command) ->
gen_statem:call(Pid, {command, CommandType, Command}).
%%
-spec reload_device(Pid :: pid(), DeviceUUID :: binary()) -> ok | {error, Reason :: any()}.
reload_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
gen_statem:call(Pid, {reload_device, DeviceUUID}).
-spec delete_device(Pid :: pid(), DeviceUUID :: binary()) -> ok.
delete_device(Pid, DeviceUUID) when is_pid(Pid), is_binary(DeviceUUID) ->
gen_statem:call(Pid, {delete_device, DeviceUUID}).
-spec heartbeat(Pid :: pid()) -> no_return().
heartbeat(undefined) ->
ok;
@ -193,19 +187,7 @@ init([UUID]) ->
true -> ?STATE_ACTIVATED;
false -> ?STATE_DENIED
end,
%%
{ok, DeviceInfos} = iot_api:get_host_devices(HostId),
Devices = lists:filtermap(fun(DeviceInfo = #{<<"device_uuid">> := DeviceUUID}) ->
case iot_device:new(DeviceInfo) of
error ->
false;
{ok, Device} ->
{true, {DeviceUUID, Device}}
end
end, DeviceInfos),
{ok, StateName, #state{host_id = HostId, uuid = UUID, device_map = maps:from_list(Devices), has_session = false}};
{ok, StateName, #state{host_id = HostId, uuid = UUID, has_session = false}};
undefined ->
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
ignore
@ -249,12 +231,12 @@ handle_event({call, From}, {jsonrpc_call, ReceiverPid, RpcCall}, _, State = #sta
end;
%% , pub/sub
handle_event({call, From}, {pub, Topic, Content}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
handle_event({call, From}, {pub, Topic, Qos, Content}, ?STATE_ACTIVATED, State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = HasSession}) ->
case HasSession andalso is_pid(ChannelPid) of
true ->
lager:debug("[iot_host] host: ~p, publish to topic: ~p, content: ~p", [UUID, Topic, Content]),
%% websocket发送请求
tcp_channel:pub(ChannelPid, Topic, Content),
tcp_channel:pub(ChannelPid, Topic, Qos, Content),
{keep_state, State, [{reply, From, ok}]};
false ->
@ -318,52 +300,19 @@ handle_event({call, From}, {attach_channel, _}, _, State = #state{uuid = UUID, c
lager:notice("[iot_host] attach_channel host_id uuid: ~p, old channel exists: ~p", [UUID, OldChannelPid]),
{keep_state, State, [{reply, From, {error, <<"channel existed">>}}]};
%%
handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) ->
case maps:find(DeviceUUID, DeviceMap) of
error ->
{keep_state, State, [{reply, From, {error, <<"device not found">>}}]};
{ok, Device} ->
case iot_device:reload(Device) of
error ->
{keep_state, State#state{device_map = maps:remove(Device, DeviceMap)}, [{reply, From, {error, <<"reload device error">>}}]};
{ok, NDevice} ->
{keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}, [{reply, From, ok}]}
end
end;
%%
handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{device_map = DeviceMap}) ->
{keep_state, State#state{device_map = maps:remove(DeviceUUID, DeviceMap)}, [{reply, From, ok}]};
%% todo
handle_event(cast, {handle, {data, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED,
State = #state{uuid = UUID, has_session = true, device_map = DeviceMap}) ->
lager:debug("[iot_host] metric_data host: ~p, service_id: ~p, device_uuid: ~p, route_key: ~p, metric: ~p", [UUID, ServiceId, DeviceUUID, RouteKey0, Metric]),
case maps:find(DeviceUUID, DeviceMap) of
error ->
lager:warning("[iot_host] host uuid: ~p, device uuid: ~p not found, metric: ~p", [UUID, DeviceUUID, Metric]),
{keep_state, State};
{ok, Device} ->
%%
handle_event(cast, {handle, {data, #data{route_key = RouteKey0, metric = Metric}}}, ?STATE_ACTIVATED,
State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] metric_data host: ~p, route_key: ~p, metric: ~p", [UUID, RouteKey0, Metric]),
RouteKey = get_route_key(RouteKey0),
endpoint_subscription:publish(RouteKey, ServiceId, Metric),
NDevice = iot_device:change_status(Device, ?DEVICE_ONLINE),
{keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}}
end;
endpoint_subscription:publish(RouteKey, Metric),
{keep_state, State};
%% ping的数据是通过aes加密后的
handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] ping host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
{keep_state, State#state{metrics = Metrics}};
handle_event(cast, {handle, {event, #event{service_id = ServiceId, event_type = EventType, params = Params}}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) ->
lager:debug("[iot_host] event uuid: ~p, service_id: ~p, event_type: ~p, params: ~p", [UUID, ServiceId, EventType, Params]),
%DevicePid = iot_device:get_pid(DeviceUUID),
%iot_device:change_status(DevicePid, Status),
{keep_state, State};
%%
handle_event(cast, heartbeat, _, State = #state{heartbeat_counter = HeartbeatCounter}) ->
{keep_state, State#state{heartbeat_counter = HeartbeatCounter + 1}};

View File

@ -23,6 +23,8 @@ init([]) ->
ensured_host_started(UUID) when is_binary(UUID) ->
case iot_host:get_pid(UUID) of
undefined ->
%% host对应的信息
delete_host(UUID),
case supervisor:start_child(?MODULE, child_spec(UUID)) of
{ok, Pid} when is_pid(Pid) ->
{ok, Pid};
@ -35,10 +37,17 @@ ensured_host_started(UUID) when is_binary(UUID) ->
{ok, Pid}
end.
delete_host(UUID) when is_binary(UUID) ->
delete_host(UUID) ->
Id = iot_host:get_name(UUID),
ok = supervisor:terminate_child(?MODULE, Id),
supervisor:delete_child(?MODULE, Id).
case supervisor:delete_child(?MODULE, Id) of
{error, running} ->
%% ensure killed then delete again
iot_host:kill(UUID),
supervisor:delete_child(?MODULE, Id);
_ ->
ok
end.
child_spec(UUID) ->
Id = iot_host:get_name(UUID),

View File

@ -14,6 +14,7 @@
-export([step/3, chunks/2, rand_bytes/1, uuid/0, md5/1, parse_mapper/1]).
-export([json_data/1, json_error/2, is_json/1]).
-export([queue_limited_in/3, assert_call/2, assert/2]).
-export([sha256/1]).
-spec is_json(Json :: term()) -> boolean().
is_json(Json) when is_binary(Json) ->
@ -80,7 +81,7 @@ json_data(Data) ->
<<"result">> => Data
}, [force_utf8]).
json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) ->
json_error(ErrCode, ErrMessage) when is_integer(ErrCode) ->
jiffy:encode(#{
<<"error">> => #{
<<"code">> => ErrCode,
@ -112,6 +113,14 @@ assert_call(true, Fun) ->
assert_call(false, _) ->
ok.
-spec sha256(Str :: string() | binary()) -> binary().
sha256(Str) when is_list(Str) ->
sha256(unicode:characters_to_binary(Str));
sha256(Bin) when is_binary(Bin) ->
HashBin = crypto:hash(sha256, Bin),
HexStr = lists:flatten([io_lib:format("~2.16.0B", [B]) || B <- binary:bin_to_list(HashBin)]),
list_to_binary(string:lowercase(HexStr)).
-spec md5(Str :: binary()) -> binary().
md5(Str) when is_binary(Str) ->
list_to_binary(lists:flatten([hex(X) || <<X:4>> <= erlang:md5(Str)])).

View File

@ -10,8 +10,10 @@
-author("anlicheng").
-include("message.hrl").
-define(I32, 1).
-define(Bytes, 2).
-define(I8, 1).
-define(I16, 2).
-define(I32, 3).
-define(Bytes, 4).
%% API
-export([encode/2, decode/1]).
@ -35,13 +37,18 @@ encode0(#auth_reply{code = Code, payload = Payload}) ->
]);
encode0(#jsonrpc_reply{result = Result, error = undefined}) ->
ResultBin = erlang:term_to_binary(#{<<"result">> => Result}),
iolist_to_binary([marshal(?Bytes, ResultBin)]);
iolist_to_binary([
marshal(?Bytes, ResultBin)
]);
encode0(#jsonrpc_reply{result = undefined, error = Error}) ->
ResultBin = erlang:term_to_binary(#{<<"error">> => Error}),
iolist_to_binary([marshal(?Bytes, ResultBin)]);
encode0(#pub{topic = Topic, content = Content}) ->
iolist_to_binary([
marshal(?Bytes, ResultBin)
]);
encode0(#pub{topic = Topic, qos = Qos, content = Content}) ->
iolist_to_binary([
marshal(?Bytes, Topic),
marshal(?I8, Qos),
marshal(?Bytes, Content)
]);
encode0(#command{command_type = CommandType, command = Command}) ->
@ -52,20 +59,14 @@ encode0(#command{command_type = CommandType, command = Command}) ->
encode0(#jsonrpc_request{method = Method, params = Params}) ->
ReqBody = erlang:term_to_binary(#{<<"method">> => Method, <<"params">> => Params}),
marshal(?Bytes, ReqBody);
encode0(#data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}) ->
iolist_to_binary([
marshal(?Bytes, ServiceId),
marshal(?Bytes, DeviceUUID),
marshal(?Bytes, ReqBody)
]);
encode0(#data{route_key = RouteKey, metric = Metric}) ->
iolist_to_binary([
marshal(?Bytes, RouteKey),
marshal(?Bytes, Metric)
]);
encode0(#event{service_id = ServiceId, event_type = EventType, params = Params}) ->
iolist_to_binary([
marshal(?Bytes, ServiceId),
marshal(?I32, EventType),
marshal(?Bytes, Params)
]);
encode0(#task_event_stream{task_id = TaskId, type = Type, stream = Stream}) ->
iolist_to_binary([
marshal(?I32, TaskId),
@ -92,8 +93,8 @@ decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) ->
_ ->
error
end;
decode0(?MESSAGE_PUB, [Topic, Content]) ->
{ok, #pub{topic = Topic, content = Content}};
decode0(?MESSAGE_PUB, [Topic, Qos, Content]) ->
{ok, #pub{topic = Topic, qos = Qos, content = Content}};
decode0(?MESSAGE_COMMAND, [CommandType, Command]) ->
{ok, #command{command_type = CommandType, command = Command}};
decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) ->
@ -101,10 +102,8 @@ decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) ->
decode0(?MESSAGE_JSONRPC_REQUEST, [ReqBody]) ->
#{<<"method">> := Method, <<"params">> := Params} = erlang:binary_to_term(ReqBody),
{ok, #jsonrpc_request{method = Method, params = Params}};
decode0(?MESSAGE_DATA, [ServiceId, DeviceUUID, RouteKey, Metric]) ->
{ok, #data{service_id = ServiceId, device_uuid = DeviceUUID, route_key = RouteKey, metric = Metric}};
decode0(?MESSAGE_EVENT, [ServiceId, EventType, Params]) ->
{ok, #event{service_id = ServiceId, event_type = EventType, params = Params}};
decode0(?MESSAGE_DATA, [RouteKey, Metric]) ->
{ok, #data{route_key = RouteKey, metric = Metric}};
decode0(?MESSAGE_EVENT_STREAM, [TaskId, Type, Stream]) ->
{ok, #task_event_stream{task_id = TaskId, type = Type, stream = Stream}};
decode0(_, _) ->
@ -114,7 +113,11 @@ decode0(_, _) ->
%%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
-spec marshal(Type :: integer(), Field :: any()) -> binary().
-spec marshal(Type :: ?I8 | ?I16 | ?I32 | ?Bytes, Field :: integer() | binary()) -> binary().
marshal(?I8, Field) when is_integer(Field) ->
<<?I8, Field:8>>;
marshal(?I16, Field) when is_integer(Field) ->
<<?I16, Field:16>>;
marshal(?I32, Field) when is_integer(Field) ->
<<?I32, Field:32>>;
marshal(?Bytes, Field) when is_binary(Field) ->
@ -126,6 +129,10 @@ unmarshal(Bin) when is_binary(Bin) ->
unmarshal(Bin, []).
unmarshal(<<>>, Acc) ->
{ok, lists:reverse(Acc)};
unmarshal(<<?I8, F:8, Rest/binary>>, Acc) ->
unmarshal(Rest, [F|Acc]);
unmarshal(<<?I16, F:16, Rest/binary>>, Acc) ->
unmarshal(Rest, [F|Acc]);
unmarshal(<<?I32, F:32, Rest/binary>>, Acc) ->
unmarshal(Rest, [F|Acc]);
unmarshal(<<?Bytes, Len:16, F:Len/binary, Rest/binary>>, Acc) ->

View File

@ -0,0 +1,60 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 14. 11 2025 23:29
%%%-------------------------------------------------------------------
-module(endpoint_kafka_test).
-author("anlicheng").
-include("endpoint.hrl").
%% API
-export([start_test/0, test_consumer/0]).
start_test() ->
Name = endpoint:get_name(100),
{ok, Pid} = endpoint_kafka:start_link(Name, #endpoint{
id = 100,
%%
matcher = <<"/dhlr/device/*/*">>,
%%
title = <<"test_kafka_title">>,
%% , : #{<<"protocol">> => <<"http|https|ws|kafka|mqtt">>, <<"args">> => #{}}
config = #kafka_endpoint{
sasl_config = {plain, <<"test">>, <<"test123">>},
bootstrap_servers = [{"118.178.229.213", 9092}],
topic = <<"dhlr_data">>
}
}),
Json = jiffy:encode(#{
<<"name">> => <<"anlicheng">>,
<<"age">> => 30
}, [force_utf8]),
endpoint:forward(Pid, Json),
ok.
test_consumer() ->
KafkaBootstrapEndpoints = [{"118.178.229.213", 9092}],
Topic = <<"dhlr_data">>,
ClientConfig = [
{sasl, {plain, <<"test">>, <<"test123">>}},
{reconnect_cool_down_seconds, 5},
{socket_options, [{keepalive, true}]}
],
ok = brod:start_client(KafkaBootstrapEndpoints, client1, ClientConfig),
SubscriberCallbackFun = fun(_Partition, Msg, ShellPid = CallbackState) ->
lager:debug("call here msg: ~p", [Msg]),
ShellPid ! Msg, {ok, ack, CallbackState}
end,
Res = brod_topic_subscriber:start_link(client1, Topic, all,
_ConsumerConfig=[{begin_offset, 0}],
_CommittedOffsets=[], message, SubscriberCallbackFun,
_CallbackState=self()),
lager:debug("start subscriber res: ~p", [Res]).

View File

@ -0,0 +1,163 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 17. 11 2025 16:48
%%%-------------------------------------------------------------------
-module(endpoint_mqtt_subscriber).
-author("anlicheng").
-author("aresei").
-include("iot.hrl").
-behaviour(gen_server).
%% API
-export([start_link/0]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
%%
-define(Topics,[
{<<"/dhlr/data">>, 0}
]).
-record(state, {
conn_pid :: pid()
}).
%%%===================================================================
%%% API
%%%===================================================================
%% @doc Spawns the server and registers the local name (unique)
-spec(start_link() ->
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
start_link() ->
gen_server:start_link({local, ?MODULE}, ?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
%% emqx服务器的连接
ClientId = <<"mqtt-client-test-host-subscriber">>,
Opts = [
{clientid, ClientId},
{host, "118.178.229.213"},
{port, 1883},
{owner, self()},
{tcp_opts, []},
{username, "admin"},
{password, "admin"},
{keepalive, 86400},
{auto_ack, true},
{proto_ver, v5},
{retry_interval, 5}
],
lager:debug("[opts] is: ~p", [Opts]),
case emqtt:start_link(Opts) of
{ok, ConnPid} ->
%% host相关的全部事件
lager:debug("[iot_mqtt_subscriber] start conntecting, pid: ~p", [ConnPid]),
{ok, _} = emqtt:connect(ConnPid),
lager:debug("[iot_mqtt_subscriber] connect success, pid: ~p", [ConnPid]),
SubscribeResult = emqtt:subscribe(ConnPid, ?Topics),
lager:debug("[iot_mqtt_subscriber] subscribe topics: ~p, result is: ~p", [?Topics, SubscribeResult]),
{ok, #state{conn_pid = ConnPid}};
ignore ->
lager:debug("[iot_mqtt_subscriber] connect emqx get ignore"),
{stop, ignore};
{error, Reason} ->
lager:debug("[iot_mqtt_subscriber] connect emqx get error: ~p", [Reason]),
{stop, Reason}
end.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Info, _From, State = #state{conn_pid = _ConnPid}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({disconnect, ReasonCode, Properties}, State = #state{}) ->
lager:debug("[iot_mqtt_subscriber] Recv a DISONNECT packet - ReasonCode: ~p, Properties: ~p", [ReasonCode, Properties]),
{stop, disconnected, State};
%% json反序列需要在host进程进行
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State = #state{conn_pid = _ConnPid}) ->
lager:debug("[iot_mqtt_subscriber] Recv a topic: ~p, publish packet: ~p, qos: ~p", [Topic, Payload, Qos]),
%% host进程去处理
{noreply, State};
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
lager:debug("[iot_mqtt_subscriber] receive puback packet: ~p", [Packet]),
{noreply, State};
handle_info(Info, State = #state{}) ->
lager:debug("[iot_mqtt_subscriber] get info: ~p", [Info]),
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(Reason, _State = #state{conn_pid = ConnPid}) when is_pid(ConnPid) ->
%% topic的订阅
TopicNames = lists:map(fun({Name, _}) -> Name end, ?Topics),
{ok, _Props, _ReasonCode} = emqtt:unsubscribe(ConnPid, #{}, TopicNames),
ok = emqtt:disconnect(ConnPid),
lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]),
ok;
terminate(Reason, _State) ->
lager:debug("[iot_mqtt_subscriber] terminate with reason: ~p", [Reason]),
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -12,7 +12,7 @@
-behaviour(ranch_protocol).
%% API
-export([pub/3, jsonrpc_call/3, command/3]).
-export([pub/4, jsonrpc_call/3, command/3]).
-export([start_link/3, stop/2]).
%% gen_server callbacks
@ -33,9 +33,9 @@
}).
%%
-spec pub(Pid :: pid(), Topic :: binary(), Content :: binary()) -> no_return().
pub(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) ->
gen_server:cast(Pid, {pub, Topic, Content}).
-spec pub(Pid :: pid(), Topic :: binary(), Qos :: integer(), Content :: binary()) -> no_return().
pub(Pid, Topic, Qos, Content) when is_pid(Pid), is_binary(Topic), is_integer(Qos), is_binary(Content) ->
gen_server:cast(Pid, {pub, Topic, Qos, Content}).
%%
-spec command(Pid :: pid(), CommandType :: integer(), Command :: binary()) -> no_return().
@ -74,8 +74,8 @@ handle_call(_Request, _From, State) ->
{reply, ok, State}.
%% , pub/sub机制
handle_cast({pub, Topic, Content}, State = #state{transport = Transport, socket = Socket}) ->
EncPub = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, content = Content}),
handle_cast({pub, Topic, Qos, Content}, State = #state{transport = Transport, socket = Socket}) ->
EncPub = message_codec:encode(?MESSAGE_PUB, #pub{topic = Topic, qos = Qos, content = Content}),
Transport:send(Socket, <<?PACKET_CAST, EncPub/binary>>),
{noreply, State};
@ -137,8 +137,6 @@ handle_info({tcp, Socket, <<?PACKET_CAST, CastBin/binary>>}, State = #state{sock
case CastMessage of
#data{} = Data ->
iot_host:handle(HostPid, {data, Data});
#event{} = Event ->
iot_host:handle(HostPid, {event, Event});
#task_event_stream{task_id = TaskId, type = <<"close">>, stream = Reason} ->
iot_event_stream_observer:stream_close(TaskId, Reason);
#task_event_stream{task_id = TaskId, type = Type, stream = Stream} ->
@ -153,7 +151,7 @@ handle_info({tcp, Socket, <<?PACKET_CAST, CastBin/binary>>}, State = #state{sock
% {noreply, State};
%%
handle_info({tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, ResponseBin/binary>>}, State = #state{socket = Socket, uuid = UUID, inflight = Inflight}) when PacketId > 0 ->
handle_info({tcp, Socket, <<?PACKET_RESPONSE, PacketId:32, ResponseBin/binary>>}, State = #state{socket = Socket, inflight = Inflight}) when PacketId > 0 ->
{ok, RpcReply} = message_codec:decode(ResponseBin),
case maps:take(PacketId, Inflight) of
error ->

View File

@ -23,10 +23,13 @@ CREATE TABLE `endpoint` (
### config_json中的数据配置
#### http方式: type=http
token是可选的如果接收服务器端需要提供校验可以添加token字段校验算法采用的: $Sign = sha256(token + "post_body" + token), 有token的情况下
通过http推送的数据会增加一个header, 格式为 X-Signature: $Sign
```json
{
"url": "http(s)://www.test.com/api",
"token": "ngyehngcohezqyabnlhollsfcnyqobmwfcfyvbzdbhuubucojcpeefpczruiccfw",
"pool_size": 10
}