Merge pull request 'fix_location_influxdb_mst' (#2) from fix_location_influxdb_mst into master
Reviewed-on: http://101.43.184.190:3000/alc/iot/pulls/2
This commit is contained in:
commit
84ceab0b5d
@ -87,6 +87,7 @@
|
||||
-record(north_data, {
|
||||
id = 0 :: integer(),
|
||||
location_code :: binary(),
|
||||
dynamic_location_code :: binary(),
|
||||
%% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}]
|
||||
fields :: [{K :: binary(), V :: any()}],
|
||||
timestamp = 0 :: integer()
|
||||
@ -96,14 +97,14 @@
|
||||
-record(event_data, {
|
||||
id = 0 :: integer(),
|
||||
location_code :: binary(),
|
||||
dynamic_location_code :: binary(),
|
||||
event_type :: integer(),
|
||||
params :: map()
|
||||
}).
|
||||
|
||||
%% 发送数据
|
||||
-record(post_data, {
|
||||
id = 0 :: integer(),
|
||||
location_code :: binary(),
|
||||
%% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}]
|
||||
body :: binary() | list()
|
||||
%% 设备数据
|
||||
-record(device_data, {
|
||||
tags :: map(),
|
||||
val :: any(),
|
||||
timestamp :: integer()
|
||||
}).
|
||||
@ -14,7 +14,7 @@
|
||||
|
||||
%% API
|
||||
-export([start_link/2]).
|
||||
-export([get_pid/1, forward/4, get_stat/0]).
|
||||
-export([get_pid/1, forward/5, get_stat/0]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
@ -42,9 +42,9 @@
|
||||
get_pid(Name) when is_atom(Name) ->
|
||||
whereis(Name).
|
||||
|
||||
-spec forward(Pid :: pid(), LocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return().
|
||||
forward(Pid, LocationCode, EventType, Params) when is_pid(Pid), is_binary(LocationCode), is_integer(EventType), is_map(Params) ->
|
||||
gen_statem:cast(Pid, {forward, LocationCode, EventType, Params}).
|
||||
-spec forward(Pid :: pid(), LocationCode :: binary(), RealLocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return().
|
||||
forward(Pid, LocationCode, RealLocationCode, EventType, Params) when is_pid(Pid), is_binary(LocationCode), is_binary(RealLocationCode), is_integer(EventType), is_map(Params) ->
|
||||
gen_statem:cast(Pid, {forward, LocationCode, RealLocationCode, EventType, Params}).
|
||||
|
||||
-spec get_stat() -> {ok, Stat :: #{}}.
|
||||
get_stat() ->
|
||||
@ -82,8 +82,8 @@ callback_mode() ->
|
||||
%% functions is called when gen_statem receives and event from
|
||||
%% call/2, cast/2, or as a normal process message.
|
||||
|
||||
handle_event(cast, {forward, LocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) ->
|
||||
EventData = #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params},
|
||||
handle_event(cast, {forward, LocationCode, DynamicLocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) ->
|
||||
EventData = #event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType, params = Params},
|
||||
%% 避免不必要的内部消息
|
||||
Actions = case FlightNum < PoolSize of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
@ -159,12 +159,12 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||
%%%===================================================================
|
||||
|
||||
-spec do_post(PostmanPid :: pid(), EventData :: #event_data{}) -> no_return().
|
||||
do_post(PostmanPid, #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params}) when is_pid(PostmanPid) ->
|
||||
do_post(PostmanPid, #event_data{id = Id, event_type = EventType, params = Params}) when is_pid(PostmanPid) ->
|
||||
Data = #{
|
||||
<<"version">> => <<"1.0">>,
|
||||
<<"event_type">> => EventType,
|
||||
<<"params">> => Params
|
||||
},
|
||||
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
||||
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}},
|
||||
PostmanPid ! {post, self(), {Id, Body}},
|
||||
ok.
|
||||
@ -14,7 +14,7 @@
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([get_pid/0, forward/3, get_stat/0]).
|
||||
-export([get_pid/0, forward/4, get_stat/0]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
@ -46,9 +46,9 @@
|
||||
get_pid() ->
|
||||
whereis(?MODULE).
|
||||
|
||||
-spec forward(LocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return().
|
||||
forward(LocationCode, EventType, Params) when is_binary(LocationCode), is_integer(EventType), is_map(Params) ->
|
||||
gen_statem:cast(?MODULE, {forward, LocationCode, EventType, Params}).
|
||||
-spec forward(LocationCode :: binary(), DynamicLocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return().
|
||||
forward(LocationCode, DynamicLocationCode, EventType, Params) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_integer(EventType), is_map(Params) ->
|
||||
gen_statem:cast(?MODULE, {forward, LocationCode, DynamicLocationCode, EventType, Params}).
|
||||
|
||||
-spec get_stat() -> {ok, Stat :: #{}}.
|
||||
get_stat() ->
|
||||
@ -92,8 +92,8 @@ callback_mode() ->
|
||||
%% functions is called when gen_statem receives and event from
|
||||
%% call/2, cast/2, or as a normal process message.
|
||||
|
||||
handle_event(cast, {forward, LocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) ->
|
||||
EventData = #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params},
|
||||
handle_event(cast, {forward, LocationCode, DynamicLocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) ->
|
||||
EventData = #event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType, params = Params},
|
||||
%% 避免不必要的内部消息
|
||||
Actions = case FlightNum < PoolSize of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
@ -176,7 +176,7 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
||||
%%%===================================================================
|
||||
|
||||
-spec do_post(EventData :: #event_data{}, State :: #state{}) -> no_return().
|
||||
do_post(#event_data{id = Id, location_code = LocationCode, event_type = EventType,
|
||||
do_post(#event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType,
|
||||
params = Params = #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}},
|
||||
#state{pri_key = PriKey, url = Url, logger_pid = LoggerPid}) ->
|
||||
|
||||
@ -190,6 +190,7 @@ do_post(#event_data{id = Id, location_code = LocationCode, event_type = EventTyp
|
||||
|
||||
DeviceInfo = #{
|
||||
<<"location">> => LocationCode,
|
||||
<<"dynamic_location">> => DynamicLocationCode,
|
||||
<<"category">> => EventCode,
|
||||
<<"description">> => Description,
|
||||
<<"occurrenceTime">> => Datetime,
|
||||
|
||||
@ -14,7 +14,7 @@
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([get_pid/0, forward/3, get_stat/0]).
|
||||
-export([get_pid/0, forward/4, get_stat/0]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
@ -46,9 +46,9 @@
|
||||
get_pid() ->
|
||||
whereis(?MODULE).
|
||||
|
||||
-spec forward(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
|
||||
forward(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
|
||||
gen_statem:cast(?MODULE, {forward, LocationCode, Fields, Timestamp}).
|
||||
-spec forward(LocationCode :: binary(), DynamicLocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
|
||||
forward(LocationCode, DynamicLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
|
||||
gen_statem:cast(?MODULE, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}).
|
||||
|
||||
-spec get_stat() -> {ok, Stat :: #{}}.
|
||||
get_stat() ->
|
||||
@ -91,8 +91,8 @@ callback_mode() ->
|
||||
%% functions is called when gen_statem receives and event from
|
||||
%% call/2, cast/2, or as a normal process message.
|
||||
|
||||
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) ->
|
||||
mnesia_queue:insert(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}),
|
||||
handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) ->
|
||||
mnesia_queue:insert(#north_data{location_code = LocationCode, dynamic_location_code = DynamicLocationCode, fields = Fields, timestamp = Timestamp}),
|
||||
%% 避免不必要的内部消息
|
||||
Actions = case StateName =:= connected andalso not IsBusy of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
@ -228,16 +228,17 @@ create_postman(Opts) ->
|
||||
mqtt_postman:start_link(PostmanOpts, Topic, Qos).
|
||||
|
||||
-spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> no_return().
|
||||
do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) ->
|
||||
do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) ->
|
||||
Data = #{
|
||||
<<"version">> => <<"1.0">>,
|
||||
<<"location_code">> => LocationCode,
|
||||
<<"dynamic_location_code">> => DynamicLocationCode,
|
||||
<<"ts">> => Timestamp,
|
||||
<<"properties">> => Fields
|
||||
},
|
||||
try
|
||||
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
||||
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}
|
||||
PostmanPid ! {post, self(), {Id, Body}}
|
||||
catch _:_ ->
|
||||
self() ! {ack, Id, <<"json error">>}
|
||||
end.
|
||||
@ -63,6 +63,32 @@ handle_request("POST", "/device/activate", _, #{<<"host_id">> := HostId, <<"devi
|
||||
end
|
||||
end;
|
||||
|
||||
%% 重新加载对应的主机信息
|
||||
handle_request("POST", "/device/query", _, Params = #{<<"device_uuid">> := DeviceUUID}) when is_binary(DeviceUUID) ->
|
||||
lager:debug("[device_handler] query device uuid: ~p, params: ~p", [DeviceUUID, Params]),
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
undefined ->
|
||||
{ok, 200, iot_util:json_error(404, <<"device not found">>)};
|
||||
DevicePid when is_pid(DevicePid) ->
|
||||
Tags = maps:get(<<"tags">>, Params, #{}),
|
||||
StartTs = maps:get(<<"start_ts">>, Params, 0),
|
||||
StopTs = maps:get(<<"stop_ts">>, Params, 0),
|
||||
Limit = maps:get(<<"limit">>, Params, 0),
|
||||
|
||||
case is_map(Tags) andalso is_integer(StartTs) andalso is_integer(StopTs) andalso is_integer(Limit)
|
||||
andalso StartTs >= 0 andalso StopTs >= 0 andalso Limit >= 0 of
|
||||
|
||||
true ->
|
||||
{ok, DeviceDataList} = iot_device:query(DevicePid, Tags, StartTs, StopTs, Limit),
|
||||
DataItems = lists:map(fun(#device_data{tags = Tags0, val = Val, timestamp = Timestamp}) ->
|
||||
#{<<"tags">> => Tags0, <<"val">> => Val, <<"timestamp">> => Timestamp}
|
||||
end, DeviceDataList),
|
||||
{ok, 200, iot_util:json_data(DataItems)};
|
||||
false ->
|
||||
{ok, 200, iot_util:json_error(404, <<"invalid params">>)}
|
||||
end
|
||||
end;
|
||||
|
||||
handle_request(_, Path, _, _) ->
|
||||
Path1 = list_to_binary(Path),
|
||||
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
|
||||
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
|
||||
|
||||
@ -71,6 +71,7 @@ write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement)
|
||||
end, NFieldsList),
|
||||
Precision = influx_client:get_precision(Timestamp),
|
||||
|
||||
%poolboy:transaction(influx_pool_backup, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end);
|
||||
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end);
|
||||
false ->
|
||||
ok
|
||||
|
||||
@ -16,11 +16,11 @@
|
||||
-spec route_uuid(RouterUUID :: binary(), EventType :: integer(), Params :: map()) -> no_return().
|
||||
route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer(EventType), is_map(Params) ->
|
||||
%% 查找终端设备对应的点位信息
|
||||
case redis_client:hget(RouterUUID, <<"location_code">>) of
|
||||
{ok, undefined} ->
|
||||
case redis_client:hgetall(RouterUUID) of
|
||||
{ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) ->
|
||||
iot_jinzhi_endpoint:forward(LocationCode, DynamicLocationCode, EventType, Params);
|
||||
{ok, _} ->
|
||||
lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]);
|
||||
{ok, LocationCode} when is_binary(LocationCode) ->
|
||||
iot_jinzhi_endpoint:forward(LocationCode, EventType, Params);
|
||||
{error, Reason} ->
|
||||
lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason])
|
||||
end.
|
||||
@ -12,8 +12,8 @@
|
||||
-behaviour(gen_statem).
|
||||
|
||||
%% API
|
||||
-export([get_name/1, get_pid/1]).
|
||||
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2]).
|
||||
-export([get_name/1, get_pid/1, serialize/1]).
|
||||
-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/5]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
@ -22,12 +22,17 @@
|
||||
-define(DEVICE_AUTH_DENIED, 0).
|
||||
-define(DEVICE_AUTH_AUTHED, 1).
|
||||
|
||||
%% 存储数据的上限
|
||||
-define(MAX_SIZE, 2000).
|
||||
|
||||
%% 状态
|
||||
-define(STATE_DENIED, denied).
|
||||
-define(STATE_ACTIVATED, activated).
|
||||
|
||||
-record(state, {
|
||||
device_uuid :: binary(),
|
||||
%% 用来保存数据,作为存储在influxdb里面的数据的备份
|
||||
queue = queue:new(),
|
||||
status = ?DEVICE_OFFLINE
|
||||
}).
|
||||
|
||||
@ -35,6 +40,27 @@
|
||||
%%% API
|
||||
%%%===================================================================
|
||||
|
||||
%% 格式化efka上传的数据格式
|
||||
%"fields": [
|
||||
%{
|
||||
% "key": "test"
|
||||
% "value": 124,
|
||||
% "unit": "U",
|
||||
% "type": "AI:遥测值,DI:遥信值,SOE:事件",
|
||||
% "timestamp": int
|
||||
%}
|
||||
%],
|
||||
-spec serialize(FieldsList :: [map()]) -> [Val :: map()].
|
||||
serialize(FieldsList) when is_list(FieldsList) ->
|
||||
lists:flatmap(fun serialize0/1, FieldsList).
|
||||
serialize0(Fields = #{<<"key">> := Key}) when is_binary(Key) andalso Key /= <<>> ->
|
||||
Values = maps:remove(<<"key">>, Fields),
|
||||
%S = base64:encode(iolist_to_binary(jiffy:encode(#{Key => Values}, [force_utf8]))),
|
||||
%[<<"base64:", S/binary>>];
|
||||
[#{Key => Values}];
|
||||
serialize0(_) ->
|
||||
[].
|
||||
|
||||
-spec is_alive(DeviceUUID :: binary()) -> error | {ok, Pid :: pid()}.
|
||||
is_alive(DeviceUUID) when is_binary(DeviceUUID) ->
|
||||
case iot_device:get_pid(DeviceUUID) of
|
||||
@ -77,6 +103,14 @@ reload(Pid) when is_pid(Pid) ->
|
||||
auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) ->
|
||||
gen_statem:cast(Pid, {auth, Auth}).
|
||||
|
||||
-spec data(Pid :: pid(), DataList :: [#device_data{}]) -> no_return().
|
||||
data(Pid, DataList) when is_pid(Pid), is_list(DataList) ->
|
||||
gen_statem:cast(Pid, {data, DataList}).
|
||||
|
||||
-spec query(Pid :: pid(), Tags :: map(), StartTs :: integer(), StopTs :: integer(), Limit :: integer()) -> {ok, [#device_data{}]}.
|
||||
query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 ->
|
||||
gen_statem:call(Pid, {query, Tags, StartTs, StopTs, Limit}).
|
||||
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
@ -164,6 +198,38 @@ handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) ->
|
||||
{stop, normal, State}
|
||||
end;
|
||||
|
||||
%% 向设备中追加数据
|
||||
handle_event(cast, {data, DataList}, _, State = #state{queue = Q}) ->
|
||||
NQ = lists:foldl(fun(Data, Q0) ->
|
||||
case queue:len(Q0) > ?MAX_SIZE of
|
||||
true ->
|
||||
{_, Q1} = queue:out(Q0),
|
||||
queue:in(Data, Q1);
|
||||
false ->
|
||||
queue:in(Data, Q0)
|
||||
end
|
||||
end, Q, DataList),
|
||||
|
||||
{keep_state, State#state{queue = NQ}};
|
||||
|
||||
%% 查询device里面的数据
|
||||
handle_event({call, From}, {query, Tags, StartTs, StopTs, Limit}, _StateName, State = #state{queue = Q}) ->
|
||||
L = queue:to_list(Q),
|
||||
|
||||
%% 过滤
|
||||
L1 = filter_timestamp(L, StartTs, StopTs),
|
||||
L2 = filter_tags(L1, Tags),
|
||||
|
||||
%% 处理数据截取
|
||||
L3 = lists:reverse(L2),
|
||||
L4 = case Limit =:= 0 of
|
||||
true ->
|
||||
L3;
|
||||
false ->
|
||||
lists:sublist(L3, 1, Limit)
|
||||
end,
|
||||
{keep_state, State, [{reply, From, {ok, L4}}]};
|
||||
|
||||
%% 处理授权
|
||||
handle_event(cast, {auth, Auth}, StateName, State = #state{device_uuid = DeviceUUID}) ->
|
||||
case {StateName, Auth} of
|
||||
@ -217,4 +283,26 @@ report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewSt
|
||||
<<"timestamp">> => Timestamp
|
||||
}],
|
||||
iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp),
|
||||
lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]).
|
||||
lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]).
|
||||
|
||||
%% 过滤时间
|
||||
-spec filter_timestamp(L :: list(), StartTs :: integer(), StopTs :: integer()) -> list().
|
||||
filter_timestamp(L, 0, 0) ->
|
||||
L;
|
||||
filter_timestamp(_L, StartTs, StopTs) when StartTs > StopTs ->
|
||||
[];
|
||||
filter_timestamp(L, StartTs0, StopTs0) ->
|
||||
%% 时间的比较统一到毫秒级别
|
||||
StartTs = StartTs0 * 1000,
|
||||
StopTs = StopTs0 * 1000,
|
||||
lists:filter(fun(#device_data{timestamp = Timestamp}) -> Timestamp >= StartTs andalso Timestamp =< StopTs end, L).
|
||||
|
||||
%% 过滤标签
|
||||
-spec filter_tags(L :: list(), Tags :: map()) -> list().
|
||||
filter_tags(L, Tags) when map_size(Tags) =:= 0 ->
|
||||
L;
|
||||
filter_tags(L, Tags) when map_size(Tags) =:= 0 ->
|
||||
lists:filter(fun(#device_data{tags = Tags0}) ->
|
||||
lists:all(fun({TagName, TagVal}) -> maps:is_key(TagName, Tags0) andalso maps:get(TagName, Tags0) =:= TagVal end, maps:to_list(Tags))
|
||||
end, L).
|
||||
|
||||
|
||||
@ -584,7 +584,14 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName
|
||||
NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => DeviceUUID},
|
||||
influx_client:write_data(DeviceUUID, NTags, FieldsList, Timestamp),
|
||||
|
||||
iot_device:change_status(DevicePid, ?DEVICE_ONLINE);
|
||||
iot_device:change_status(DevicePid, ?DEVICE_ONLINE),
|
||||
|
||||
%% 将数据写入到设备的内存缓存中
|
||||
DeviceVals = iot_device:serialize(FieldsList),
|
||||
%% 时间同样精确到毫秒
|
||||
FormattedTimestamp = iot_util:format_timestamp(Timestamp),
|
||||
DeviceDataList = lists:map(fun(Val) -> #device_data{tags = NTags, timestamp = FormattedTimestamp, val = Val} end, DeviceVals),
|
||||
iot_device:data(DevicePid, DeviceDataList);
|
||||
false ->
|
||||
lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags])
|
||||
end
|
||||
|
||||
@ -16,11 +16,11 @@
|
||||
-spec route_uuid(RouterUUID :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
|
||||
route_uuid(RouterUUID, Fields, Timestamp) when is_binary(RouterUUID), is_list(Fields), is_integer(Timestamp) ->
|
||||
%% 查找终端设备对应的点位信息
|
||||
case redis_client:hget(RouterUUID, <<"location_code">>) of
|
||||
{ok, undefined} ->
|
||||
case redis_client:hgetall(RouterUUID) of
|
||||
{ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) ->
|
||||
iot_zd_endpoint:forward(LocationCode, DynamicLocationCode, Fields, Timestamp);
|
||||
{ok, _} ->
|
||||
lager:warning("[iot_host] the north_data hget location_code, uuid: ~p, not found, fields: ~p", [RouterUUID, Fields]);
|
||||
{ok, LocationCode} when is_binary(LocationCode) ->
|
||||
iot_zd_endpoint:forward(LocationCode, Fields, Timestamp);
|
||||
{error, Reason} ->
|
||||
lager:warning("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p, fields: ~p", [RouterUUID, Reason, Fields])
|
||||
end.
|
||||
@ -14,6 +14,23 @@
|
||||
-export([step/3, chunks/2, rand_bytes/1, uuid/0, md5/1, parse_mapper/1]).
|
||||
-export([json_data/1, json_error/2]).
|
||||
-export([queue_limited_in/3, assert_call/2, assert/2]).
|
||||
-export([format_timestamp/1]).
|
||||
|
||||
%% 格式化时间
|
||||
-spec format_timestamp(Timestamp :: integer()) -> integer().
|
||||
format_timestamp(Timestamp) when is_integer(Timestamp) ->
|
||||
case length(integer_to_list(Timestamp)) of
|
||||
10 ->
|
||||
Timestamp * 1000;
|
||||
13 ->
|
||||
Timestamp;
|
||||
16 ->
|
||||
Timestamp div 1000;
|
||||
19 ->
|
||||
Timestamp div 1000_000;
|
||||
_ ->
|
||||
timestamp()
|
||||
end.
|
||||
|
||||
%% 时间,精确到毫秒
|
||||
timestamp() ->
|
||||
@ -65,9 +82,8 @@ chunks0([Hd | Tail], Size, Num, Target, AccTarget) ->
|
||||
chunks0(Tail, Size, Num - 1, [Hd | Target], AccTarget).
|
||||
|
||||
json_data(Data) ->
|
||||
jiffy:encode(#{
|
||||
<<"result">> => Data
|
||||
}, [force_utf8]).
|
||||
Json = jiffy:encode(#{<<"result">> => Data}, [force_utf8]),
|
||||
iolist_to_binary(Json).
|
||||
|
||||
json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) ->
|
||||
jiffy:encode(#{
|
||||
|
||||
@ -30,7 +30,7 @@ test_influxdb() ->
|
||||
end, lists:seq(1, 100)).
|
||||
|
||||
test_mqtt() ->
|
||||
iot_zd_endpoint:forward(<<"location_code_test123">>, [
|
||||
iot_zd_endpoint:forward(<<"location_code_test123">>, <<"location_code_test123">>, [
|
||||
#{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>},
|
||||
#{<<"key">> => <<"age">>, <<"value">> => 30},
|
||||
#{<<"key">> => <<"flow">>, <<"value">> => 30}
|
||||
|
||||
@ -74,7 +74,7 @@ handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({post, ReceiverPid, #post_data{id = Id, body = Body}}, State = #state{url = Url}) ->
|
||||
handle_info({post, ReceiverPid, {Id, Body}}, State = #state{url = Url}) ->
|
||||
Headers = [
|
||||
{<<"content-type">>, <<"application/json">>}
|
||||
],
|
||||
|
||||
@ -97,8 +97,7 @@ handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflig
|
||||
end;
|
||||
|
||||
%% 转发信息
|
||||
handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
|
||||
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
|
||||
handle_info({post, ReceiverPid, {Id, Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) ->
|
||||
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]),
|
||||
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
||||
ok ->
|
||||
|
||||
@ -80,7 +80,7 @@ handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({post, ReceiverPid, #post_data{id = Id, body = Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) ->
|
||||
handle_info({post, ReceiverPid, {Id, Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) ->
|
||||
case catch mysql_provider:insert(ConnPid, Table, Fields, false) of
|
||||
ok ->
|
||||
ReceiverPid ! {ack, Id};
|
||||
|
||||
@ -87,6 +87,16 @@
|
||||
{port, 8086},
|
||||
{token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>}
|
||||
]
|
||||
},
|
||||
|
||||
%% 备份库
|
||||
{influx_pool_backup,
|
||||
[{size, 100}, {max_overflow, 200}, {worker_module, influx_client}],
|
||||
[
|
||||
{host, "39.98.184.67"},
|
||||
{port, 8086},
|
||||
{token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>}
|
||||
]
|
||||
}
|
||||
|
||||
]}
|
||||
|
||||
@ -76,6 +76,16 @@
|
||||
{port, 8086},
|
||||
{token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>}
|
||||
]
|
||||
},
|
||||
|
||||
%% influxdb备份库
|
||||
{influx_pool_backup,
|
||||
[{size, 100}, {max_overflow, 200}, {worker_module, influx_client}],
|
||||
[
|
||||
{host, "172.19.0.4"},
|
||||
{port, 8086},
|
||||
{token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>}
|
||||
]
|
||||
}
|
||||
|
||||
]}
|
||||
|
||||
0
docs/jinzhi_http.md
Normal file
0
docs/jinzhi_http.md
Normal file
@ -21,6 +21,8 @@
|
||||
{
|
||||
"version": "1.0",
|
||||
"location_code": "string",
|
||||
// 日期: 2024-06-20, <<新增字段>>
|
||||
"dynamic_location_code": "string",
|
||||
"ts ": 1688606685,
|
||||
"properties": [
|
||||
{
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user