remove post_data
This commit is contained in:
parent
0b670819b3
commit
89214c6918
@ -100,10 +100,3 @@
|
|||||||
event_type :: integer(),
|
event_type :: integer(),
|
||||||
params :: map()
|
params :: map()
|
||||||
}).
|
}).
|
||||||
|
|
||||||
%% 发送数据
|
|
||||||
-record(post_data, {
|
|
||||||
id = 0 :: integer(),
|
|
||||||
%% 数据库类型的endpoint, 可以返回list: [{K, V}, {K1, V1}]
|
|
||||||
body :: binary() | list()
|
|
||||||
}).
|
|
||||||
@ -159,12 +159,12 @@ code_change(_OldVsn, StateName, State = #state{}, _Extra) ->
|
|||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
-spec do_post(PostmanPid :: pid(), EventData :: #event_data{}) -> no_return().
|
-spec do_post(PostmanPid :: pid(), EventData :: #event_data{}) -> no_return().
|
||||||
do_post(PostmanPid, #event_data{id = Id, location_code = LocationCode, event_type = EventType, params = Params}) when is_pid(PostmanPid) ->
|
do_post(PostmanPid, #event_data{id = Id, event_type = EventType, params = Params}) when is_pid(PostmanPid) ->
|
||||||
Data = #{
|
Data = #{
|
||||||
<<"version">> => <<"1.0">>,
|
<<"version">> => <<"1.0">>,
|
||||||
<<"event_type">> => EventType,
|
<<"event_type">> => EventType,
|
||||||
<<"params">> => Params
|
<<"params">> => Params
|
||||||
},
|
},
|
||||||
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
||||||
PostmanPid ! {post, self(), #post_data{id = Id, body = Body}},
|
PostmanPid ! {post, self(), {Id, Body}},
|
||||||
ok.
|
ok.
|
||||||
@ -238,7 +238,7 @@ do_post(PostmanPid, #north_data{id = Id, location_code = LocationCode, real_loca
|
|||||||
},
|
},
|
||||||
try
|
try
|
||||||
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
Body = iolist_to_binary(jiffy:encode(Data, [force_utf8])),
|
||||||
PostmanPid ! {post, self(), #post_data{id = Id, body = Body}}
|
PostmanPid ! {post, self(), {Id, Body}}
|
||||||
catch _:_ ->
|
catch _:_ ->
|
||||||
self() ! {ack, Id, <<"json error">>}
|
self() ! {ack, Id, <<"json error">>}
|
||||||
end.
|
end.
|
||||||
@ -30,7 +30,7 @@ test_influxdb() ->
|
|||||||
end, lists:seq(1, 100)).
|
end, lists:seq(1, 100)).
|
||||||
|
|
||||||
test_mqtt() ->
|
test_mqtt() ->
|
||||||
iot_zd_endpoint:forward({<<"location_code_test123">>, <<"location_code_test123">>}, [
|
iot_zd_endpoint:forward(<<"location_code_test123">>, <<"location_code_test123">>, [
|
||||||
#{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>},
|
#{<<"key">> => <<"name">>, <<"value">> => <<"anlicheng">>},
|
||||||
#{<<"key">> => <<"age">>, <<"value">> => 30},
|
#{<<"key">> => <<"age">>, <<"value">> => 30},
|
||||||
#{<<"key">> => <<"flow">>, <<"value">> => 30}
|
#{<<"key">> => <<"flow">>, <<"value">> => 30}
|
||||||
|
|||||||
@ -74,7 +74,7 @@ handle_cast(_Request, State = #state{}) ->
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_info({post, ReceiverPid, #post_data{id = Id, body = Body}}, State = #state{url = Url}) ->
|
handle_info({post, ReceiverPid, {Id, Body}}, State = #state{url = Url}) ->
|
||||||
Headers = [
|
Headers = [
|
||||||
{<<"content-type">>, <<"application/json">>}
|
{<<"content-type">>, <<"application/json">>}
|
||||||
],
|
],
|
||||||
|
|||||||
@ -97,7 +97,7 @@ handle_info({puback, #{packet_id := PacketId}}, State = #state{inflight = Inflig
|
|||||||
end;
|
end;
|
||||||
|
|
||||||
%% 转发信息
|
%% 转发信息
|
||||||
handle_info({post, ReceiverPid, #post_data{id = Id, body = Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) ->
|
handle_info({post, ReceiverPid, {Id, Message}}, State = #state{conn_pid = ConnPid, inflight = InFlight, topic = Topic, qos = Qos}) ->
|
||||||
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]),
|
lager:debug("[mqtt_postman] will publish topic: ~p, message: ~ts, qos: ~p", [Topic, Message, Qos]),
|
||||||
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
case emqtt:publish(ConnPid, Topic, #{}, Message, [{qos, Qos}, {retain, true}]) of
|
||||||
ok ->
|
ok ->
|
||||||
|
|||||||
@ -80,7 +80,7 @@ handle_cast(_Request, State = #state{}) ->
|
|||||||
{noreply, NewState :: #state{}} |
|
{noreply, NewState :: #state{}} |
|
||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_info({post, ReceiverPid, #post_data{id = Id, body = Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) ->
|
handle_info({post, ReceiverPid, {Id, Fields}}, State = #state{mysql_pid = ConnPid, table = Table}) when is_list(Fields) ->
|
||||||
case catch mysql_provider:insert(ConnPid, Table, Fields, false) of
|
case catch mysql_provider:insert(ConnPid, Table, Fields, false) of
|
||||||
ok ->
|
ok ->
|
||||||
ReceiverPid ! {ack, Id};
|
ReceiverPid ! {ack, Id};
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user