diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 39cd286..8aaf1af 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -87,6 +87,7 @@ -record(north_data, { id = 0 :: integer(), location_code :: binary(), + real_location_code :: binary(), %% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}] fields :: [{K :: binary(), V :: any()}], timestamp = 0 :: integer() @@ -103,7 +104,6 @@ %% 发送数据 -record(post_data, { id = 0 :: integer(), - location_code :: binary(), %% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}] body :: binary() | list() }). \ No newline at end of file diff --git a/apps/iot/src/endpoint/iot_http_endpoint.erl b/apps/iot/src/endpoint/iot_http_endpoint.erl index 03d7df9..8a7eeba 100644 --- a/apps/iot/src/endpoint/iot_http_endpoint.erl +++ b/apps/iot/src/endpoint/iot_http_endpoint.erl @@ -166,5 +166,5 @@ do_post(PostmanPid, #event_data{id = Id, location_code = LocationCode, event_typ <<"params">> => Params }, Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), - PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}, + PostmanPid ! {post, self(), #post_data{id = Id, body = Body}}, ok. \ No newline at end of file diff --git a/apps/iot/src/endpoint/iot_zd_endpoint.erl b/apps/iot/src/endpoint/iot_zd_endpoint.erl index eeeaefd..799d976 100644 --- a/apps/iot/src/endpoint/iot_zd_endpoint.erl +++ b/apps/iot/src/endpoint/iot_zd_endpoint.erl @@ -14,7 +14,7 @@ %% API -export([start_link/0]). --export([get_pid/0, forward/3, get_stat/0]). +-export([get_pid/0, forward/4, get_stat/0]). %% gen_statem callbacks -export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]). @@ -46,9 +46,9 @@ get_pid() -> whereis(?MODULE). --spec forward(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). -forward(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> - gen_statem:cast(?MODULE, {forward, LocationCode, Fields, Timestamp}). +-spec forward(LocationCode :: binary(), RealLocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). +forward(LocationCode, RealLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(RealLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) -> + gen_statem:cast(?MODULE, {forward, LocationCode, RealLocationCode, Fields, Timestamp}). -spec get_stat() -> {ok, Stat :: #{}}. get_stat() -> @@ -91,8 +91,8 @@ callback_mode() -> %% functions is called when gen_statem receives and event from %% call/2, cast/2, or as a normal process message. -handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) -> - mnesia_queue:insert(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}), +handle_event(cast, {forward, LocationCode, RealLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) -> + mnesia_queue:insert(#north_data{location_code = LocationCode, real_location_code = RealLocationCode, fields = Fields, timestamp = Timestamp}), %% 避免不必要的内部消息 Actions = case StateName =:= connected andalso not IsBusy of true -> [{next_event, info, fetch_next}]; @@ -228,16 +228,17 @@ create_postman(Opts) -> mqtt_postman:start_link(PostmanOpts, Topic, Qos). -spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> no_return(). -do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) -> +do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, real_location_code = RealLocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) -> Data = #{ <<"version">> => <<"1.0">>, <<"location_code">> => LocationCode, + <<"real_location_code">> => RealLocationCode, <<"ts">> => Timestamp, <<"properties">> => Fields }, try Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])), - PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}} + PostmanPid ! {post, self(), #post_data{id = Id, body = Body}} catch _:_ -> self() ! {ack, Id, <<"json error">>} end. \ No newline at end of file diff --git a/apps/iot/src/iot_router.erl b/apps/iot/src/iot_router.erl index 9364ec6..5e9a1cb 100644 --- a/apps/iot/src/iot_router.erl +++ b/apps/iot/src/iot_router.erl @@ -16,11 +16,11 @@ -spec route_uuid(RouterUUID :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return(). route_uuid(RouterUUID, Fields, Timestamp) when is_binary(RouterUUID), is_list(Fields), is_integer(Timestamp) -> %% 查找终端设备对应的点位信息 - case redis_client:hget(RouterUUID, <<"location_code">>) of - {ok, undefined} -> + case redis_client:hgetall(RouterUUID) of + {ok, #{<<"location_code">> := LocationCode, <<"real_location_code">> := RealLocationCode}} when is_binary(LocationCode) -> + iot_zd_endpoint:forward(LocationCode, RealLocationCode, Fields, Timestamp); + {ok, _} -> lager:warning("[iot_host] the north_data hget location_code, uuid: ~p, not found, fields: ~p", [RouterUUID, Fields]); - {ok, LocationCode} when is_binary(LocationCode) -> - iot_zd_endpoint:forward(LocationCode, Fields, Timestamp); {error, Reason} -> lager:warning("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p, fields: ~p", [RouterUUID, Reason, Fields]) end. \ No newline at end of file diff --git a/apps/iot/src/mocker/iot_mock.erl b/apps/iot/src/mocker/iot_mock.erl index c5493c3..21a280f 100644 --- a/apps/iot/src/mocker/iot_mock.erl +++ b/apps/iot/src/mocker/iot_mock.erl @@ -30,7 +30,7 @@ test_influxdb() -> end, lists:seq(1, 100)). test_mqtt() -> - iot_zd_endpoint:forward(<<"location_code_test123">>, [ + iot_zd_endpoint:forward({<<"location_code_test123">>, <<"location_code_test123">>}, [ #{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>}, #{<<"key">> => <<"age">>, <<"value">> => 30}, #{<<"key">> => <<"flow">>, <<"value">> => 30} diff --git a/apps/iot/src/postman/mqtt_postman.erl b/apps/iot/src/postman/mqtt_postman.erl index 6e0177b..55be2cb 100644 --- a/apps/iot/src/postman/mqtt_postman.erl +++ b/apps/iot/src/postman/mqtt_postman.erl @@ -97,8 +97,7 @@ handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflig end; %% 转发信息 -handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) -> - Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]), +handle_info({post, ReceiverPid, #post_data{id = Id, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) -> lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]), case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of ok ->