Compare commits

..

No commits in common. "95028ff1497ae03cba8048cf40936a59076ef034" and "d1e69a31f04dd4c7250fa6b534206a7a6890244d" have entirely different histories.

8 changed files with 238 additions and 135 deletions

View File

@ -35,9 +35,7 @@
-define(METHOD_FEEDBACK_RESULT, 16#06).
-define(METHOD_EVENT, 16#07).
%% ai识别的事件上报
-define(METHOD_AI_EVENT, 16#08).
%%
-define(METHOD_DEVICE_REPORT, 16#09).
-define(METHOD_AI_EVENT, 17#08).
%%
-define(PACKET_REQUEST, 16#01).

View File

@ -25,9 +25,6 @@
%%
-define(MAX_QUEUE_SIZE, 5_000_000).
%%
-define(DESIRED_VALUE, 400_0000).
%%
-define(log(Msg), north_data:info(Msg)).
@ -40,10 +37,7 @@
%%
timer_ref :: undefined | reference(),
%%
is_busy = false :: boolean(),
%%
acc_num = 0
is_busy = false :: boolean()
}).
%%%===================================================================
@ -87,9 +81,6 @@ init([]) ->
%% ,
erlang:start_timer(0, self(), create_postman),
%%
erlang:start_timer(3600, self(), check_desired_ticker),
{ok, disconnected, #state{mqtt_opts = Opts, iot_queue = iot_queue:new(?MAX_QUEUE_SIZE), postman_pid = undefined}}.
%% @private
@ -141,7 +132,7 @@ handle_event(info, fetch_next, connected, State = #state{postman_pid = PostmanPi
end;
%%
handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = TimerRef, acc_num = AccNum}) ->
handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = TimerRef}) ->
%%
?log(iolist_to_binary(AssocMessage)),
@ -154,7 +145,7 @@ handle_event(info, {ack, AssocMessage}, StateName, State = #state{timer_ref = Ti
Key = get_counter_key(iot_util:date()),
mnesia_counter:inc(Key),
{keep_state, State#state{timer_ref = undefined, is_busy = false, acc_num = AccNum + 1}, Actions};
{keep_state, State#state{timer_ref = undefined, is_busy = false}, Actions};
%%
handle_event(info, {timeout, _, {repost_ticker, Body}}, connected, State = #state{postman_pid = PostmanPid}) ->
@ -180,20 +171,6 @@ handle_event(info, {timeout, _, create_postman}, disconnected, State = #state{mq
{keep_state, State#state{postman_pid = undefined}}
end;
%%
handle_event(info, {timeout, _, check_desired_ticker}, _, State = #state{acc_num = AccNum}) ->
HourDesiredNum = ?DESIRED_VALUE div 24,
%% 25%
case HourDesiredNum >= AccNum orelse HourDesiredNum - AccNum < erlang:ceil(HourDesiredNum * 0.25) of
true ->
ok;
false ->
%%
iot_watchdog:warn(iolist_to_binary([<<"中电数据异常:"/utf8>>, integer_to_binary(AccNum), <<"/h">>]))
end,
erlang:start_timer(3600, self(), check_desired_ticker),
{keep_state, State#state{acc_num = 0}};
%%
handle_event({call, From}, get_stat, StateName, State = #state{iot_queue = Q}) ->
Key = get_counter_key(iot_util:date()),

View File

@ -73,24 +73,19 @@ handle_request("POST", "/device/query_edge_status", _, #{<<"host_id">> := HostId
undefined ->
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)};
HostPid when is_pid(HostPid) ->
case iot_device:get_pid(DeviceUUID) of
undefined ->
ok;
DevicePid when is_pid(DevicePid) ->
case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of
true ->
case do_poll(HostPid, DeviceUUID, Timeout) of
{ok, EdgeStatus} ->
{ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus),
{ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})};
{error, Reason} ->
lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]),
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}
end;
false ->
lager:debug("[device_handler] query_edge_status device: ~p, not found", [DeviceUUID]),
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}
end
Ref = make_ref(),
{ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(),
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout),
receive
{poll_task_reply, Ref, {ok, EdgeStatus}} ->
{ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus),
{ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})};
{poll_task_reply, Ref, {error, Reason}} ->
lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]),
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)}
after Timeout * 1000 ->
lager:debug("[device_handler] query_edge_status device: ~p, timeout", [DeviceUUID]),
{ok, 200, iot_util:json_error(404, <<"query_edge_status timeout">>)}
end
end;
@ -98,6 +93,7 @@ handle_request(_, Path, _, _) ->
Path1 = list_to_binary(Path),
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
test(HostUUID, DeviceUUID) when is_binary(HostUUID), is_binary(DeviceUUID) ->
Timeout = 10,
lager:debug("[device_handler] host_id: ~p, will query_edge_status uuid: ~p", [HostUUID, DeviceUUID]),
@ -105,42 +101,19 @@ test(HostUUID, DeviceUUID) when is_binary(HostUUID), is_binary(DeviceUUID) ->
undefined ->
{ok, <<"host not found">>};
HostPid when is_pid(HostPid) ->
case iot_device:get_pid(DeviceUUID) of
undefined ->
{error, device_not_found};
DevicePid when is_pid(DevicePid) ->
case is_process_alive(DevicePid) andalso iot_device:is_activated(DevicePid) of
true ->
do_poll(HostPid, DeviceUUID, Timeout);
false ->
{error, device_is_dead}
end
Ref = make_ref(),
{ok, {TaskPid, MRef}} = iot_device_poll_task:start_monitor(),
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, Timeout),
receive
{poll_task_reply, Ref, {ok, EdgeStatus}} ->
{ok, _} = device_bo:change_edge_status(DeviceUUID, EdgeStatus),
{ok, 200, iot_util:json_data(#{<<"edge_status">> => EdgeStatus})};
{poll_task_reply, Ref, {error, Reason}} ->
lager:debug("[device_handler] query_edge_status device: ~p, get error: ~p", [DeviceUUID, Reason]),
{ok, 200, iot_util:json_error(404, <<"query_edge_status failed">>)};
{'DOWN', MRef, process, TaskPid, Reason} ->
{ok, <<"task pid down with reason:">>, Reason}
after Timeout * 1000 ->
{ok, 200, <<"query_edge_status timeout">>}
end
end.
-spec do_poll(HostPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> {ok, EdgeStatus :: integer()} | {error, Reason :: any()}.
do_poll(HostPid, DeviceUUID, Timeout) when is_pid(HostPid), is_binary(DeviceUUID), is_integer(Timeout) ->
Command = #{
<<"device_uuid">> => DeviceUUID,
<<"timeout">> => Timeout,
<<"command">> => <<"query_status">>
},
BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])),
case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of
{ok, Reply} when is_binary(Reply) ->
case catch jiffy:decode(Reply, [return_maps]) of
%%
#{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 ->
EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList),
case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of
true ->
{ok, 1};
false ->
{ok, 0}
end;
_ ->
{error, <<"invalid reply json">>}
end;
{error, Reason} ->
{error, Reason}
end.

