From ef7d4bf627e3411ab9bbec9250034497f92a3cbd Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 15 Aug 2025 18:49:40 +0800 Subject: [PATCH] fix line protocol --- apps/iot/src/data_format/line_format.erl | 2 +- apps/iot/src/endpoint/endpoint_mysql.erl | 25 +++++++++++++++++++----- 2 files changed, 21 insertions(+), 6 deletions(-) diff --git a/apps/iot/src/data_format/line_format.erl b/apps/iot/src/data_format/line_format.erl index d780ad5..775ad3c 100644 --- a/apps/iot/src/data_format/line_format.erl +++ b/apps/iot/src/data_format/line_format.erl @@ -15,7 +15,7 @@ %% <<"cpu,name=xyz,uuid=\"this,=isx=test,\" key=345,key1=23.5,key2=yes 123457">>, -spec parse(Metric :: binary()) -> map(). parse(Metric) when is_binary(Metric) -> - case lexer(Metric) of + case catch lexer(Metric) of [[Measurement|Tags], Fields, [Timestamp|_]] -> {ok, #{ <<"measurement">> => Measurement, diff --git a/apps/iot/src/endpoint/endpoint_mysql.erl b/apps/iot/src/endpoint/endpoint_mysql.erl index 6689690..428da5f 100644 --- a/apps/iot/src/endpoint/endpoint_mysql.erl +++ b/apps/iot/src/endpoint/endpoint_mysql.erl @@ -175,13 +175,28 @@ code_change(_OldVsn, State = #state{}, _Extra) -> -spec insert_sql(Table :: binary(), ServiceId :: binary(), Format :: binary(), FieldsMap :: map(), Metric :: binary()) -> {ok, Sql :: binary(), Values :: list()}. insert_sql(Table, ServiceId, <<"line">>, FieldsMap, Metric) when is_binary(Table), is_binary(ServiceId), is_binary(Metric) -> - {Keys, Values} = kvs(Fields), + case line_format:parse(Metric) of + error -> + error; + {ok, #{<<"measurement">> := Measurement, <<"tags">> := Tags, <<"fields">> := Fields, <<"timestamp">> := Timestamp}} -> + Map = maps:merge(Tags, Fields), + NMap = Map#{<<"measurement">> => Measurement, <<"timestamp">> => Timestamp}, + lists:flatmap(fun({TableField, F}) -> + case maps:find(F, NMap) of + error -> + []; + {ok, Val} -> + [{TableField, Val}] + end + end, maps:to_list(FieldsMap)), - FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)), - Placeholders = lists:duplicate(length(Keys), <<"?">>), - ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)), + {Keys, Values} = kvs(Fields), + FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)), + Placeholders = lists:duplicate(length(Keys), <<"?">>), + ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)), - {ok, <<"INSERT INTO ", Table/binary, "(", FieldSql/binary, ") VALUES(", ValuesPlaceholder/binary, ")">>, Values}. + {ok, <<"INSERT INTO ", Table/binary, "(", FieldSql/binary, ") VALUES(", ValuesPlaceholder/binary, ")">>, Values} + end. -spec kvs(Fields :: list()) -> {Keys :: list(), Values :: list()}. kvs(Fields) when is_list(Fields) ->