fix line protocol
This commit is contained in:
parent
686c415dc8
commit
ef7d4bf627
@ -15,7 +15,7 @@
|
|||||||
%% <<"cpu,name=xyz,uuid=\"this,=isx=test,\" key=345,key1=23.5,key2=yes 123457">>,
|
%% <<"cpu,name=xyz,uuid=\"this,=isx=test,\" key=345,key1=23.5,key2=yes 123457">>,
|
||||||
-spec parse(Metric :: binary()) -> map().
|
-spec parse(Metric :: binary()) -> map().
|
||||||
parse(Metric) when is_binary(Metric) ->
|
parse(Metric) when is_binary(Metric) ->
|
||||||
case lexer(Metric) of
|
case catch lexer(Metric) of
|
||||||
[[Measurement|Tags], Fields, [Timestamp|_]] ->
|
[[Measurement|Tags], Fields, [Timestamp|_]] ->
|
||||||
{ok, #{
|
{ok, #{
|
||||||
<<"measurement">> => Measurement,
|
<<"measurement">> => Measurement,
|
||||||
|
|||||||
@ -175,13 +175,28 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
|
|
||||||
-spec insert_sql(Table :: binary(), ServiceId :: binary(), Format :: binary(), FieldsMap :: map(), Metric :: binary()) -> {ok, Sql :: binary(), Values :: list()}.
|
-spec insert_sql(Table :: binary(), ServiceId :: binary(), Format :: binary(), FieldsMap :: map(), Metric :: binary()) -> {ok, Sql :: binary(), Values :: list()}.
|
||||||
insert_sql(Table, ServiceId, <<"line">>, FieldsMap, Metric) when is_binary(Table), is_binary(ServiceId), is_binary(Metric) ->
|
insert_sql(Table, ServiceId, <<"line">>, FieldsMap, Metric) when is_binary(Table), is_binary(ServiceId), is_binary(Metric) ->
|
||||||
{Keys, Values} = kvs(Fields),
|
case line_format:parse(Metric) of
|
||||||
|
error ->
|
||||||
|
error;
|
||||||
|
{ok, #{<<"measurement">> := Measurement, <<"tags">> := Tags, <<"fields">> := Fields, <<"timestamp">> := Timestamp}} ->
|
||||||
|
Map = maps:merge(Tags, Fields),
|
||||||
|
NMap = Map#{<<"measurement">> => Measurement, <<"timestamp">> => Timestamp},
|
||||||
|
lists:flatmap(fun({TableField, F}) ->
|
||||||
|
case maps:find(F, NMap) of
|
||||||
|
error ->
|
||||||
|
[];
|
||||||
|
{ok, Val} ->
|
||||||
|
[{TableField, Val}]
|
||||||
|
end
|
||||||
|
end, maps:to_list(FieldsMap)),
|
||||||
|
|
||||||
FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)),
|
{Keys, Values} = kvs(Fields),
|
||||||
Placeholders = lists:duplicate(length(Keys), <<"?">>),
|
FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)),
|
||||||
ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)),
|
Placeholders = lists:duplicate(length(Keys), <<"?">>),
|
||||||
|
ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)),
|
||||||
|
|
||||||
{ok, <<"INSERT INTO ", Table/binary, "(", FieldSql/binary, ") VALUES(", ValuesPlaceholder/binary, ")">>, Values}.
|
{ok, <<"INSERT INTO ", Table/binary, "(", FieldSql/binary, ") VALUES(", ValuesPlaceholder/binary, ")">>, Values}
|
||||||
|
end.
|
||||||
|
|
||||||
-spec kvs(Fields :: list()) -> {Keys :: list(), Values :: list()}.
|
-spec kvs(Fields :: list()) -> {Keys :: list(), Values :: list()}.
|
||||||
kvs(Fields) when is_list(Fields) ->
|
kvs(Fields) when is_list(Fields) ->
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user