From 0b670819b37cfbbbfb097eb7ab5700566799ec04 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 20 Jun 2024 22:48:07 +0800 Subject: [PATCH 01/11] fix north_data --- apps/iot/include/iot.hrl | 2 +- apps/iot/src/endpoint/iot_http_endpoint.erl | 2 +- apps/iot/src/endpoint/iot_zd_endpoint.erl | 17 +++++++++-------- apps/iot/src/iot_router.erl | 8 ++++---- apps/iot/src/mocker/iot_mock.erl | 2 +- apps/iot/src/postman/mqtt_postman.erl | 3 +-- 6 files changed, 17 insertions(+), 17 deletions(-) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 39cd286..8aaf1af 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -87,6 +87,7 @@ -record(north_data, { id = 0 :: integer(), location_code :: binary(), + real_location_code :: binary(), %% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}] fields :: [{K :: binary(), V :: any()}], timestamp = 0 :: integer() @@ -103,7 +104,6 @@ %% 发送数据 -record(post_data, { id = 0 :: integer(), - location_code :: binary(), %% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}] body :: binary() | list() }). \ No newline at end of file diff --git a/apps/iot/src/endpoint/iot_http_endpoint.erl b/apps/iot/src/endpoint/iot_http_endpoint.erl index 03d7df9..8a7eeba 100644 --- a/apps/iot/src/endpoint/iot_http_endpoint.erl +++ b/apps/iot/src/endpoint/iot_http_endpoint.erl @@ -166,5 +166,5 @@ do_post(PostmanPid, #event_data{id = Id, location_code = LocationCode, event_typ <<"params">> => Params }, Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), - PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}, + PostmanPid ! {post, self(), #post_data{id = Id, body = Body}}, ok. \ No newline at end of file diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index eeeaefd..799d976 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -14,7 +14,7 @@ %% API -export([start_link/0]). --export([get_pid/0, forward/3, get_stat/0]). +-export([get_pid/0, forward/4, get_stat/0]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -46,9 +46,9 @@ get_pid() -> whereis(?MODULE). --spec forward(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). -forward(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> - gen_statem:cast(?MODULE, {forward, LocationCode, Fields, Timestamp}). +-spec forward(LocationCode :: binary(), RealLocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +forward(LocationCode, RealLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(RealLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + gen_statem:cast(?MODULE, {forward, LocationCode, RealLocationCode, Fields, Timestamp}). -spec get_stat() -> {ok, Stat :: #{}}. get_stat() -> @@ -91,8 +91,8 @@ callback_mode() -> %% functions is called when gen_statem receives and event from %% call/2, cast/2, or as a normal process message. -handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) -> - mnesia_queue:insert(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}), +handle_event(cast, {forward, LocationCode, RealLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) -> + mnesia_queue:insert(#north_data{location_code = LocationCode, real_location_code = RealLocationCode, fields = Fields, timestamp = Timestamp}), %% 避免不必要的内部消息 Actions = case StateName =:= connected andalso not IsBusy of true -> [{next_event, info, fetch_next}]; @@ -228,16 +228,17 @@ create_postman(Opts) -> mqtt_postman:start_link(PostmanOpts, Topic, Qos). -spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> no_return(). -do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) -> +do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, real_location_code = RealLocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) -> Data = #{ <<"version">> => <<"1.0">>, <<"location_code">> => LocationCode, + <<"real_location_code">> => RealLocationCode, <<"ts">> => Timestamp, <<"properties">> => Fields }, try Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), - PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}} + PostmanPid ! {post, self(), #post_data{id = Id, body = Body}} catch _:_ -> self() ! {ack, Id, <<"json error">>} end. \ No newline at end of file diff --git a/apps/iot/src/iot_router.erl b/apps/iot/src/iot_router.erl index 9364ec6..5e9a1cb 100644 --- a/apps/iot/src/iot_router.erl +++ b/apps/iot/src/iot_router.erl @@ -16,11 +16,11 @@ -spec route_uuid(RouterUUID :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). route_uuid(RouterUUID, Fields, Timestamp) when is_binary(RouterUUID), is_list(Fields), is_integer(Timestamp) -> %% 查找终端设备对应的点位信息 - case redis_client:hget(RouterUUID, <<"location_code">>) of - {ok, undefined} -> + case redis_client:hgetall(RouterUUID) of + {ok, #{<<"location_code">> := LocationCode, <<"real_location_code">> := RealLocationCode}} when is_binary(LocationCode) -> + iot_zd_endpoint:forward(LocationCode, RealLocationCode, Fields, Timestamp); + {ok, _} -> lager:warning("[iot_host] the north_data hget location_code, uuid: ~p, not found, fields: ~p", [RouterUUID, Fields]); - {ok, LocationCode} when is_binary(LocationCode) -> - iot_zd_endpoint:forward(LocationCode, Fields, Timestamp); {error, Reason} -> lager:warning("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p, fields: ~p", [RouterUUID, Reason, Fields]) end. \ No newline at end of file diff --git a/apps/iot/src/mocker/iot_mock.erl b/apps/iot/src/mocker/iot_mock.erl index c5493c3..21a280f 100644 --- a/apps/iot/src/mocker/iot_mock.erl +++ b/apps/iot/src/mocker/iot_mock.erl @@ -30,7 +30,7 @@ test_influxdb() -> end, lists:seq(1, 100)). test_mqtt() -> - iot_zd_endpoint:forward(<<"location_code_test123">>, [ + iot_zd_endpoint:forward({<<"location_code_test123">>, <<"location_code_test123">>}, [ #{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>}, #{<<"key">> => <<"age">>, <<"value">> => 30}, #{<<"key">> => <<"flow">>, <<"value">> => 30} diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index 6e0177b..55be2cb 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -97,8 +97,7 @@ handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflig end; %% 转发信息 -handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> - Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]), +handle_info({post, ReceiverPid, #post_data{id = Id, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) -> lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of ok -> From 89214c6918f680b378549278ef3a90c5c8449454 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 20 Jun 2024 22:53:23 +0800 Subject: [PATCH 02/11] remove post_data --- apps/iot/include/iot.hrl | 7 ------- apps/iot/src/endpoint/iot_http_endpoint.erl | 4 ++-- apps/iot/src/endpoint/iot_zd_endpoint.erl | 2 +- apps/iot/src/mocker/iot_mock.erl | 2 +- apps/iot/src/postman/http_postman.erl | 2 +- apps/iot/src/postman/mqtt_postman.erl | 2 +- apps/iot/src/postman/mysql_postman.erl | 2 +- 7 files changed, 7 insertions(+), 14 deletions(-) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 8aaf1af..53e214f 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -99,11 +99,4 @@ location_code :: binary(), event_type :: integer(), params :: map() -}). - -%% 发送数据 --record(post_data, { - id = 0 :: integer(), - %% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}] - body :: binary() | list() }). \ No newline at end of file diff --git a/apps/iot/src/endpoint/iot_http_endpoint.erl b/apps/iot/src/endpoint/iot_http_endpoint.erl index 8a7eeba..8565a17 100644 --- a/apps/iot/src/endpoint/iot_http_endpoint.erl +++ b/apps/iot/src/endpoint/iot_http_endpoint.erl @@ -159,12 +159,12 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%%=================================================================== -spec do_post(PostmanPid :: pid(), EventData :: #event_data{}) -> no_return(). -do_post(PostmanPid, #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params}) when is_pid(PostmanPid) -> +do_post(PostmanPid, #event_data{id = Id, event_type = EventType, params = Params}) when is_pid(PostmanPid) -> Data = #{ <<"version">> => <<"1.0">>, <<"event_type">> => EventType, <<"params">> => Params }, Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), - PostmanPid ! {post, self(), #post_data{id = Id, body = Body}}, + PostmanPid ! {post, self(), {Id, Body}}, ok. \ No newline at end of file diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index 799d976..202664a 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -238,7 +238,7 @@ do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, real_loca }, try Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), - PostmanPid ! {post, self(), #post_data{id = Id, body = Body}} + PostmanPid ! {post, self(), {Id, Body}} catch _:_ -> self() ! {ack, Id, <<"json error">>} end. \ No newline at end of file diff --git a/apps/iot/src/mocker/iot_mock.erl b/apps/iot/src/mocker/iot_mock.erl index 21a280f..4c2a26b 100644 --- a/apps/iot/src/mocker/iot_mock.erl +++ b/apps/iot/src/mocker/iot_mock.erl @@ -30,7 +30,7 @@ test_influxdb() -> end, lists:seq(1, 100)). test_mqtt() -> - iot_zd_endpoint:forward({<<"location_code_test123">>, <<"location_code_test123">>}, [ + iot_zd_endpoint:forward(<<"location_code_test123">>, <<"location_code_test123">>, [ #{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>}, #{<<"key">> => <<"age">>, <<"value">> => 30}, #{<<"key">> => <<"flow">>, <<"value">> => 30} diff --git a/apps/iot/src/postman/http_postman.erl b/apps/iot/src/postman/http_postman.erl index edbe9ba..ef2b095 100644 --- a/apps/iot/src/postman/http_postman.erl +++ b/apps/iot/src/postman/http_postman.erl @@ -74,7 +74,7 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({post, ReceiverPid, #post_data{id = Id, body = Body}}, State = #state{url = Url}) -> +handle_info({post, ReceiverPid, {Id, Body}}, State = #state{url = Url}) -> Headers = [ {<<"content-type">>, <<"application/json">>} ], diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index 55be2cb..e73bb1f 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -97,7 +97,7 @@ handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflig end; %% 转发信息 -handle_info({post, ReceiverPid, #post_data{id = Id, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) -> +handle_info({post, ReceiverPid, {Id, Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) -> lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of ok -> diff --git a/apps/iot/src/postman/mysql_postman.erl b/apps/iot/src/postman/mysql_postman.erl index 9217397..dacd0ee 100644 --- a/apps/iot/src/postman/mysql_postman.erl +++ b/apps/iot/src/postman/mysql_postman.erl @@ -80,7 +80,7 @@ handle_cast(_Request, State = #state{}) -> {noreply, NewState :: #state{}} | {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). -handle_info({post, ReceiverPid, #post_data{id = Id, body = Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) -> +handle_info({post, ReceiverPid, {Id, Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) -> case catch mysql_provider:insert(ConnPid, Table, Fields, false) of ok -> ReceiverPid ! {ack, Id}; From bab0089190d2852fb2e25472c4805a742c0194a4 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 20 Jun 2024 23:03:20 +0800 Subject: [PATCH 03/11] fix jinzhi --- apps/iot/include/iot.hrl | 1 + apps/iot/src/endpoint/iot_http_endpoint.erl | 12 ++++++------ apps/iot/src/endpoint/iot_jinzhi_endpoint.erl | 15 ++++++++------- apps/iot/src/iot_ai_router.erl | 8 ++++---- 4 files changed, 19 insertions(+), 17 deletions(-) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 53e214f..efeef44 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -97,6 +97,7 @@ -record(event_data, { id = 0 :: integer(), location_code :: binary(), + real_location_code :: binary(), event_type :: integer(), params :: map() }). \ No newline at end of file diff --git a/apps/iot/src/endpoint/iot_http_endpoint.erl b/apps/iot/src/endpoint/iot_http_endpoint.erl index 8565a17..d68d38a 100644 --- a/apps/iot/src/endpoint/iot_http_endpoint.erl +++ b/apps/iot/src/endpoint/iot_http_endpoint.erl @@ -14,7 +14,7 @@ %% API -export([start_link/2]). --export([get_pid/1, forward/4, get_stat/0]). +-export([get_pid/1, forward/5, get_stat/0]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -42,9 +42,9 @@ get_pid(Name) when is_atom(Name) -> whereis(Name). --spec forward(Pid :: pid(), LocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return(). -forward(Pid, LocationCode, EventType, Params) when is_pid(Pid), is_binary(LocationCode), is_integer(EventType), is_map(Params) -> - gen_statem:cast(Pid, {forward, LocationCode, EventType, Params}). +-spec forward(Pid :: pid(), LocationCode :: binary(), RealLocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return(). +forward(Pid, LocationCode, RealLocationCode, EventType, Params) when is_pid(Pid), is_binary(LocationCode), is_binary(RealLocationCode), is_integer(EventType), is_map(Params) -> + gen_statem:cast(Pid, {forward, LocationCode, RealLocationCode, EventType, Params}). -spec get_stat() -> {ok, Stat :: #{}}. get_stat() -> @@ -82,8 +82,8 @@ callback_mode() -> %% functions is called when gen_statem receives and event from %% call/2, cast/2, or as a normal process message. -handle_event(cast, {forward, LocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) -> - EventData = #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params}, +handle_event(cast, {forward, LocationCode, RealLocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) -> + EventData = #event_data{id = Id, location_code = LocationCode, real_location_code = RealLocationCode, event_type = EventType, params = Params}, %% 避免不必要的内部消息 Actions = case FlightNum < PoolSize of true -> [{next_event, info, fetch_next}]; diff --git a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl index fd1a5d9..83a4cff 100644 --- a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl +++ b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl @@ -14,7 +14,7 @@ %% API -export([start_link/0]). --export([get_pid/0, forward/3, get_stat/0]). +-export([get_pid/0, forward/4, get_stat/0]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -46,9 +46,9 @@ get_pid() -> whereis(?MODULE). --spec forward(LocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return(). -forward(LocationCode, EventType, Params) when is_binary(LocationCode), is_integer(EventType), is_map(Params) -> - gen_statem:cast(?MODULE, {forward, LocationCode, EventType, Params}). +-spec forward(LocationCode :: binary(), RealLocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return(). +forward(LocationCode, RealLocationCode, EventType, Params) when is_binary(LocationCode), is_binary(RealLocationCode), is_integer(EventType), is_map(Params) -> + gen_statem:cast(?MODULE, {forward, LocationCode, RealLocationCode, EventType, Params}). -spec get_stat() -> {ok, Stat :: #{}}. get_stat() -> @@ -92,8 +92,8 @@ callback_mode() -> %% functions is called when gen_statem receives and event from %% call/2, cast/2, or as a normal process message. -handle_event(cast, {forward, LocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) -> - EventData = #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params}, +handle_event(cast, {forward, LocationCode, RealLocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) -> + EventData = #event_data{id = Id, location_code = LocationCode, real_location_code = RealLocationCode, event_type = EventType, params = Params}, %% 避免不必要的内部消息 Actions = case FlightNum < PoolSize of true -> [{next_event, info, fetch_next}]; @@ -176,7 +176,7 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%%=================================================================== -spec do_post(EventData :: #event_data{}, State :: #state{}) -> no_return(). -do_post(#event_data{id = Id, location_code = LocationCode, event_type = EventType, +do_post(#event_data{id = Id, location_code = LocationCode, real_location_code = RealLocationCode, event_type = EventType, params = Params = #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}}, #state{pri_key = PriKey, url = Url, logger_pid = LoggerPid}) -> @@ -190,6 +190,7 @@ do_post(#event_data{id = Id, location_code = LocationCode, event_type = EventTyp DeviceInfo = #{ <<"location">> => LocationCode, + <<"real_location">> => RealLocationCode, <<"category">> => EventCode, <<"description">> => Description, <<"occurrenceTime">> => Datetime, diff --git a/apps/iot/src/iot_ai_router.erl b/apps/iot/src/iot_ai_router.erl index 964dba3..b095c23 100644 --- a/apps/iot/src/iot_ai_router.erl +++ b/apps/iot/src/iot_ai_router.erl @@ -16,11 +16,11 @@ -spec route_uuid(RouterUUID :: binary(), EventType :: integer(), Params :: map()) -> no_return(). route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer(EventType), is_map(Params) -> %% 查找终端设备对应的点位信息 - case redis_client:hget(RouterUUID, <<"location_code">>) of - {ok, undefined} -> + case redis_client:hgetall(RouterUUID) of + {ok, #{<<"location_code">> := LocationCode, <<"real_location_code">> := RealLocationCode}} when is_binary(LocationCode), is_binary(RealLocationCode) -> + iot_jinzhi_endpoint:forward(LocationCode, RealLocationCode, EventType, Params); + {ok, _} -> lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]); - {ok, LocationCode} when is_binary(LocationCode) -> - iot_jinzhi_endpoint:forward(LocationCode, EventType, Params); {error, Reason} -> lager:debug("[iot_ai_router] the event_data hget location_code uuid: ~p, get error: ~p", [RouterUUID, Reason]) end. \ No newline at end of file From c8c95b68ce0988b0773139c5b02a07fb71746d67 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 21 Jun 2024 01:14:59 +0800 Subject: [PATCH 04/11] =?UTF-8?q?=E5=B0=86=E6=95=B0=E6=8D=AE=E5=AD=98?= =?UTF-8?q?=E5=82=A8=E5=88=B0=E8=AE=BE=E5=A4=87=E5=86=85=E5=AD=98=E4=B8=AD?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/include/iot.hrl | 7 +++ apps/iot/src/iot_device.erl | 92 +++++++++++++++++++++++++++++++++++-- apps/iot/src/iot_host.erl | 9 +++- apps/iot/src/iot_util.erl | 17 +++++++ docs/jinzhi_http.md | 0 docs/zhongdian_mqtt.md | 2 + 6 files changed, 123 insertions(+), 4 deletions(-) create mode 100644 docs/jinzhi_http.md diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index efeef44..124f60d 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -100,4 +100,11 @@ real_location_code :: binary(), event_type :: integer(), params :: map() +}). + +%% 设备数据 +-record(device_data, { + tags :: map(), + val :: any(), + timestamp :: integer() }). \ No newline at end of file diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 3a09fed..8c0cafc 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -12,8 +12,8 @@ -behaviour(gen_statem). %% API --export([get_name/1, get_pid/1]). --export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2]). +-export([get_name/1, get_pid/1, serialize/1]). +-export([start_link/2, is_activated/1, is_alive/1, change_status/2, reload/1, auth/2, data/2, query/5]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -22,12 +22,17 @@ -define(DEVICE_AUTH_DENIED, 0). -define(DEVICE_AUTH_AUTHED, 1). +%% 存储数据的上限 +-define(MAX_SIZE, 2000). + %% 状态 -define(STATE_DENIED, denied). -define(STATE_ACTIVATED, activated). -record(state, { device_uuid :: binary(), + %% 用来保存数据,作为存储在influxdb里面的数据的备份 + queue = queue:new(), status = ?DEVICE_OFFLINE }). @@ -35,6 +40,26 @@ %%% API %%%=================================================================== +%% 格式化efka上传的数据格式 +%"fields": [ +%{ +% "key": "test" +% "value": 124, +% "unit": "U", +% "type": "AI:遥测值,DI:遥信值,SOE:事件", +% "timestamp": int +%} +%], +-spec serialize(FieldsList :: [map()]) -> [Val :: binary()]. +serialize(FieldsList) when is_list(FieldsList) -> + lists:flatmap(fun serialize0/1, FieldsList). +serialize0(Fields = #{<<"key">> := Key}) when is_binary(Key) andalso Key /= <<>> -> + Values = maps:remove(<<"key">>, Fields), + S = base64:encode(iolist_to_binary(jiffy:encode(#{Key => Values}, [force_utf8]))), + [<<"base64:", S/binary>>]; +serialize0(_) -> + []. + -spec is_alive(DeviceUUID :: binary()) -> error | {ok, Pid :: pid()}. is_alive(DeviceUUID) when is_binary(DeviceUUID) -> case iot_device:get_pid(DeviceUUID) of @@ -77,6 +102,13 @@ reload(Pid) when is_pid(Pid) -> auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> gen_statem:cast(Pid, {auth, Auth}). +-spec data(Pid :: pid(), DataList :: [#device_data{}]) -> no_return(). +data(Pid, DataList) when is_pid(Pid), is_list(DataList) -> + gen_statem:cast(Pid, {data, DataList}). + +query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit) -> + gen_statem:call(Pid, {query, Tags, StartTs, StopTs, Limit}). + %% @doc Creates a gen_statem process which calls Module:init/1 to %% initialize. To ensure a synchronized start-up procedure, this %% function does not return until Module:init/1 has returned. @@ -164,6 +196,38 @@ handle_event(cast, reload, _, State = #state{device_uuid = DeviceUUID}) -> {stop, normal, State} end; +%% 向设备中追加数据 +handle_event(cast, {data, DataList}, _, State = #state{queue = Q}) -> + NQ = lists:foldl(fun(Data, Q0) -> + case queue:len(Q0) > ?MAX_SIZE of + true -> + {_, Q1} = queue:out(Q0), + queue:in(Data, Q1); + false -> + queue:in(Data, Q0) + end + end, Q, DataList), + + {keep_state, State#state{queue = NQ}}; + +%% 查询device里面的数据 +handle_event({call, From}, {query, Tags, StartTs, StopTs, Limit}, _StateName, State = #state{queue = Q}) -> + L = queue:to_list(Q), + + %% 过滤 + L1 = filter_timestamp(L, StartTs, StopTs), + L2 = filter_tags(L1, Tags), + + %% 处理数据截取 + L3 = lists:reverse(L2), + L4 = case Limit =:= 0 of + true -> + L3; + false -> + lists:sublist(L3, 1, Limit) + end, + {keep_state, State, [{reply, From, L4}]}; + %% 处理授权 handle_event(cast, {auth, Auth}, StateName, State = #state{device_uuid = DeviceUUID}) -> case {StateName, Auth} of @@ -217,4 +281,26 @@ report_event(DeviceUUID, NewStatus) when is_binary(DeviceUUID), is_integer(NewSt <<"timestamp">> => Timestamp }], iot_router:route_uuid(DeviceUUID, FieldsList, Timestamp), - lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]). \ No newline at end of file + lager:debug("[iot_device] device_uuid: ~p, route fields: ~p", [DeviceUUID, FieldsList]). + +%% 过滤时间 +-spec filter_timestamp(L :: list(), StartTs :: integer(), StopTs :: integer()) -> list(). +filter_timestamp(L, 0, 0) -> + L; +filter_timestamp(_L, StartTs, StopTs) when StartTs > StopTs -> + []; +filter_timestamp(L, StartTs0, StopTs0) -> + %% 时间的比较统一到毫秒级别 + StartTs = StartTs0 * 1000, + StopTs = StopTs0 * 1000, + lists:filter(fun(#device_data{timestamp = Timestamp}) -> Timestamp >= StartTs andalso Timestamp =< StopTs end, L). + +%% 过滤标签 +-spec filter_tags(L :: list(), Tags :: map()) -> list(). +filter_tags(L, Tags) when map_size(Tags) =:= 0 -> + L; +filter_tags(L, Tags) when map_size(Tags) =:= 0 -> + lists:filter(fun(#device_data{tags = Tags0}) -> + lists:all(fun({TagName, TagVal}) -> maps:is_key(TagName, Tags0) andalso maps:get(TagName, Tags0) =:= TagVal end, maps:to_list(Tags)) + end, L). + diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 718a023..f92cec9 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -584,7 +584,14 @@ handle_data(#{<<"device_uuid">> := DeviceUUID, <<"service_name">> := ServiceName NTags = Tags#{<<"uuid">> => UUID, <<"service_name">> => ServiceName, <<"device_uuid">> => DeviceUUID}, influx_client:write_data(DeviceUUID, NTags, FieldsList, Timestamp), - iot_device:change_status(DevicePid, ?DEVICE_ONLINE); + iot_device:change_status(DevicePid, ?DEVICE_ONLINE), + + %% 将数据写入到设备的内存缓存中 + DeviceVals = iot_device:serialize(FieldsList), + %% 时间同样精确到毫秒 + FormattedTimestamp = iot_util:format_timestamp(Timestamp), + DeviceDataList = lists:map(fun(Val) -> #device_data{tags = NTags, timestamp = FormattedTimestamp, val = Val} end, DeviceVals), + iot_device:data(DevicePid, DeviceDataList); false -> lager:warning("[iot_host] host uuid: ~p, device_uuid: ~p not activated, fields: ~p, tags: ~p", [UUID, DeviceUUID, FieldsList, Tags]) end diff --git a/apps/iot/src/iot_util.erl b/apps/iot/src/iot_util.erl index 2b3404b..026816e 100644 --- a/apps/iot/src/iot_util.erl +++ b/apps/iot/src/iot_util.erl @@ -14,6 +14,23 @@ -export([step/3, chunks/2, rand_bytes/1, uuid/0, md5/1, parse_mapper/1]). -export([json_data/1, json_error/2]). -export([queue_limited_in/3, assert_call/2, assert/2]). +-export([format_timestamp/1]). + +%% 格式化时间 +-spec format_timestamp(Timestamp :: integer()) -> integer(). +format_timestamp(Timestamp) when is_integer(Timestamp) -> + case length(integer_to_list(Timestamp)) of + 10 -> + Timestamp * 1000; + 13 -> + Timestamp; + 16 -> + Timestamp div 1000; + 19 -> + Timestamp div 1000_000; + _ -> + timestamp() + end. %% 时间,精确到毫秒 timestamp() -> diff --git a/docs/jinzhi_http.md b/docs/jinzhi_http.md new file mode 100644 index 0000000..e69de29 diff --git a/docs/zhongdian_mqtt.md b/docs/zhongdian_mqtt.md index c055798..010aa02 100644 --- a/docs/zhongdian_mqtt.md +++ b/docs/zhongdian_mqtt.md @@ -21,6 +21,8 @@ { "version": "1.0", "location_code": "string", + // 日期: 2024-06-20, <<新增字段>> + "real_location_code": "string", "ts ": 1688606685, "properties": [ { From c8142256e8a6d6e62c006fdd59ef11f9cc7cf377 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 21 Jun 2024 01:31:51 +0800 Subject: [PATCH 05/11] =?UTF-8?q?=E5=A2=9E=E5=8A=A0=E8=AE=BE=E5=A4=87?= =?UTF-8?q?=E6=95=B0=E6=8D=AE=E6=9F=A5=E8=AF=A2=E6=8E=A5=E5=8F=A3?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- apps/iot/src/http_handler/device_handler.erl | 26 ++++++++++++++++++++ apps/iot/src/iot_device.erl | 10 +++++--- apps/iot/src/iot_util.erl | 5 ++-- 3 files changed, 34 insertions(+), 7 deletions(-) diff --git a/apps/iot/src/http_handler/device_handler.erl b/apps/iot/src/http_handler/device_handler.erl index 3c4308c..d5dcf30 100644 --- a/apps/iot/src/http_handler/device_handler.erl +++ b/apps/iot/src/http_handler/device_handler.erl @@ -63,6 +63,32 @@ handle_request("POST", "/device/activate", _, #{<<"host_id">> := HostId, <<"devi end end; +%% 重新加载对应的主机信息 +handle_request("POST", "/device/query", _, Params = #{<<"device_uuid">> := DeviceUUID}) when is_binary(DeviceUUID) -> + lager:debug("[device_handler] query device uuid: ~p, params: ~p", [DeviceUUID, Params]), + case iot_device:get_pid(DeviceUUID) of + undefined -> + {ok, 200, iot_util:json_error(404, <<"device not found">>)}; + DevicePid when is_pid(DevicePid) -> + Tags = maps:get(<<"tags">>, Params, #{}), + StartTs = maps:get(<<"start_ts">>, Params, 0), + StopTs = maps:get(<<"stop_ts">>, Params, 0), + Limit = maps:get(<<"limit">>, Params, 0), + + case is_map(Tags) andalso is_integer(StartTs) andalso is_integer(StopTs) andalso is_integer(Limit) + andalso StartTs >= 0 andalso StopTs >= 0 andalso Limit >= 0 of + + true -> + {ok, DeviceDataList} = iot_device:query(DevicePid, Tags, StartTs, StopTs, Limit), + DataItems = lists:map(fun(#device_data{tags = Tags, val = Val, timestamp = Timestamp}) -> + #{<<"tags">> => Tags, <<"val">> => Val, <<"timestamp">> => Timestamp} + end, DeviceDataList), + {ok, 200, iot_util:json_data(DataItems)}; + false -> + {ok, 200, iot_util:json_error(404, <<"invalid params">>)} + end + end; + handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. \ No newline at end of file diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 8c0cafc..08debcf 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -55,8 +55,9 @@ serialize(FieldsList) when is_list(FieldsList) -> lists:flatmap(fun serialize0/1, FieldsList). serialize0(Fields = #{<<"key">> := Key}) when is_binary(Key) andalso Key /= <<>> -> Values = maps:remove(<<"key">>, Fields), - S = base64:encode(iolist_to_binary(jiffy:encode(#{Key => Values}, [force_utf8]))), - [<<"base64:", S/binary>>]; + %S = base64:encode(iolist_to_binary(jiffy:encode(#{Key => Values}, [force_utf8]))), + %[<<"base64:", S/binary>>]; + [#{Key => Values}]; serialize0(_) -> []. @@ -106,7 +107,8 @@ auth(Pid, Auth) when is_pid(Pid), is_boolean(Auth) -> data(Pid, DataList) when is_pid(Pid), is_list(DataList) -> gen_statem:cast(Pid, {data, DataList}). -query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit) -> +-spec query(Pid :: pid(), Tags :: map(), StartTs :: integer(), StopTs :: integer(), Limit :: integer()) -> {ok, [#device_data{}]}. +query(Pid, Tags, StartTs, StopTs, Limit) when is_pid(Pid), is_map(Tags), is_integer(StartTs), is_integer(StopTs), is_integer(Limit), StartTs >= 0, StopTs >= 0, Limit >= 0 -> gen_statem:call(Pid, {query, Tags, StartTs, StopTs, Limit}). %% @doc Creates a gen_statem process which calls Module:init/1 to @@ -226,7 +228,7 @@ handle_event({call, From}, {query, Tags, StartTs, StopTs, Limit}, _StateName, St false -> lists:sublist(L3, 1, Limit) end, - {keep_state, State, [{reply, From, L4}]}; + {keep_state, State, [{reply, From, {ok, L4}}]}; %% 处理授权 handle_event(cast, {auth, Auth}, StateName, State = #state{device_uuid = DeviceUUID}) -> diff --git a/apps/iot/src/iot_util.erl b/apps/iot/src/iot_util.erl index 026816e..63e7196 100644 --- a/apps/iot/src/iot_util.erl +++ b/apps/iot/src/iot_util.erl @@ -82,9 +82,8 @@ chunks0([Hd | Tail], Size, Num, Target, AccTarget) -> chunks0(Tail, Size, Num - 1, [Hd | Target], AccTarget). json_data(Data) -> - jiffy:encode(#{ - <<"result">> => Data - }, [force_utf8]). + Json = jiffy:encode(#{<<"result">> => Data}, [force_utf8]), + iolist_to_binary(Json). json_error(ErrCode, ErrMessage) when is_integer(ErrCode), is_binary(ErrMessage) -> jiffy:encode(#{ From a3f759034184e3e68be9adbac63366074dd10ddd Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 21 Jun 2024 01:35:23 +0800 Subject: [PATCH 06/11] fix device query --- apps/iot/src/http_handler/device_handler.erl | 6 +++--- apps/iot/src/iot_device.erl | 2 +- 2 files changed, 4 insertions(+), 4 deletions(-) diff --git a/apps/iot/src/http_handler/device_handler.erl b/apps/iot/src/http_handler/device_handler.erl index d5dcf30..2252058 100644 --- a/apps/iot/src/http_handler/device_handler.erl +++ b/apps/iot/src/http_handler/device_handler.erl @@ -80,8 +80,8 @@ handle_request("POST", "/device/query", _, Params = #{<<"device_uuid">> := Devic true -> {ok, DeviceDataList} = iot_device:query(DevicePid, Tags, StartTs, StopTs, Limit), - DataItems = lists:map(fun(#device_data{tags = Tags, val = Val, timestamp = Timestamp}) -> - #{<<"tags">> => Tags, <<"val">> => Val, <<"timestamp">> => Timestamp} + DataItems = lists:map(fun(#device_data{tags = Tags0, val = Val, timestamp = Timestamp}) -> + #{<<"tags">> => Tags0, <<"val">> => Val, <<"timestamp">> => Timestamp} end, DeviceDataList), {ok, 200, iot_util:json_data(DataItems)}; false -> @@ -91,4 +91,4 @@ handle_request("POST", "/device/query", _, Params = #{<<"device_uuid">> := Devic handle_request(_, Path, _, _) -> Path1 = list_to_binary(Path), - {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. \ No newline at end of file + {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. diff --git a/apps/iot/src/iot_device.erl b/apps/iot/src/iot_device.erl index 08debcf..f6ed8d6 100644 --- a/apps/iot/src/iot_device.erl +++ b/apps/iot/src/iot_device.erl @@ -50,7 +50,7 @@ % "timestamp": int %} %], --spec serialize(FieldsList :: [map()]) -> [Val :: binary()]. +-spec serialize(FieldsList :: [map()]) -> [Val :: map()]. serialize(FieldsList) when is_list(FieldsList) -> lists:flatmap(fun serialize0/1, FieldsList). serialize0(Fields = #{<<"key">> := Key}) when is_binary(Key) andalso Key /= <<>> -> From 476c7da467fcb7d9b6f6c6a63e0e50c51df2db15 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 22 Jun 2024 10:33:56 +0800 Subject: [PATCH 07/11] rename dynamic location code --- apps/iot/include/iot.hrl | 4 ++-- apps/iot/src/endpoint/iot_http_endpoint.erl | 4 ++-- apps/iot/src/endpoint/iot_jinzhi_endpoint.erl | 8 ++++---- apps/iot/src/endpoint/iot_zd_endpoint.erl | 8 ++++---- apps/iot/src/iot_ai_router.erl | 4 ++-- apps/iot/src/iot_router.erl | 4 ++-- 6 files changed, 16 insertions(+), 16 deletions(-) diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 124f60d..6ff7095 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -87,7 +87,7 @@ -record(north_data, { id = 0 :: integer(), location_code :: binary(), - real_location_code :: binary(), + dynamic_location_code :: binary(), %% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}] fields :: [{K :: binary(), V :: any()}], timestamp = 0 :: integer() @@ -97,7 +97,7 @@ -record(event_data, { id = 0 :: integer(), location_code :: binary(), - real_location_code :: binary(), + dynamic_location_code :: binary(), event_type :: integer(), params :: map() }). diff --git a/apps/iot/src/endpoint/iot_http_endpoint.erl b/apps/iot/src/endpoint/iot_http_endpoint.erl index d68d38a..1f72f1f 100644 --- a/apps/iot/src/endpoint/iot_http_endpoint.erl +++ b/apps/iot/src/endpoint/iot_http_endpoint.erl @@ -82,8 +82,8 @@ callback_mode() -> %% functions is called when gen_statem receives and event from %% call/2, cast/2, or as a normal process message. -handle_event(cast, {forward, LocationCode, RealLocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) -> - EventData = #event_data{id = Id, location_code = LocationCode, real_location_code = RealLocationCode, event_type = EventType, params = Params}, +handle_event(cast, {forward, LocationCode, DynamicLocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) -> + EventData = #event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType, params = Params}, %% 避免不必要的内部消息 Actions = case FlightNum < PoolSize of true -> [{next_event, info, fetch_next}]; diff --git a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl index 83a4cff..8ca2bcb 100644 --- a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl +++ b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl @@ -92,8 +92,8 @@ callback_mode() -> %% functions is called when gen_statem receives and event from %% call/2, cast/2, or as a normal process message. -handle_event(cast, {forward, LocationCode, RealLocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) -> - EventData = #event_data{id = Id, location_code = LocationCode, real_location_code = RealLocationCode, event_type = EventType, params = Params}, +handle_event(cast, {forward, LocationCode, DynamicLocationCode, EventType, Params}, _, State = #state{id = Id, flight_num = FlightNum, pool_size = PoolSize, queue = Q}) -> + EventData = #event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType, params = Params}, %% 避免不必要的内部消息 Actions = case FlightNum < PoolSize of true -> [{next_event, info, fetch_next}]; @@ -176,7 +176,7 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) -> %%%=================================================================== -spec do_post(EventData :: #event_data{}, State :: #state{}) -> no_return(). -do_post(#event_data{id = Id, location_code = LocationCode, real_location_code = RealLocationCode, event_type = EventType, +do_post(#event_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, event_type = EventType, params = Params = #{<<"event_code">> := EventCode, <<"description">> := Description, <<"datetime">> := Datetime, <<"attachments">> := Attachments0}}, #state{pri_key = PriKey, url = Url, logger_pid = LoggerPid}) -> @@ -190,7 +190,7 @@ do_post(#event_data{id = Id, location_code = LocationCode, real_location_code = DeviceInfo = #{ <<"location">> => LocationCode, - <<"real_location">> => RealLocationCode, + <<"dynamic_location">> => DynamicLocationCode, <<"category">> => EventCode, <<"description">> => Description, <<"occurrenceTime">> => Datetime, diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index 202664a..94eeb6c 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -91,8 +91,8 @@ callback_mode() -> %% functions is called when gen_statem receives and event from %% call/2, cast/2, or as a normal process message. -handle_event(cast, {forward, LocationCode, RealLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) -> - mnesia_queue:insert(#north_data{location_code = LocationCode, real_location_code = RealLocationCode, fields = Fields, timestamp = Timestamp}), +handle_event(cast, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) -> + mnesia_queue:insert(#north_data{location_code = LocationCode, dynamic_location_code = DynamicLocationCode, fields = Fields, timestamp = Timestamp}), %% 避免不必要的内部消息 Actions = case StateName =:= connected andalso not IsBusy of true -> [{next_event, info, fetch_next}]; @@ -228,11 +228,11 @@ create_postman(Opts) -> mqtt_postman:start_link(PostmanOpts, Topic, Qos). -spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> no_return(). -do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, real_location_code = RealLocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) -> +do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, dynamic_location_code = DynamicLocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) -> Data = #{ <<"version">> => <<"1.0">>, <<"location_code">> => LocationCode, - <<"real_location_code">> => RealLocationCode, + <<"dynamic_location_code">> => DynamicLocationCode, <<"ts">> => Timestamp, <<"properties">> => Fields }, diff --git a/apps/iot/src/iot_ai_router.erl b/apps/iot/src/iot_ai_router.erl index b095c23..42075e1 100644 --- a/apps/iot/src/iot_ai_router.erl +++ b/apps/iot/src/iot_ai_router.erl @@ -17,8 +17,8 @@ route_uuid(RouterUUID, EventType, Params) when is_binary(RouterUUID), is_integer(EventType), is_map(Params) -> %% 查找终端设备对应的点位信息 case redis_client:hgetall(RouterUUID) of - {ok, #{<<"location_code">> := LocationCode, <<"real_location_code">> := RealLocationCode}} when is_binary(LocationCode), is_binary(RealLocationCode) -> - iot_jinzhi_endpoint:forward(LocationCode, RealLocationCode, EventType, Params); + {ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) -> + iot_jinzhi_endpoint:forward(LocationCode, DynamicLocationCode, EventType, Params); {ok, _} -> lager:debug("[iot_ai_router] the event_data hget location_code, uuid: ~p, not found", [RouterUUID]); {error, Reason} -> diff --git a/apps/iot/src/iot_router.erl b/apps/iot/src/iot_router.erl index 5e9a1cb..45a4759 100644 --- a/apps/iot/src/iot_router.erl +++ b/apps/iot/src/iot_router.erl @@ -17,8 +17,8 @@ route_uuid(RouterUUID, Fields, Timestamp) when is_binary(RouterUUID), is_list(Fields), is_integer(Timestamp) -> %% 查找终端设备对应的点位信息 case redis_client:hgetall(RouterUUID) of - {ok, #{<<"location_code">> := LocationCode, <<"real_location_code">> := RealLocationCode}} when is_binary(LocationCode) -> - iot_zd_endpoint:forward(LocationCode, RealLocationCode, Fields, Timestamp); + {ok, #{<<"location_code">> := LocationCode, <<"dynamic_location_code">> := DynamicLocationCode}} when is_binary(LocationCode), is_binary(DynamicLocationCode) -> + iot_zd_endpoint:forward(LocationCode, DynamicLocationCode, Fields, Timestamp); {ok, _} -> lager:warning("[iot_host] the north_data hget location_code, uuid: ~p, not found, fields: ~p", [RouterUUID, Fields]); {error, Reason} -> From ecfb6f812b0181cc3f285dc523889c299a85ff3a Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 22 Jun 2024 10:50:58 +0800 Subject: [PATCH 08/11] fix influxdb backup --- apps/iot/src/influxdb/influx_client.erl | 3 ++- config/sys-dev.config | 10 ++++++++++ config/sys-prod.config | 10 ++++++++++ 3 files changed, 22 insertions(+), 1 deletion(-) diff --git a/apps/iot/src/influxdb/influx_client.erl b/apps/iot/src/influxdb/influx_client.erl index 4b950c1..a4e39ad 100644 --- a/apps/iot/src/influxdb/influx_client.erl +++ b/apps/iot/src/influxdb/influx_client.erl @@ -71,7 +71,8 @@ write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement) end, NFieldsList), Precision = influx_client:get_precision(Timestamp), - poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end); + poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end), + poolboy:transaction(influx_pool_backup, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end); false -> ok end. diff --git a/config/sys-dev.config b/config/sys-dev.config index c49c0a0..2fb8c42 100644 --- a/config/sys-dev.config +++ b/config/sys-dev.config @@ -87,6 +87,16 @@ {port, 8086}, {token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>} ] + }, + + %% 备份库 + {influx_pool_backup, + [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], + [ + {host, "39.98.184.67"}, + {port, 8086}, + {token, <<"IUQ04qecTie7LSuX1EDFBeqspClOdoRBfmXDQxhoEjiJFeW8M-Ui66t981YvviI5qOBpf_ZLgJlBx7nid2lyJQ==">>} + ] } ]} diff --git a/config/sys-prod.config b/config/sys-prod.config index 2c1a1f0..e7f331d 100644 --- a/config/sys-prod.config +++ b/config/sys-prod.config @@ -76,6 +76,16 @@ {port, 8086}, {token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>} ] + }, + + %% influxdb备份库 + {influx_pool_backup, + [{size, 100}, {max_overflow, 200}, {worker_module, influx_client}], + [ + {host, "172.19.0.4"}, + {port, 8086}, + {token, <<"A-ZRjqMK_7NR45lXXEiR7AEtYCd1ETzq9Z61FTMQLb5O4-1hSf8sCrjdPB84e__xsrItKHL3qjJALgbYN-H_VQ==">>} + ] } ]} From e947976ecc999a83bc61aefaf047502e3765ba7a Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 22 Jun 2024 10:54:08 +0800 Subject: [PATCH 09/11] fix --- apps/iot/src/influxdb/influx_client.erl | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/apps/iot/src/influxdb/influx_client.erl b/apps/iot/src/influxdb/influx_client.erl index a4e39ad..ce67900 100644 --- a/apps/iot/src/influxdb/influx_client.erl +++ b/apps/iot/src/influxdb/influx_client.erl @@ -71,8 +71,8 @@ write_data(Measurement, Tags, FieldsList, Timestamp) when is_binary(Measurement) end, NFieldsList), Precision = influx_client:get_precision(Timestamp), - poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end), - poolboy:transaction(influx_pool_backup, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end); + %poolboy:transaction(influx_pool_backup, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end); + poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, ?DEFAULT_BUCKET, ?DEFAULT_ORG, Precision, Points) end); false -> ok end. From 7c6f4a7198b5cae1801055d8f7dfe1365abb6795 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 22 Jun 2024 10:56:53 +0800 Subject: [PATCH 10/11] fix readme --- docs/zhongdian_mqtt.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/docs/zhongdian_mqtt.md b/docs/zhongdian_mqtt.md index 010aa02..c39c3f1 100644 --- a/docs/zhongdian_mqtt.md +++ b/docs/zhongdian_mqtt.md @@ -22,7 +22,7 @@ "version": "1.0", "location_code": "string", // 日期: 2024-06-20, <<新增字段>> - "real_location_code": "string", + "dynamic_location_code": "string", "ts ": 1688606685, "properties": [ { From 718599b9a061677769fdd32734d5b38257c45581 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Sat, 22 Jun 2024 11:00:36 +0800 Subject: [PATCH 11/11] fix code --- apps/iot/src/endpoint/iot_jinzhi_endpoint.erl | 6 +++--- apps/iot/src/endpoint/iot_zd_endpoint.erl | 6 +++--- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl index 8ca2bcb..e11fd00 100644 --- a/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl +++ b/apps/iot/src/endpoint/iot_jinzhi_endpoint.erl @@ -46,9 +46,9 @@ get_pid() -> whereis(?MODULE). --spec forward(LocationCode :: binary(), RealLocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return(). -forward(LocationCode, RealLocationCode, EventType, Params) when is_binary(LocationCode), is_binary(RealLocationCode), is_integer(EventType), is_map(Params) -> - gen_statem:cast(?MODULE, {forward, LocationCode, RealLocationCode, EventType, Params}). +-spec forward(LocationCode :: binary(), DynamicLocationCode :: binary(), EventType :: integer(), Params :: map()) -> no_return(). +forward(LocationCode, DynamicLocationCode, EventType, Params) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_integer(EventType), is_map(Params) -> + gen_statem:cast(?MODULE, {forward, LocationCode, DynamicLocationCode, EventType, Params}). -spec get_stat() -> {ok, Stat :: #{}}. get_stat() -> diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index 94eeb6c..a4c0d96 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -46,9 +46,9 @@ get_pid() -> whereis(?MODULE). --spec forward(LocationCode :: binary(), RealLocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). -forward(LocationCode, RealLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(RealLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> - gen_statem:cast(?MODULE, {forward, LocationCode, RealLocationCode, Fields, Timestamp}). +-spec forward(LocationCode :: binary(), DynamicLocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +forward(LocationCode, DynamicLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(DynamicLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + gen_statem:cast(?MODULE, {forward, LocationCode, DynamicLocationCode, Fields, Timestamp}). -spec get_stat() -> {ok, Stat :: #{}}. get_stat() ->