diff --git a/apps/iot/include/message_pb.hrl b/apps/iot/include/message_pb.hrl index 9698fbb..6aa36e0 100644 --- a/apps/iot/include/message_pb.hrl +++ b/apps/iot/include/message_pb.hrl @@ -82,7 +82,8 @@ -define('DATA_PB_H', true). -record(data, {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional - metric = <<>> :: unicode:chardata() | undefined % = 2, optional + device_uuid = <<>> :: unicode:chardata() | undefined, % = 2, optional + metric = <<>> :: unicode:chardata() | undefined % = 3, optional }). -endif. diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index e45063d..5bca396 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -324,9 +324,27 @@ handle_event({call, From}, {activate_device, DeviceUUID, Auth}, _, State = #stat end; %% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到 -handle_event(cast, {handle, {data, Data}}, ?STATE_ACTIVATED, State = #state{has_session = true}) -> - %handle_data(Data, State), - {keep_state, State}; +handle_event(cast, {handle, {data, DataBin}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true, device_map = DeviceMap}) -> + #data{service_id = ServiceId, device_uuid = DeviceUUID, metric = Metric} = message_pb:decode_msg(DataBin, data), + case DeviceUUID =/= <<"">> of + true -> + case maps:find(DeviceUUID, DeviceMap) of + error -> + lager:warning("[iot_host] host uuid: ~p, device uuid: ~p not found, metric: ~p", [UUID, DeviceUUID, Metric]), + {keep_state, State}; + {ok, Device} -> + case iot_device:is_activated(Device) of + true -> + NDevice = iot_device:change_status(Device, ?DEVICE_ONLINE), + {keep_state, State#state{device_map = maps:put(DeviceUUID, NDevice, DeviceMap)}}; + false -> + lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, metric: ~p", [UUID, DeviceUUID, Metric]), + {keep_state, State} + end + end; + false -> + {keep_state, State} + end; %% ping的数据是通过aes加密后的,因此需要在有会话的情况下才行 handle_event(cast, {handle, {ping, Metrics}}, ?STATE_ACTIVATED, State = #state{uuid = UUID, has_session = true}) -> @@ -417,43 +435,6 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== -%% 处理相关数据 -%handle_data(#data{device_uuid = DeviceUUID, service_name = ServiceName, at = Timestamp, fields = FieldsList, tags = Tags}, #state{uuid = UUID, device_map = DeviceMap}) -% when is_binary(DeviceUUID), DeviceUUID /= <<>> -> -% -% case maps:find(DeviceUUID, DeviceMap) of -% -% end, -% -% -% case iot_device:get_pid(DeviceUUID) of -% undefined -> -% lager:warning("[iot_host] host uuid: ~p, device uuid: ~p not found, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags]), -% ok; -% DevicePid when is_pid(DevicePid) -> -% case iot_device:is_activated(DevicePid) of -% true -> -% %% 查找终端设备对应的点位信息 -% iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp), -% -% %% 数据写入influxdb -% NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => DeviceUUID}, -% influx_client:write_data(DeviceUUID, NTags, FieldsList, Timestamp), -% -% iot_device:change_status(DevicePid, ?DEVICE_ONLINE); -% false -> -% lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags]) -% end -% end; -% -%handle_data(#data{service_name = ServiceName, at = Timestamp, fields = FieldsList, tags = Tags}, #state{uuid = UUID}) -> -% %% 查找终端设备对应的点位信息 -% iot_router:route_uuid(UUID, FieldsList, Timestamp), -% -% %% 数据写入influxdb -% NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName}, -% influx_client:write_data(UUID, NTags, FieldsList, Timestamp). - -spec report_event(UUID :: binary(), NewStatus :: integer()) -> no_return(). report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) -> TextMap = #{ diff --git a/apps/iot/src/proto/message_pb.erl b/apps/iot/src/proto/message_pb.erl index 3c01aae..c109667 100644 --- a/apps/iot/src/proto/message_pb.erl +++ b/apps/iot/src/proto/message_pb.erl @@ -385,7 +385,7 @@ encode_msg_service_metrics(#service_metrics{service_id = F1, metrics = F2, timeo encode_msg_data(Msg, TrUserData) -> encode_msg_data(Msg, <<>>, TrUserData). -encode_msg_data(#data{service_id = F1, metric = F2}, Bin, TrUserData) -> +encode_msg_data(#data{service_id = F1, device_uuid = F2, metric = F3}, Bin, TrUserData) -> B1 = if F1 == undefined -> Bin; true -> begin @@ -396,13 +396,23 @@ encode_msg_data(#data{service_id = F1, metric = F2}, Bin, TrUserData) -> end end end, - if F2 == undefined -> B1; + B2 = if F2 == undefined -> B1; + true -> + begin + TrF2 = id(F2, TrUserData), + case is_empty_string(TrF2) of + true -> B1; + false -> e_type_string(TrF2, <>, TrUserData) + end + end + end, + if F3 == undefined -> B2; true -> begin - TrF2 = id(F2, TrUserData), - case is_empty_string(TrF2) of - true -> B1; - false -> e_type_string(TrF2, <>, TrUserData) + TrF3 = id(F3, TrUserData), + case is_empty_string(TrF3) of + true -> B2; + false -> e_type_string(TrF3, <>, TrUserData) end end end. @@ -1306,56 +1316,63 @@ skip_32_service_metrics(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUs skip_64_service_metrics(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_service_metrics(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). -decode_msg_data(Bin, TrUserData) -> dfp_read_field_def_data(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData). +decode_msg_data(Bin, TrUserData) -> dfp_read_field_def_data(Bin, 0, 0, 0, id(<<>>, TrUserData), id(<<>>, TrUserData), id(<<>>, TrUserData), TrUserData). -dfp_read_field_def_data(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_data_service_id(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); -dfp_read_field_def_data(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> d_field_data_metric(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); -dfp_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, _) -> #data{service_id = F@_1, metric = F@_2}; -dfp_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dg_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, TrUserData). +dfp_read_field_def_data(<<10, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_data_service_id(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); +dfp_read_field_def_data(<<18, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_data_device_uuid(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); +dfp_read_field_def_data(<<26, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> d_field_data_metric(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); +dfp_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #data{service_id = F@_1, device_uuid = F@_2, metric = F@_3}; +dfp_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dg_read_field_def_data(Other, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). -dg_read_field_def_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 32 - 7 -> dg_read_field_def_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -dg_read_field_def_data(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, TrUserData) -> +dg_read_field_def_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 32 - 7 -> dg_read_field_def_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); +dg_read_field_def_data(<<0:1, X:7, Rest/binary>>, N, Acc, _, F@_1, F@_2, F@_3, TrUserData) -> Key = X bsl N + Acc, case Key of - 10 -> d_field_data_service_id(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); - 18 -> d_field_data_metric(Rest, 0, 0, 0, F@_1, F@_2, TrUserData); + 10 -> d_field_data_service_id(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); + 18 -> d_field_data_device_uuid(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); + 26 -> d_field_data_metric(Rest, 0, 0, 0, F@_1, F@_2, F@_3, TrUserData); _ -> case Key band 7 of - 0 -> skip_varint_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 1 -> skip_64_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 2 -> skip_length_delimited_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 3 -> skip_group_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData); - 5 -> skip_32_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, TrUserData) + 0 -> skip_varint_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); + 1 -> skip_64_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); + 2 -> skip_length_delimited_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); + 3 -> skip_group_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData); + 5 -> skip_32_data(Rest, 0, 0, Key bsr 3, F@_1, F@_2, F@_3, TrUserData) end end; -dg_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, _) -> #data{service_id = F@_1, metric = F@_2}. +dg_read_field_def_data(<<>>, 0, 0, _, F@_1, F@_2, F@_3, _) -> #data{service_id = F@_1, device_uuid = F@_2, metric = F@_3}. -d_field_data_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_data_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -d_field_data_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, TrUserData) -> +d_field_data_service_id(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_data_service_id(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); +d_field_data_service_id(<<0:1, X:7, Rest/binary>>, N, Acc, F, _, F@_2, F@_3, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_data(RestF, 0, 0, F, NewFValue, F@_2, TrUserData). + dfp_read_field_def_data(RestF, 0, 0, F, NewFValue, F@_2, F@_3, TrUserData). -d_field_data_metric(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> d_field_data_metric(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -d_field_data_metric(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, TrUserData) -> +d_field_data_device_uuid(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_data_device_uuid(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); +d_field_data_device_uuid(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, _, F@_3, TrUserData) -> {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, - dfp_read_field_def_data(RestF, 0, 0, F, F@_1, NewFValue, TrUserData). + dfp_read_field_def_data(RestF, 0, 0, F, F@_1, NewFValue, F@_3, TrUserData). -skip_varint_data(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> skip_varint_data(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData); -skip_varint_data(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). +d_field_data_metric(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> d_field_data_metric(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); +d_field_data_metric(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, _, TrUserData) -> + {NewFValue, RestF} = begin Len = X bsl N + Acc, <> = Rest, Bytes2 = binary:copy(Bytes), {id(Bytes2, TrUserData), Rest2} end, + dfp_read_field_def_data(RestF, 0, 0, F, F@_1, F@_2, NewFValue, TrUserData). -skip_length_delimited_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) when N < 57 -> skip_length_delimited_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, TrUserData); -skip_length_delimited_data(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, TrUserData) -> +skip_varint_data(<<1:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> skip_varint_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData); +skip_varint_data(<<0:1, _:7, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). + +skip_length_delimited_data(<<1:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) when N < 57 -> skip_length_delimited_data(Rest, N + 7, X bsl N + Acc, F, F@_1, F@_2, F@_3, TrUserData); +skip_length_delimited_data(<<0:1, X:7, Rest/binary>>, N, Acc, F, F@_1, F@_2, F@_3, TrUserData) -> Length = X bsl N + Acc, <<_:Length/binary, Rest2/binary>> = Rest, - dfp_read_field_def_data(Rest2, 0, 0, F, F@_1, F@_2, TrUserData). + dfp_read_field_def_data(Rest2, 0, 0, F, F@_1, F@_2, F@_3, TrUserData). -skip_group_data(Bin, _, Z2, FNum, F@_1, F@_2, TrUserData) -> +skip_group_data(Bin, _, Z2, FNum, F@_1, F@_2, F@_3, TrUserData) -> {_, Rest} = read_group(Bin, FNum), - dfp_read_field_def_data(Rest, 0, Z2, FNum, F@_1, F@_2, TrUserData). + dfp_read_field_def_data(Rest, 0, Z2, FNum, F@_1, F@_2, F@_3, TrUserData). -skip_32_data(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). +skip_32_data(<<_:32, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). -skip_64_data(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, TrUserData). +skip_64_data(<<_:64, Rest/binary>>, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData) -> dfp_read_field_def_data(Rest, Z1, Z2, F, F@_1, F@_2, F@_3, TrUserData). decode_msg_ping(Bin, TrUserData) -> dfp_read_field_def_ping(Bin, @@ -2011,11 +2028,15 @@ merge_msg_service_metrics(#service_metrics{service_id = PFservice_id, metrics = end}. -compile({nowarn_unused_function,merge_msg_data/3}). -merge_msg_data(#data{service_id = PFservice_id, metric = PFmetric}, #data{service_id = NFservice_id, metric = NFmetric}, _) -> +merge_msg_data(#data{service_id = PFservice_id, device_uuid = PFdevice_uuid, metric = PFmetric}, #data{service_id = NFservice_id, device_uuid = NFdevice_uuid, metric = NFmetric}, _) -> #data{service_id = if NFservice_id =:= undefined -> PFservice_id; true -> NFservice_id end, + device_uuid = + if NFdevice_uuid =:= undefined -> PFdevice_uuid; + true -> NFdevice_uuid + end, metric = if NFmetric =:= undefined -> PFmetric; true -> NFmetric @@ -2292,12 +2313,15 @@ v_msg_service_metrics(X, Path, _TrUserData) -> mk_type_error({expected_msg, serv -compile({nowarn_unused_function,v_msg_data/3}). -dialyzer({nowarn_function,v_msg_data/3}). -v_msg_data(#data{service_id = F1, metric = F2}, Path, TrUserData) -> +v_msg_data(#data{service_id = F1, device_uuid = F2, metric = F3}, Path, TrUserData) -> if F1 == undefined -> ok; true -> v_type_string(F1, [service_id | Path], TrUserData) end, if F2 == undefined -> ok; - true -> v_type_string(F2, [metric | Path], TrUserData) + true -> v_type_string(F2, [device_uuid | Path], TrUserData) + end, + if F3 == undefined -> ok; + true -> v_type_string(F3, [metric | Path], TrUserData) end, ok; v_msg_data(X, Path, _TrUserData) -> mk_type_error({expected_msg, data}, X, Path). @@ -2519,7 +2543,10 @@ get_msg_defs() -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = metrics, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]}, - {{msg, data}, [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = metric, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]}, + {{msg, data}, + [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, + #field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, + #field{name = metric, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]}, {{msg, ping}, [#field{name = adcode, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = boot_time, fnum = 2, rnum = 3, type = uint32, occurrence = optional, opts = []}, @@ -2601,7 +2628,10 @@ find_msg_def(service_metrics) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = metrics, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, #field{name = timeout, fnum = 3, rnum = 4, type = uint32, occurrence = optional, opts = []}]; -find_msg_def(data) -> [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = metric, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}]; +find_msg_def(data) -> + [#field{name = service_id, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, + #field{name = device_uuid, fnum = 2, rnum = 3, type = string, occurrence = optional, opts = []}, + #field{name = metric, fnum = 3, rnum = 4, type = string, occurrence = optional, opts = []}]; find_msg_def(ping) -> [#field{name = adcode, fnum = 1, rnum = 2, type = string, occurrence = optional, opts = []}, #field{name = boot_time, fnum = 2, rnum = 3, type = uint32, occurrence = optional, opts = []},