diff --git a/apps/efka/src/channel/ws_channel.erl b/apps/efka/src/channel/ws_channel.erl index 839a62d..38ea1e5 100644 --- a/apps/efka/src/channel/ws_channel.erl +++ b/apps/efka/src/channel/ws_channel.erl @@ -193,8 +193,13 @@ handle_request(#{<<"method">> := <<"stream_chunk">>, %% 数据项 handle_request(#{<<"method">> := <<"metric_data">>, - <<"params">> := #{<<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) -> - efka_service:metric_data(ServicePid, RouteKey, Metric), + <<"params">> := #{<<"route_key">> := RouteKey, <<"metric">> := Metric0}}, State = #state{service_pid = ServicePid, is_registered = true}) -> + case map_metric(Metric0) of + {ok, Metric} -> + efka_service:metric_data(ServicePid, RouteKey, Metric); + error -> + lager:debug("[ws_channel] metric_data get invalid metric: ~p", Metric0) + end, {ok, State}. -spec json_result(Id :: integer(), Result :: term()) -> binary(). @@ -236,3 +241,15 @@ search_stream_id(StreamPid, StreamMap) when is_pid(StreamPid), is_map(StreamMap) [StreamId|_] -> {ok, StreamId} end. + +-spec map_metric(Metric :: any()) -> {ok, binary()} | error. +map_metric(Metric) when is_binary(Metric) -> + Metric; +map_metric(Metric) when is_map(Metric) orelse is_list(Metric) -> + jiffy:encode(Metric, [force_utf8]); +map_metric(Metric) when is_integer(Metric) -> + integer_to_binary(Metric); +map_metric(Metric) when is_float(Metric) -> + erlang:float_to_binary(Metric, [compact, {decimals, 10}]); +map_metric(_) -> + error. \ No newline at end of file