Compare commits
10 Commits
d1e69a31f0
...
95028ff149
| Author | SHA1 | Date | |
|---|---|---|---|
| 95028ff149 | |||
| 43864341e6 | |||
| b70a2a35d1 | |||
| 089463e11f | |||
| 99cc98e129 | |||
| 76edfcd11f | |||
| 3050841280 | |||
| 361a3bed9f | |||
| 490894620b | |||
| 5f66983633 |
@ -35,7 +35,9 @@
|
||||
-define(METHOD_FEEDBACK_RESULT, 16#06).
|
||||
-define(METHOD_EVENT, 16#07).
|
||||
%% ai识别的事件上报
|
||||
-define(METHOD_AI_EVENT, 17#08).
|
||||
-define(METHOD_AI_EVENT, 16#08).
|
||||
%% 设备状态上报
|
||||
-define(METHOD_DEVICE_REPORT, 16#09).
|
||||
|
||||
%% 消息体类型
|
||||
-define(PACKET_REQUEST, 16#01).
|
||||
|
||||
@ -25,6 +25,9 @@
|
||||
%% 最大数据缓冲量
|
||||
-define(MAX_QUEUE_SIZE, 5_000_000).
|
||||
|
||||
%% 期望的数据总量
|
||||
-define(DESIRED_VALUE, 400_0000).
|
||||
|
||||
%% 处理日志信息
|
||||
-define(log(Msg), north_data:info(Msg)).
|
||||
|
||||
@ -37,7 +40,10 @@
|
||||
%% 定时器
|
||||
timer_ref :: undefined | reference(),
|
||||
%% 是否繁忙
|
||||
is_busy = false :: boolean()
|
||||
is_busy = false :: boolean(),
|
||||
|
||||
%% 单位时间内的累积值
|
||||
acc_num = 0
|
||||
}).
|
||||
|
||||
%%%===================================================================
|
||||
@ -81,6 +87,9 @@ init([]) ->
|
||||
%% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制
|
||||
erlang:start_timer(0, self(), create_postman),
|
||||
|
||||
%% 检测每个小时内的数据的异常变化
|
||||
erlang:start_timer(3600, self(), check_desired_ticker),
|
||||
|
||||
{ok, disconnected, #state{mqtt_opts = Opts, iot_queue = iot_queue:new(?MAX_QUEUE_SIZE), postman_pid = undefined}}.
|
||||
|
||||
%% @private
|
||||
@ -132,7 +141,7 @@ handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPi
|
||||
end;
|
||||
|
||||
%% 收到确认消息
|
||||
handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = TimerRef}) ->
|
||||
handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum}) ->
|
||||
%% 记录日志信息
|
||||
?log(iolist_to_binary(AssocMessage)),
|
||||
|
||||
@ -145,7 +154,7 @@ handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = Ti
|
||||
Key = get_counter_key(iot_util:date()),
|
||||
mnesia_counter:inc(Key),
|
||||
|
||||
{keep_state, State#state{timer_ref = undefined, is_busy = false}, Actions};
|
||||
{keep_state, State#state{timer_ref = undefined, is_busy = false, acc_num = AccNum + 1}, Actions};
|
||||
|
||||
%% 收到重发过期请求
|
||||
handle_event(info, {timeout, _, {repost_ticker, Body}}, connected, State = #state{postman_pid = PostmanPid}) ->
|
||||
@ -171,6 +180,20 @@ handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mq
|
||||
{keep_state, State#state{postman_pid = undefined}}
|
||||
end;
|
||||
|
||||
%% 检测每个小时内的数据增量
|
||||
handle_event(info, {timeout, _, check_desired_ticker}, _, State = #state{acc_num = AccNum}) ->
|
||||
HourDesiredNum = ?DESIRED_VALUE div 24,
|
||||
%% 允许25%的的波动
|
||||
case HourDesiredNum >= AccNum orelse HourDesiredNum - AccNum < erlang:ceil(HourDesiredNum * 0.25) of
|
||||
true ->
|
||||
ok;
|
||||
false ->
|
||||
%% 报警,数据下降异常
|
||||
iot_watchdog:warn(iolist_to_binary([<<"中电数据异常:"/utf8>>, integer_to_binary(AccNum), <<"/h">>]))
|
||||
end,
|
||||
erlang:start_timer(3600, self(), check_desired_ticker),
|
||||
{keep_state, State#state{acc_num = 0}};
|
||||
|
||||
%% 获取当前统计信息
|
||||
handle_event({call, From}, get_stat, StateName, State = #state{iot_queue = Q}) ->
|
||||
Key = get_counter_key(iot_util:date()),
|
||||
|
||||
@ -73,19 +73,24 @@ handle_request("POST", "/device/query_edge_status", _, #{<<"host_id">> := HostId
|
||||
undefined ->
|
||||
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)};
|
||||
HostPid when is_pid(HostPid) ->
|
||||
Ref = make_ref(),
|
||||
{ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(),
|
||||
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout),
|
||||
receive
|
||||
{poll_task_reply, Ref, {ok, EdgeStatus}} ->
|
||||
{ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus),
|
||||
{ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})};
|
||||
{poll_task_reply, Ref, {error, Reason}} ->
|
||||
lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]),
|
||||
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}
|
||||
after Timeout * 1000 ->
|
||||
lager:debug("[device_handler] query_edge_status device: ~p, timeout", [DeviceUUID]),
|
||||
{ok, 200, iot_util:json_error(404, <<"query_edge_status timeout">>)}
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
ok;
|
||||
DevicePid when is_pid(DevicePid) ->
|
||||
case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of
|
||||
true ->
|
||||
case do_poll(HostPid, DeviceUUID, Timeout) of
|
||||
{ok, EdgeStatus} ->
|
||||
{ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus),
|
||||
{ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})};
|
||||
{error, Reason} ->
|
||||
lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]),
|
||||
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}
|
||||
end;
|
||||
false ->
|
||||
lager:debug("[device_handler] query_edge_status device: ~p, not found", [DeviceUUID]),
|
||||
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}
|
||||
end
|
||||
end
|
||||
end;
|
||||
|
||||
@ -93,7 +98,6 @@ handle_request(_, Path, _, _) ->
|
||||
Path1 = list_to_binary(Path),
|
||||
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
|
||||
|
||||
|
||||
test(HostUUID, DeviceUUID) when is_binary(HostUUID), is_binary(DeviceUUID) ->
|
||||
Timeout = 10,
|
||||
lager:debug("[device_handler] host_id: ~p, will query_edge_status uuid: ~p", [HostUUID, DeviceUUID]),
|
||||
@ -101,19 +105,42 @@ test(HostUUID, DeviceUUID) when is_binary(HostUUID), is_binary(DeviceUUID) ->
|
||||
undefined ->
|
||||
{ok, <<"host not found">>};
|
||||
HostPid when is_pid(HostPid) ->
|
||||
Ref = make_ref(),
|
||||
{ok, {TaskPid, MRef}} = iot_device_poll_task:start_monitor(),
|
||||
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout),
|
||||
receive
|
||||
{poll_task_reply, Ref, {ok, EdgeStatus}} ->
|
||||
{ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus),
|
||||
{ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})};
|
||||
{poll_task_reply, Ref, {error, Reason}} ->
|
||||
lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]),
|
||||
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)};
|
||||
{'DOWN', MRef, process, TaskPid, Reason} ->
|
||||
{ok, <<"task pid down with reason:">>, Reason}
|
||||
after Timeout * 1000 ->
|
||||
{ok, 200, <<"query_edge_status timeout">>}
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
{error, device_not_found};
|
||||
DevicePid when is_pid(DevicePid) ->
|
||||
case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of
|
||||
true ->
|
||||
do_poll(HostPid, DeviceUUID, Timeout);
|
||||
false ->
|
||||
{error, device_is_dead}
|
||||
end
|
||||
end
|
||||
end.
|
||||
|
||||
-spec do_poll(HostPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> {ok, EdgeStatus :: integer()} | {error, Reason :: any()}.
|
||||
do_poll(HostPid, DeviceUUID, Timeout) when is_pid(HostPid), is_binary(DeviceUUID), is_integer(Timeout) ->
|
||||
Command = #{
|
||||
<<"device_uuid">> => DeviceUUID,
|
||||
<<"timeout">> => Timeout,
|
||||
<<"command">> => <<"query_status">>
|
||||
},
|
||||
BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])),
|
||||
case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of
|
||||
{ok, Reply} when is_binary(Reply) ->
|
||||
case catch jiffy:decode(Reply, [return_maps]) of
|
||||
%% 返回的是一个数组
|
||||
#{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 ->
|
||||
EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList),
|
||||
case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of
|
||||
true ->
|
||||
{ok, 1};
|
||||
false ->
|
||||
{ok, 0}
|
||||
end;
|
||||
_ ->
|
||||
{error, <<"invalid reply json">>}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
{error, Reason}
|
||||
end.
|
||||
@ -12,6 +12,7 @@
|
||||
|
||||
%% API
|
||||
-export([route_uuid/3]).
|
||||
-export([set_tests/1]).
|
||||
|
||||
-spec route_uuid(RouterUUID :: binary(), EventType :: integer(), Params :: map()) -> no_return().
|
||||
route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer(EventType), is_map(Params) ->
|
||||
@ -21,10 +22,19 @@ route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer
|
||||
lager:debug("[iot_ai_router] hget uuid: ~p, location_code: ~p", [RouterUUID, LocationCode]),
|
||||
%% 动火离人推送给金智 2024-12-02, 2边都要推送
|
||||
lists:member(EventType, [15]) andalso iot_donghuoliren_endpoint:forward(LocationCode, DynamicLocationCode, EventType, Params),
|
||||
%% 动火离人推送给金智 2024-03-21
|
||||
iot_jinzhi_endpoint:forward(LocationCode, DynamicLocationCode, EventType, Params);
|
||||
|
||||
TestEventTypes = application:get_env(iot, test_event_types, []),
|
||||
case lists:member(EventType, TestEventTypes) of
|
||||
true ->
|
||||
lager:debug("[iot_ai_router] event_type: ~p, is test", [EventType]);
|
||||
false ->
|
||||
iot_jinzhi_endpoint:forward(LocationCode, DynamicLocationCode, EventType, Params)
|
||||
end;
|
||||
{ok, _} ->
|
||||
lager:debug("[iot_ai_router] hget location_code, uuid: ~p, not found", [RouterUUID]);
|
||||
{error, Reason} ->
|
||||
lager:debug("[iot_ai_router] hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
|
||||
end.
|
||||
|
||||
set_tests(TestEventTypes) when is_list(TestEventTypes) ->
|
||||
application:set_env(iot, test_event_types, TestEventTypes).
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([get_name/1, get_pid/1, serialize/1]).
|
||||
-export([get_name/1, get_pid/1]).
|
||||
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2]).
|
||||
-export([ai_event/3, handle_data/3]).
|
||||
|
||||
@ -37,9 +37,6 @@
|
||||
%% 设备类型
|
||||
model_id = 0,
|
||||
|
||||
%% 设备状态轮询周期
|
||||
poll_interval = 0 :: integer(),
|
||||
poll_ref :: undefined | reference(),
|
||||
%% 设备的最后状态,避免重复更新数据库
|
||||
edge_status = 0 :: integer(),
|
||||
|
||||
@ -50,28 +47,6 @@
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
%% 格式化efka上传的数据格式
|
||||
%"fields": [
|
||||
%{
|
||||
% "key": "test"
|
||||
% "value": 124,
|
||||
% "unit": "U",
|
||||
% "type": "AI:遥测值,DI:遥信值,SOE:事件",
|
||||
% "timestamp": int
|
||||
%}
|
||||
%],
|
||||
%% 用来保存在内存中的格式,不需要序列话处理 !!!, 放入到influxdb的数据是基于base64的
|
||||
-spec serialize(FieldsList :: [map()]) -> [{Key :: binary(), Values :: map()}].
|
||||
serialize(FieldsList) when is_list(FieldsList) ->
|
||||
lists:flatmap(fun serialize0/1, FieldsList).
|
||||
serialize0(Fields = #{<<"key">> := Key}) when is_binary(Key) andalso Key /= <<>> ->
|
||||
Values = maps:remove(<<"key">>, Fields),
|
||||
%S = base64:encode(iolist_to_binary(jiffy:encode(#{Key => Values}, [force_utf8]))),
|
||||
%[<<"base64:", S/binary>>];
|
||||
[{Key, Values}];
|
||||
serialize0(_) ->
|
||||
[].
|
||||
|
||||
-spec is_alive(DeviceUUID :: binary()) -> error | {ok, Pid :: pid()}.
|
||||
is_alive(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
@ -153,13 +128,8 @@ init([DeviceInfo = #{<<"id">> := DeviceId, <<"device_uuid">> := DeviceUUID, <<"h
|
||||
|
||||
EdgeStatus = maps:get(<<"edge_status">>, DeviceInfo, 0),
|
||||
|
||||
%% 轮询机制
|
||||
{ok, PollInterval0} = application:get_env(iot, device_poll_interval),
|
||||
PollInterval = PollInterval0 * 1000,
|
||||
erlang:start_timer(PollInterval, self(), poll_ticker),
|
||||
|
||||
{ok, #state{device_id = DeviceId, device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle, host_id = as_integer(HostId), edge_status = EdgeStatus,
|
||||
status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId), poll_interval = PollInterval}}.
|
||||
status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId)}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
@ -282,26 +252,8 @@ handle_cast({handle_data, Fields, Timestamp}, State = #state{device_uuid = Devic
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({timeout, _, poll_ticker}, State = #state{poll_interval = PollInterval, device_uuid = DeviceUUID, host_id = HostId}) ->
|
||||
%% 开启下一次轮询
|
||||
erlang:start_timer(PollInterval, self(), poll_ticker),
|
||||
|
||||
AliasName = iot_host:get_alias_name(HostId),
|
||||
case global:whereis_name(AliasName) of
|
||||
undefined ->
|
||||
lager:warning("[iot_device] device_uuid: ~p, host not found", [DeviceUUID]),
|
||||
{noreply, State};
|
||||
HostPid when is_pid(HostPid) ->
|
||||
%% 建立临时任务,不阻塞当前进程
|
||||
Ref = make_ref(),
|
||||
{ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(),
|
||||
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, 10),
|
||||
|
||||
{noreply, State#state{poll_ref = Ref}}
|
||||
end;
|
||||
|
||||
%% 处理响应
|
||||
handle_info({poll_task_reply, Ref, {ok, EdgeStatus}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) ->
|
||||
handle_info({poll_task_reply, {ok, EdgeStatus}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status}) ->
|
||||
case EdgeStatus =/= LastEdgeStatus of
|
||||
true ->
|
||||
lager:debug("[iot_device] device_uuid: ~p, poll edge_status is: ~p", [DeviceUUID, EdgeStatus]),
|
||||
@ -313,7 +265,7 @@ handle_info({poll_task_reply, Ref, {ok, EdgeStatus}}, State = #state{device_uuid
|
||||
end;
|
||||
|
||||
%% 通讯超时逻辑需要处理
|
||||
handle_info({poll_task_reply, Ref, {error, timeout}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) ->
|
||||
handle_info({poll_task_reply, {error, timeout}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status}) ->
|
||||
lager:debug("[iot_device] device_uuid: ~p, poll timeout", [DeviceUUID]),
|
||||
EdgeStatus = -1,
|
||||
case EdgeStatus =/= LastEdgeStatus of
|
||||
@ -326,7 +278,7 @@ handle_info({poll_task_reply, Ref, {error, timeout}}, State = #state{device_uuid
|
||||
end;
|
||||
|
||||
%% 其他类型的错误暂时不更新数据库状态
|
||||
handle_info({poll_task_reply, Ref, {error, Reason}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, poll_ref = Ref}) ->
|
||||
handle_info({poll_task_reply, {error, Reason}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus}) ->
|
||||
lager:notice("[iot_device] device_uuid: ~p, poll get error: ~p, last edge_status: ~p", [DeviceUUID, Reason, LastEdgeStatus]),
|
||||
{noreply, State};
|
||||
|
||||
|
||||
@ -1,135 +0,0 @@
|
||||
%%%-------------------------------------------------------------------
|
||||
%%% @author anlicheng
|
||||
%%% @copyright (C) 2025, <COMPANY>
|
||||
%%% @doc
|
||||
%%%
|
||||
%%% @end
|
||||
%%% Created : 10. 3月 2025 21:09
|
||||
%%%-------------------------------------------------------------------
|
||||
-module(iot_device_poll_task).
|
||||
-author("anlicheng").
|
||||
|
||||
-behaviour(gen_server).
|
||||
|
||||
%% API
|
||||
-export([start_monitor/0]).
|
||||
-export([poll_task/6]).
|
||||
|
||||
%% gen_server callbacks
|
||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||
|
||||
-define(SERVER, ?MODULE).
|
||||
|
||||
-record(state, {}).
|
||||
|
||||
%%%===================================================================
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
-spec poll_task(Pid :: pid(), Ref :: reference(), HostPid :: pid(), ReceiverPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> no_return().
|
||||
poll_task(Pid, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout)
|
||||
when is_pid(Pid), is_reference(Ref), is_pid(HostPid), is_pid(ReceiverPid), is_binary(DeviceUUID), is_integer(Timeout) ->
|
||||
gen_server:cast(Pid, {poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}).
|
||||
|
||||
%% @doc Spawns the server and registers the local name (unique)
|
||||
-spec(start_monitor() ->
|
||||
{'ok', {Pid :: pid(), MonRef :: reference()}} | 'ignore' | {'error', Reason :: term()}).
|
||||
start_monitor() ->
|
||||
gen_server:start_monitor(?MODULE, [], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_server callbacks
|
||||
%%%===================================================================
|
||||
|
||||
%% @private
|
||||
%% @doc Initializes the server
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([]) ->
|
||||
{ok, #state{}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
|
||||
State :: #state{}) ->
|
||||
{reply, Reply :: term(), NewState :: #state{}} |
|
||||
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_call(_Request, _From, State = #state{}) ->
|
||||
{reply, ok, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling cast messages
|
||||
-spec(handle_cast(Request :: term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast({poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}, State = #state{}) ->
|
||||
Command = #{
|
||||
<<"device_uuid">> => DeviceUUID,
|
||||
<<"timeout">> => Timeout,
|
||||
<<"command">> => <<"query_status">>
|
||||
},
|
||||
BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])),
|
||||
|
||||
case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of
|
||||
{ok, Reply} when is_binary(Reply) ->
|
||||
case catch jiffy:decode(Reply, [return_maps]) of
|
||||
%% 返回的是一个数组
|
||||
#{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 ->
|
||||
EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList),
|
||||
case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of
|
||||
true ->
|
||||
ReceiverPid ! {poll_task_reply, Ref, {ok, 1}};
|
||||
false ->
|
||||
case lists:all(fun(S) -> S =:= 0 end, EdgeStatusList) of
|
||||
true ->
|
||||
ReceiverPid ! {poll_task_reply, Ref, {ok, 0}};
|
||||
false ->
|
||||
ReceiverPid ! {poll_task_reply, Ref, {ok, lists:min(EdgeStatusList)}}
|
||||
end
|
||||
end;
|
||||
_ ->
|
||||
ReceiverPid ! {poll_task_reply, Ref, {error, <<"invalid reply json">>}}
|
||||
end;
|
||||
{error, Reason} ->
|
||||
ReceiverPid ! {poll_task_reply, Ref, {error, Reason}}
|
||||
end,
|
||||
{stop, normal, State};
|
||||
handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info(_Info, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
|
||||
%% @private
|
||||
%% @doc This function is called by a gen_server when it is about to
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_server terminates
|
||||
%% with Reason. The return value is ignored.
|
||||
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
|
||||
State :: #state{}) -> term()).
|
||||
terminate(_Reason, _State = #state{}) ->
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
%% @doc Convert process state when code is changed
|
||||
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
|
||||
Extra :: term()) ->
|
||||
{ok, NewState :: #state{}} | {error, Reason :: term()}).
|
||||
code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
{ok, State}.
|
||||
|
||||
%%%===================================================================
|
||||
%%% Internal functions
|
||||
%%%===================================================================
|
||||
@ -199,8 +199,10 @@ init([UUID]) ->
|
||||
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
|
||||
|
||||
StateName = case AuthorizeStatus =:= 1 of
|
||||
true -> ?STATE_ACTIVATED;
|
||||
false -> ?STATE_DENIED
|
||||
true ->
|
||||
?STATE_ACTIVATED;
|
||||
false ->
|
||||
?STATE_DENIED
|
||||
end,
|
||||
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, name = Name, has_session = false}};
|
||||
undefined ->
|
||||
@ -309,6 +311,7 @@ handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes
|
||||
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
|
||||
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
||||
lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]),
|
||||
|
||||
{next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};
|
||||
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
||||
lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID]),
|
||||
@ -320,6 +323,7 @@ handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, cha
|
||||
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
|
||||
ws_channel:stop(ChannelPid, closed),
|
||||
lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]),
|
||||
|
||||
{next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]};
|
||||
|
||||
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
|
||||
@ -358,17 +362,18 @@ handle_event({call, From}, {create_session, PubKey}, ?STATE_DENIED, State = #sta
|
||||
{keep_state, State#state{has_session = false}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||
|
||||
%% 重新加载设备信息
|
||||
handle_event({call, From}, {reload_device, DeviceUUID}, _, State) ->
|
||||
handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{}) ->
|
||||
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
||||
{ok, DevicePid} ->
|
||||
iot_device:reload(DevicePid),
|
||||
|
||||
{keep_state, State, [{reply, From, ok}]};
|
||||
{error, Reason} ->
|
||||
{keep_state, State, [{reply, From, {error, Reason}}]}
|
||||
end;
|
||||
|
||||
%% 删除设备
|
||||
handle_event({call, From}, {delete_device, DeviceUUID}, _, State) ->
|
||||
handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{}) ->
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
ok;
|
||||
@ -378,11 +383,10 @@ handle_event({call, From}, {delete_device, DeviceUUID}, _, State) ->
|
||||
{keep_state, State, [{reply, From, ok}]};
|
||||
|
||||
%% 激活设备
|
||||
handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State) ->
|
||||
handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #state{}) ->
|
||||
case iot_device_sup:ensured_device_started(DeviceUUID) of
|
||||
{ok, DevicePid} ->
|
||||
iot_device:auth(DevicePid, Auth),
|
||||
|
||||
{keep_state, State, [{reply, From, ok}]};
|
||||
{error, Reason} ->
|
||||
{keep_state, State, [{reply, From, {error, Reason}}]}
|
||||
@ -399,6 +403,20 @@ handle_event(cast, {handle, {data, Data}}, ?STATE_ACTIVATED, State = #state{aes
|
||||
end,
|
||||
{keep_state, State};
|
||||
|
||||
handle_event(cast, {handle, {device_report, Report}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) ->
|
||||
PlainReport = iot_cipher_aes:decrypt(AES, Report),
|
||||
case catch jiffy:decode(PlainReport, [return_maps]) of
|
||||
DeviceEdgeStatusList when is_list(DeviceEdgeStatusList) ->
|
||||
lager:debug("[iot_host] host: ~p, update device edge_status num: ~p", [UUID, length(DeviceEdgeStatusList)]),
|
||||
%% 更新设备的状态
|
||||
lists:foreach(fun(#{<<"device_uuid">> := DeviceUUID, <<"edge_status">> := EdgeStatus}) ->
|
||||
device_bo:change_edge_status(DeviceUUID, EdgeStatus)
|
||||
end, DeviceEdgeStatusList);
|
||||
Other ->
|
||||
lager:notice("[iot_host] the device_report is invalid json: ~p", [Other])
|
||||
end,
|
||||
{keep_state, State};
|
||||
|
||||
%% ping的数据是通过aes加密后的,因此需要在有会话的情况下才行
|
||||
handle_event(cast, {handle, {ping, CipherMetric}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, name = Name, aes = AES,
|
||||
heartbeat_counter = HeartbeatCounter, has_session = true}) ->
|
||||
|
||||
@ -126,6 +126,11 @@ websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_AI_EVENT:8, C
|
||||
iot_host:handle(HostPid, {ai_event, CipherEvent}),
|
||||
{ok, State};
|
||||
|
||||
%% 设备状态上报
|
||||
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_DEVICE_REPORT:8, CipherReport/binary>>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) ->
|
||||
iot_host:handle(HostPid, {device_report, CipherReport}),
|
||||
{ok, State};
|
||||
|
||||
%% 主机端的消息响应
|
||||
websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, 0:32, Body/binary>>}, State = #state{uuid = UUID}) ->
|
||||
lager:debug("[ws_channel] uuid: ~p, get send response message: ~p", [UUID, Body]),
|
||||
@ -136,7 +141,7 @@ websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, PacketId:32, Body/binary>>
|
||||
error ->
|
||||
lager:warning("[ws_channel] get unknown publish response message: ~p, packet_id: ~p", [Body, PacketId]),
|
||||
{ok, State};
|
||||
{{ReceiverPid, Ref}, NInflight} ->
|
||||
{{ReceiverPid, Ref, _}, NInflight} ->
|
||||
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
||||
true when Body == <<>> ->
|
||||
ReceiverPid ! {ws_response, Ref};
|
||||
@ -164,7 +169,7 @@ websocket_info({publish, ReceiverPid, Ref, Msg, Timeout}, State = #state{packet_
|
||||
TTL = iot_util:timestamp() + Timeout,
|
||||
maps:put(PacketId, {ReceiverPid, Ref, TTL}, Inflight);
|
||||
false ->
|
||||
maps:put(PacketId, {ReceiverPid, Ref}, Inflight)
|
||||
maps:put(PacketId, {ReceiverPid, Ref, 0}, Inflight)
|
||||
end,
|
||||
{reply, {binary, <<?PACKET_PUBLISH, PacketId:32, Msg/binary>>}, State#state{packet_id = PacketId + 1, inflight = NInflight}};
|
||||
|
||||
@ -177,13 +182,8 @@ websocket_info({timeout, _, clean_ticker}, State=#state{inflight = Inflight}) ->
|
||||
clean_ticker(),
|
||||
|
||||
Timestamp = iot_util:timestamp(),
|
||||
NInflight = maps:filter(fun(_, Promise) ->
|
||||
case Promise of
|
||||
{_ReceiverPid, _Ref, TTL} ->
|
||||
TTL < Timestamp;
|
||||
{_ReceiverPid, _Ref} ->
|
||||
true
|
||||
end
|
||||
NInflight = maps:filter(fun(_, {_ReceiverPid, _Ref, TTL}) ->
|
||||
TTL == 0 orelse (TTL > 0 andalso TTL < Timestamp)
|
||||
end, Inflight),
|
||||
|
||||
{ok, State#state{inflight = NInflight}};
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user