fix north_data
This commit is contained in:
parent
dbe8bc8dfa
commit
0b670819b3
@ -87,6 +87,7 @@
|
||||
-record(north_data, {
|
||||
id = 0 :: integer(),
|
||||
location_code :: binary(),
|
||||
real_location_code :: binary(),
|
||||
%% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}]
|
||||
fields :: [{K :: binary(), V :: any()}],
|
||||
timestamp = 0 :: integer()
|
||||
@ -103,7 +104,6 @@
|
||||
%% 发送数据
|
||||
-record(post_data, {
|
||||
id = 0 :: integer(),
|
||||
location_code :: binary(),
|
||||
%% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}]
|
||||
body :: binary() | list()
|
||||
}).
|
||||
@ -166,5 +166,5 @@ do_post(PostmanPid, #event_data{id = Id, location_code = LocationCode, event_typ
|
||||
<<"params">> => Params
|
||||
},
|
||||
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
||||
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}},
|
||||
PostmanPid ! {post, self(), #post_data{id = Id, body = Body}},
|
||||
ok.
|
||||
@ -14,7 +14,7 @@
|
||||
|
||||
%% API
|
||||
-export([start_link/0]).
|
||||
-export([get_pid/0, forward/3, get_stat/0]).
|
||||
-export([get_pid/0, forward/4, get_stat/0]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
-export([init/1, handle_event/4, terminate/3, code_change/4, callback_mode/0]).
|
||||
@ -46,9 +46,9 @@
|
||||
get_pid() ->
|
||||
whereis(?MODULE).
|
||||
|
||||
-spec forward(LocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
|
||||
forward(LocationCode, Fields, Timestamp) when is_binary(LocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
|
||||
gen_statem:cast(?MODULE, {forward, LocationCode, Fields, Timestamp}).
|
||||
-spec forward(LocationCode :: binary(), RealLocationCode :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
|
||||
forward(LocationCode, RealLocationCode, Fields, Timestamp) when is_binary(LocationCode), is_binary(RealLocationCode), is_list(Fields); is_binary(Fields), is_integer(Timestamp) ->
|
||||
gen_statem:cast(?MODULE, {forward, LocationCode, RealLocationCode, Fields, Timestamp}).
|
||||
|
||||
-spec get_stat() -> {ok, Stat :: #{}}.
|
||||
get_stat() ->
|
||||
@ -91,8 +91,8 @@ callback_mode() ->
|
||||
%% functions is called when gen_statem receives and event from
|
||||
%% call/2, cast/2, or as a normal process message.
|
||||
|
||||
handle_event(cast, {forward, LocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) ->
|
||||
mnesia_queue:insert(#north_data{location_code = LocationCode, fields = Fields, timestamp = Timestamp}),
|
||||
handle_event(cast, {forward, LocationCode, RealLocationCode, Fields, Timestamp}, StateName, State = #state{is_busy = IsBusy}) ->
|
||||
mnesia_queue:insert(#north_data{location_code = LocationCode, real_location_code = RealLocationCode, fields = Fields, timestamp = Timestamp}),
|
||||
%% 避免不必要的内部消息
|
||||
Actions = case StateName =:= connected andalso not IsBusy of
|
||||
true -> [{next_event, info, fetch_next}];
|
||||
@ -228,16 +228,17 @@ create_postman(Opts) ->
|
||||
mqtt_postman:start_link(PostmanOpts, Topic, Qos).
|
||||
|
||||
-spec do_post(PostmanPid :: pid(), NorthData :: #north_data{}) -> no_return().
|
||||
do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) ->
|
||||
do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, real_location_code = RealLocationCode, fields = Fields, timestamp = Timestamp}) when is_pid(PostmanPid) ->
|
||||
Data = #{
|
||||
<<"version">> => <<"1.0">>,
|
||||
<<"location_code">> => LocationCode,
|
||||
<<"real_location_code">> => RealLocationCode,
|
||||
<<"ts">> => Timestamp,
|
||||
<<"properties">> => Fields
|
||||
},
|
||||
try
|
||||
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
||||
PostmanPid ! {post, self(), #post_data{id = Id, location_code = LocationCode, body = Body}}
|
||||
PostmanPid ! {post, self(), #post_data{id = Id, body = Body}}
|
||||
catch _:_ ->
|
||||
self() ! {ack, Id, <<"json error">>}
|
||||
end.
|
||||
@ -16,11 +16,11 @@
|
||||
-spec route_uuid(RouterUUID :: binary(), Fields :: list(), Timestamp :: integer()) -> no_return().
|
||||
route_uuid(RouterUUID, Fields, Timestamp) when is_binary(RouterUUID), is_list(Fields), is_integer(Timestamp) ->
|
||||
%% 查找终端设备对应的点位信息
|
||||
case redis_client:hget(RouterUUID, <<"location_code">>) of
|
||||
{ok, undefined} ->
|
||||
case redis_client:hgetall(RouterUUID) of
|
||||
{ok, #{<<"location_code">> := LocationCode, <<"real_location_code">> := RealLocationCode}} when is_binary(LocationCode) ->
|
||||
iot_zd_endpoint:forward(LocationCode, RealLocationCode, Fields, Timestamp);
|
||||
{ok, _} ->
|
||||
lager:warning("[iot_host] the north_data hget location_code, uuid: ~p, not found, fields: ~p", [RouterUUID, Fields]);
|
||||
{ok, LocationCode} when is_binary(LocationCode) ->
|
||||
iot_zd_endpoint:forward(LocationCode, Fields, Timestamp);
|
||||
{error, Reason} ->
|
||||
lager:warning("[iot_host] the north_data hget location_code uuid: ~p, get error: ~p, fields: ~p", [RouterUUID, Reason, Fields])
|
||||
end.
|
||||
@ -30,7 +30,7 @@ test_influxdb() ->
|
||||
end, lists:seq(1, 100)).
|
||||
|
||||
test_mqtt() ->
|
||||
iot_zd_endpoint:forward(<<"location_code_test123">>, [
|
||||
iot_zd_endpoint:forward({<<"location_code_test123">>, <<"location_code_test123">>}, [
|
||||
#{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>},
|
||||
#{<<"key">> => <<"age">>, <<"value">> => 30},
|
||||
#{<<"key">> => <<"flow">>, <<"value">> => 30}
|
||||
|
||||
@ -97,8 +97,7 @@ handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflig
|
||||
end;
|
||||
|
||||
%% 转发信息
|
||||
handle_info({post, ReceiverPid, #post_data{id = Id, location_code = LocationCode, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic0, qos = Qos}) ->
|
||||
Topic = re:replace(Topic0, <<"\\${location_code}">>, LocationCode, [global, {return, binary}]),
|
||||
handle_info({post, ReceiverPid, #post_data{id = Id, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) ->
|
||||
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]),
|
||||
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
||||
ok ->
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user