View File

@ -12,7 +12,6 @@
%% API
-export([route_uuid/3]).
-export([set_tests/1]).
-spec route_uuid(RouterUUID :: binary(), EventType :: integer(), Params :: map()) -> no_return().
route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer(EventType), is_map(Params) ->
@ -22,19 +21,10 @@ route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer
lager:debug("[iot_ai_router] hget uuid: ~p, location_code: ~p", [RouterUUID, LocationCode]),
%% 2024-12-02, 2
lists:member(EventType, [15]) andalso iot_donghuoliren_endpoint:forward(LocationCode, DynamicLocationCode, EventType, Params),
TestEventTypes = application:get_env(iot, test_event_types, []),
case lists:member(EventType, TestEventTypes) of
true ->
lager:debug("[iot_ai_router] event_type: ~p, is test", [EventType]);
false ->
iot_jinzhi_endpoint:forward(LocationCode, DynamicLocationCode, EventType, Params)
end;
%% 2024-03-21
iot_jinzhi_endpoint:forward(LocationCode, DynamicLocationCode, EventType, Params);
{ok, _} ->
lager:debug("[iot_ai_router] hget location_code, uuid: ~p, not found", [RouterUUID]);
{error, Reason} ->
lager:debug("[iot_ai_router] hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
end.
set_tests(TestEventTypes) when is_list(TestEventTypes) ->
application:set_env(iot, test_event_types, TestEventTypes).
end.

View File

@ -14,7 +14,7 @@
-behaviour(gen_server).
%% API
-export([get_name/1, get_pid/1]).
-export([get_name/1, get_pid/1, serialize/1]).
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2]).
-export([ai_event/3, handle_data/3]).
@ -37,6 +37,9 @@
%%
model_id = 0,
%%
poll_interval = 0 :: integer(),
poll_ref :: undefined | reference(),
%%
edge_status = 0 :: integer(),
@ -47,6 +50,28 @@
%%% API
%%%===================================================================
%% efka上传的数据格式
%"fields": [
%{
% "key": "test"
% "value": 124,
% "unit": "U",
% "type": "AI遥测值DI遥信值SOE事件",
% "timestamp": int
%}
%],
%% !!!, influxdb的数据是基于base64的
-spec serialize(FieldsList :: [map()]) -> [{Key :: binary(), Values :: map()}].
serialize(FieldsList) when is_list(FieldsList) ->
lists:flatmap(fun serialize0/1, FieldsList).
serialize0(Fields = #{<<"key">> := Key}) when is_binary(Key) andalso Key /= <<>> ->
Values = maps:remove(<<"key">>, Fields),
%S = base64:encode(iolist_to_binary(jiffy:encode(#{Key => Values}, [force_utf8]))),
%[<<"base64:", S/binary>>];
[{Key, Values}];
serialize0(_) ->
[].
-spec is_alive(DeviceUUID :: binary()) -> error | {ok, Pid :: pid()}.
is_alive(DeviceUUID) when is_binary(DeviceUUID) ->
case iot_device:get_pid(DeviceUUID) of
@ -128,8 +153,13 @@ init([DeviceInfo = #{<<"id">> := DeviceId, <<"device_uuid">> := DeviceUUID, <<"h
EdgeStatus = maps:get(<<"edge_status">>, DeviceInfo, 0),
%%
{ok, PollInterval0} = application:get_env(iot, device_poll_interval),
PollInterval = PollInterval0 * 1000,
erlang:start_timer(PollInterval, self(), poll_ticker),
{ok, #state{device_id = DeviceId, device_uuid = DeviceUUID, ai_event_throttle = AiEventThrottle, host_id = as_integer(HostId), edge_status = EdgeStatus,
status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId)}}.
status = as_integer(Status), auth_status = as_integer(AuthorizeStatus), model_id = as_integer(ModelId), poll_interval = PollInterval}}.
%% @private
%% @doc Handling call messages
@ -252,8 +282,26 @@ handle_cast({handle_data, Fields, Timestamp}, State = #state{device_uuid = Devic
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({timeout, _, poll_ticker}, State = #state{poll_interval = PollInterval, device_uuid = DeviceUUID, host_id = HostId}) ->
%%
erlang:start_timer(PollInterval, self(), poll_ticker),
AliasName = iot_host:get_alias_name(HostId),
case global:whereis_name(AliasName) of
undefined ->
lager:warning("[iot_device] device_uuid: ~p, host not found", [DeviceUUID]),
{noreply, State};
HostPid when is_pid(HostPid) ->
%%
Ref = make_ref(),
{ok, {TaskPid, _}} = iot_device_poll_task:start_monitor(),
iot_device_poll_task:poll_task(TaskPid, Ref, HostPid, self(), DeviceUUID, 10),
{noreply, State#state{poll_ref = Ref}}
end;
%%
handle_info({poll_task_reply, {ok, EdgeStatus}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status}) ->
handle_info({poll_task_reply, Ref, {ok, EdgeStatus}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) ->
case EdgeStatus =/= LastEdgeStatus of
true ->
lager:debug("[iot_device] device_uuid: ~p, poll edge_status is: ~p", [DeviceUUID, EdgeStatus]),
@ -265,7 +313,7 @@ handle_info({poll_task_reply, {ok, EdgeStatus}}, State = #state{device_uuid = De
end;
%%
handle_info({poll_task_reply, {error, timeout}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status}) ->
handle_info({poll_task_reply, Ref, {error, timeout}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, status = Status, poll_ref = Ref}) ->
lager:debug("[iot_device] device_uuid: ~p, poll timeout", [DeviceUUID]),
EdgeStatus = -1,
case EdgeStatus =/= LastEdgeStatus of
@ -278,7 +326,7 @@ handle_info({poll_task_reply, {error, timeout}}, State = #state{device_uuid = De
end;
%%
handle_info({poll_task_reply, {error, Reason}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus}) ->
handle_info({poll_task_reply, Ref, {error, Reason}}, State = #state{device_uuid = DeviceUUID, edge_status = LastEdgeStatus, poll_ref = Ref}) ->
lager:notice("[iot_device] device_uuid: ~p, poll get error: ~p, last edge_status: ~p", [DeviceUUID, Reason, LastEdgeStatus]),
{noreply, State};

View File

@ -0,0 +1,135 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 10. 3 2025 21:09
%%%-------------------------------------------------------------------
-module(iot_device_poll_task).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_monitor/0]).
-export([poll_task/6]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {}).
%%%===================================================================
%%% API
%%%===================================================================
-spec poll_task(Pid :: pid(), Ref :: reference(), HostPid :: pid(), ReceiverPid :: pid(), DeviceUUID :: binary(), Timeout :: integer()) -> no_return().
poll_task(Pid, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout)
when is_pid(Pid), is_reference(Ref), is_pid(HostPid), is_pid(ReceiverPid), is_binary(DeviceUUID), is_integer(Timeout) ->
gen_server:cast(Pid, {poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_monitor() ->
{'ok', {Pid :: pid(), MonRef :: reference()}} | 'ignore' | {'error', Reason :: term()}).
start_monitor() ->
gen_server:start_monitor(?MODULE, [], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([]) ->
{ok, #state{}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call(_Request, _From, State = #state{}) ->
{reply, ok, State}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({poll_task, Ref, HostPid, ReceiverPid, DeviceUUID, Timeout}, State = #state{}) ->
Command = #{
<<"device_uuid">> => DeviceUUID,
<<"timeout">> => Timeout,
<<"command">> => <<"query_status">>
},
BinCommand = iolist_to_binary(jiffy:encode(Command, [force_utf8])),
case iot_host:publish_message(HostPid, 17, {aes, BinCommand}, Timeout * 1000) of
{ok, Reply} when is_binary(Reply) ->
case catch jiffy:decode(Reply, [return_maps]) of
%%
#{<<"result">> := ResponseList} when is_list(ResponseList) andalso length(ResponseList) > 0 ->
EdgeStatusList = lists:map(fun(#{<<"edge_status">> := EdgeStatus}) -> EdgeStatus end, ResponseList),
case lists:any(fun(S) -> S =:= 1 end, EdgeStatusList) of
true ->
ReceiverPid ! {poll_task_reply, Ref, {ok, 1}};
false ->
case lists:all(fun(S) -> S =:= 0 end, EdgeStatusList) of
true ->
ReceiverPid ! {poll_task_reply, Ref, {ok, 0}};
false ->
ReceiverPid ! {poll_task_reply, Ref, {ok, lists:min(EdgeStatusList)}}
end
end;
_ ->
ReceiverPid ! {poll_task_reply, Ref, {error, <<"invalid reply json">>}}
end;
{error, Reason} ->
ReceiverPid ! {poll_task_reply, Ref, {error, Reason}}
end,
{stop, normal, State};
handle_cast(_Request, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================

View File

@ -199,10 +199,8 @@ init([UUID]) ->
erlang:start_timer(?HEARTBEAT_INTERVAL, self(), heartbeat_ticker),
StateName = case AuthorizeStatus =:= 1 of
true ->
?STATE_ACTIVATED;
false ->
?STATE_DENIED
true -> ?STATE_ACTIVATED;
false -> ?STATE_DENIED
end,
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, name = Name, has_session = false}};
undefined ->
@ -311,7 +309,6 @@ handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, aes
BinReply = jiffy:encode(#{<<"auth">> => true, <<"aes">> => Aes}, [force_utf8]),
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
lager:debug("[iot_host] uuid: ~p, activate: true, will send message: ~p", [UUID, BinReply]),
{next_state, ?STATE_ACTIVATED, State, [{reply, From, ok}]};
handle_event({call, From}, {activate, true}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
lager:debug("[iot_host] uuid: ~p, activate: true, no channel", [UUID]),
@ -323,7 +320,6 @@ handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, cha
ws_channel:send(ChannelPid, <<8:8, BinReply/binary>>),
ws_channel:stop(ChannelPid, closed),
lager:debug("[iot_host] uuid: ~p, activate: false, will send message: ~p", [UUID, BinReply]),
{next_state, ?STATE_DENIED, State#state{channel_pid = undefined, has_session = false}, [{reply, From, ok}]};
handle_event({call, From}, {activate, false}, _, State = #state{uuid = UUID, channel_pid = undefined}) ->
@ -362,18 +358,17 @@ handle_event({call, From}, {create_session, PubKey}, ?STATE_DENIED, State = #sta
{keep_state, State#state{has_session = false}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
%%
handle_event({call, From}, {reload_device, DeviceUUID}, _, State = #state{}) ->
handle_event({call, From}, {reload_device, DeviceUUID}, _, State) ->
case iot_device_sup:ensured_device_started(DeviceUUID) of
{ok, DevicePid} ->
iot_device:reload(DevicePid),
{keep_state, State, [{reply, From, ok}]};
{error, Reason} ->
{keep_state, State, [{reply, From, {error, Reason}}]}
end;
%%
handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{}) ->
handle_event({call, From}, {delete_device, DeviceUUID}, _, State) ->
case iot_device:get_pid(DeviceUUID) of
undefined ->
ok;
@ -383,10 +378,11 @@ handle_event({call, From}, {delete_device, DeviceUUID}, _, State = #state{}) ->
{keep_state, State, [{reply, From, ok}]};
%%
handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #state{}) ->
handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State) ->
case iot_device_sup:ensured_device_started(DeviceUUID) of
{ok, DevicePid} ->
iot_device:auth(DevicePid, Auth),
{keep_state, State, [{reply, From, ok}]};
{error, Reason} ->
{keep_state, State, [{reply, From, {error, Reason}}]}
@ -403,20 +399,6 @@ handle_event(cast, {handle, {data, Data}}, ?STATE_ACTIVATED, State = #state{aes
end,
{keep_state, State};
handle_event(cast, {handle, {device_report, Report}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, aes = AES, has_session = true}) ->
PlainReport = iot_cipher_aes:decrypt(AES, Report),
case catch jiffy:decode(PlainReport, [return_maps]) of
DeviceEdgeStatusList when is_list(DeviceEdgeStatusList) ->
lager:debug("[iot_host] host: ~p, update device edge_status num: ~p", [UUID, length(DeviceEdgeStatusList)]),
%%
lists:foreach(fun(#{<<"device_uuid">> := DeviceUUID, <<"edge_status">> := EdgeStatus}) ->
device_bo:change_edge_status(DeviceUUID, EdgeStatus)
end, DeviceEdgeStatusList);
Other ->
lager:notice("[iot_host] the device_report is invalid json: ~p", [Other])
end,
{keep_state, State};
%% ping的数据是通过aes加密后的
handle_event(cast, {handle, {ping, CipherMetric}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, name = Name, aes = AES,
heartbeat_counter = HeartbeatCounter, has_session = true}) ->

View File

@ -126,11 +126,6 @@ websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_AI_EVENT:8, C
iot_host:handle(HostPid, {ai_event, CipherEvent}),
{ok, State};
%%
websocket_handle({binary, <<?PACKET_REQUEST, _PacketId:32, ?METHOD_DEVICE_REPORT:8, CipherReport/binary>>}, State = #state{host_pid = HostPid}) when is_pid(HostPid) ->
iot_host:handle(HostPid, {device_report, CipherReport}),
{ok, State};
%%
websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, 0:32, Body/binary>>}, State = #state{uuid = UUID}) ->
lager:debug("[ws_channel] uuid: ~p, get send response message: ~p", [UUID, Body]),
@ -141,7 +136,7 @@ websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, PacketId:32, Body/binary>>
error ->
lager:warning("[ws_channel] get unknown publish response message: ~p, packet_id: ~p", [Body, PacketId]),
{ok, State};
{{ReceiverPid, Ref, _}, NInflight} ->
{{ReceiverPid, Ref}, NInflight} ->
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
true when Body == <<>> ->
ReceiverPid ! {ws_response, Ref};
@ -169,7 +164,7 @@ websocket_info({publish, ReceiverPid, Ref, Msg, Timeout}, State = #state{packet_
TTL = iot_util:timestamp() + Timeout,
maps:put(PacketId, {ReceiverPid, Ref, TTL}, Inflight);
false ->
maps:put(PacketId, {ReceiverPid, Ref, 0}, Inflight)
maps:put(PacketId, {ReceiverPid, Ref}, Inflight)
end,
{reply, {binary, <<?PACKET_PUBLISH, PacketId:32, Msg/binary>>}, State#state{packet_id = PacketId + 1, inflight = NInflight}};
@ -182,8 +177,13 @@ websocket_info({timeout, _, clean_ticker}, State=#state{inflight = Inflight}) ->
clean_ticker(),
Timestamp = iot_util:timestamp(),
NInflight = maps:filter(fun(_, {_ReceiverPid, _Ref, TTL}) ->
TTL == 0 orelse (TTL > 0 andalso TTL < Timestamp)
NInflight = maps:filter(fun(_, Promise) ->
case Promise of
{_ReceiverPid, _Ref, TTL} ->
TTL < Timestamp;
{_ReceiverPid, _Ref} ->
true
end
end, Inflight),
{ok, State#state{inflight = NInflight}};