diff --git a/apps/endpoint/include/endpoint.hrl b/apps/endpoint/include/endpoint.hrl index 75af57c..57eb34b 100644 --- a/apps/endpoint/include/endpoint.hrl +++ b/apps/endpoint/include/endpoint.hrl @@ -28,6 +28,8 @@ -record(kafka_endpoint, { username = <<>> :: binary(), password = <<>> :: binary(), + %% 加密算法 + mechanism = plain :: plain | scram_sha_256 | scram_sha_512, bootstrap_servers = [] :: [binary()], topic = <<>> :: binary() }). diff --git a/apps/endpoint/src/endpoint_kafka_client.erl b/apps/endpoint/src/endpoint_kafka_client.erl index 2d6021e..921c1fb 100644 --- a/apps/endpoint/src/endpoint_kafka_client.erl +++ b/apps/endpoint/src/endpoint_kafka_client.erl @@ -95,7 +95,7 @@ handle_cast({forward, LocationCode, Fields, Timestamp}, State = #state{buffer = {stop, Reason :: term(), NewState :: #state{}}). %% bootstrap_servers的格式为: [<<"localhost:9092">>, <<"localhost:9093">>] handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status = ?DISCONNECTED, client_id = ClientId, - endpoint = #endpoint{title = Title, config = #kafka_endpoint{bootstrap_servers = BootstrapServers0, topic = Topic, username = Username, password = Password}}}) -> + endpoint = #endpoint{title = Title, config = #kafka_endpoint{bootstrap_servers = BootstrapServers0, topic = Topic, mechanism = Mechanism, username = Username, password = Password}}}) -> BootstrapServers = lists:flatmap(fun(S) -> case binary:split(S, <<":">>, [global, trim]) of [Host, Port] -> @@ -104,27 +104,29 @@ handle_info({timeout, _, create_postman}, State = #state{buffer = Buffer, status [] end end, BootstrapServers0), - lager:debug("[kafka_client] endpoint: ~p, create postman, bootstap_servers: ~p", [Title, BootstrapServers]), - ClientConfig = [{reconnect_cool_down_seconds, 10}], - case brod:start_client(BootstrapServers, ClientId, ClientConfig) of + KafkaConfig = [ + {sasl, { + Mechanism, + Username, + Password + }}, + {ssl, false}, + {auto_start_producers, true}, + {reconnect_cool_down_seconds, 10} + ], + lager:debug("[kafka_client] endpoint: ~p, create postman, bootstap_servers: ~p, kafka config: ~p", [Title, BootstrapServers, KafkaConfig]), + case brod:start_client(BootstrapServers, ClientId, KafkaConfig) of ok -> - case brod:start_producer(ClientId, Topic, []) of - ok -> - ClientPid = whereis(ClientId), - erlang:monitor(process, ClientPid), + ClientPid = whereis(ClientId), + erlang:monitor(process, ClientPid), - lager:debug("[kafka_client] start producer success, topic: ~p", [Topic]), - NBuffer = endpoint_buffer:trigger_n(Buffer), - {noreply, State#state{client_pid = ClientPid, status = ?CONNECTED, buffer = NBuffer}}; - {error, Reason} -> - lager:debug("[kafka_client] start producer, get error: ~p", [Reason]), - erlang:start_timer(5000, self(), create_postman), - {noreply, State} - end; + lager:debug("[kafka_client] start producer success, topic: ~p", [Topic]), + NBuffer = endpoint_buffer:trigger_n(Buffer), + {noreply, State#state{client_pid = ClientPid, status = ?CONNECTED, buffer = NBuffer}}; {error, Reason} -> lager:debug("[kafka_client] start client, get error: ~p", [Reason]), - erlang:start_timer(5000, self(), create_postman), + reconnect_ticker(), {noreply, State} end; @@ -135,14 +137,18 @@ handle_info({next_data, _Id, _Metadata, _Metric}, State = #state{status = ?DISCO handle_info({next_data, Id, Metadata, Metric}, State = #state{client_id = ClientId, status = ?CONNECTED, buffer = Buffer, endpoint = #endpoint{config = #kafka_endpoint{topic = Topic}}}) -> + XMetadata = base64:encode(iolist_to_binary(jiffy:encode(Metadata, [force_utf8]))), + Len = byte_size(XMetadata), + Payload = <>, + %% 使用service_id左右hash的key Args = case maps:get(<<"service_id">>, Metadata, undefined) of undefined -> - [ClientId, Topic, random, <<"">>, Metric]; + [ClientId, Topic, random, <<"">>, Payload]; Key -> - [ClientId, Topic, undefined, Key, Metric] + [ClientId, Topic, undefined, Key, Payload] end, - lager:debug("[kafka_client] will publish topic: ~p, message: ~p", [Topic, Metric]), + lager:debug("[kafka_client] will publish topic: ~p, message: ~p", [Topic, Payload]), case erlang:apply(brod, produce_sync, Args)of ok -> @@ -188,21 +194,6 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== --spec format_props(Props :: [{Key :: any(), Val :: any()}]) -> [{Key1 :: binary(), Val1 :: binary()}]. -format_props(Props) when is_list(Props) -> - format_props(Props, []). -format_props([], Acc) -> - Acc; -format_props([{Key, Val}|T], Acc) when is_binary(Key), is_integer(Val) -> - format_props(T, [{Key, integer_to_binary(Val)}|Acc]); -format_props([{Key, Val}|T], Acc) when is_binary(Key), is_float(Val) -> - NVal = list_to_binary(float_to_list(Val, [{decimals, 2}, compact])), - format_props(T, [{Key, NVal}|Acc]); -format_props([{Key, Val}|T], Acc) when is_binary(Key), is_binary(Val) -> - format_props(T, [{Key, Val}|Acc]); -format_props([_|T], Acc) -> - format_props(T, Acc). - -spec reconnect_ticker() -> no_return(). reconnect_ticker() -> erlang:start_timer(?RETRY_INTERVAL, self(), create_postman). \ No newline at end of file