diff --git a/apps/efka/src/client/efka_client.erl b/apps/efka/src/client/efka_client.erl index ed4a635..27aa5ea 100644 --- a/apps/efka/src/client/efka_client.erl +++ b/apps/efka/src/client/efka_client.erl @@ -22,10 +22,12 @@ %% 上传数据 -define(PACKET_PUSH, 16#03). +-define(PACKET_PUB, 16#04). + %% API -export([start_link/3]). -export([device_offline/1, device_online/1]). --export([send_metric_data/4, request_config/0, send_event/2, controller_process/1]). +-export([send_metric_data/4, request_config/0, send_event/2, controller_process/1, subscribe/1]). -export([test/0]). @@ -53,6 +55,10 @@ controller_process(ControllerPid) when is_pid(ControllerPid) -> send_metric_data(DeviceUUID, Measurement, Tags, Fields) when is_binary(DeviceUUID), is_binary(Measurement), is_map(Fields), is_map(Tags) -> gen_server:cast(?MODULE, {send_metric_data, DeviceUUID, Measurement, Tags, Fields}). +-spec subscribe(Topic :: binary()) -> no_return(). +subscribe(Topic) when is_binary(Topic) -> + gen_server:cast(?MODULE, {subscribe, Topic}). + %% efka_server为了统一,r对象为字符串;需要2次json_decode -spec request_config() -> {ok, Result :: list() | map()} | {error, Reason :: any()}. request_config() -> @@ -103,8 +109,11 @@ init([ServiceId, Host, Port]) -> ok = gen_tcp:controlling_process(Socket, self()), PacketId = 1, - Register = #{<<"id">> => PacketId, <<"method">> => <<"register">>, <<"params">> => #{<<"service_id">> => ServiceId}}, - Packet = jiffy:encode(Register, [force_utf8]), + Packet = jiffy:encode(#{ + <<"id">> => PacketId, + <<"method">> => <<"register">>, + <<"params">> => #{<<"service_id">> => ServiceId} + }, [force_utf8]), ok = gen_tcp:send(Socket, <>), lager:debug("[efka_client] will send packet: ~p", [Packet]), @@ -136,10 +145,11 @@ handle_call({controller_process, ControllerPid}, _From, State) -> %% done handle_call({request_config, ReceiverPid}, _From, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - RequestConfig = #{<<"id">> => PacketId, <<"method">> => <<"request_config">>}, - Packet = jiffy:encode(RequestConfig, [force_utf8]), + Packet = jiffy:encode(#{<<"id">> => PacketId, <<"method">> => <<"request_config">>}, [force_utf8]), ok = gen_tcp:send(Socket, <>), + erlang:start_timer(?EFKA_REQUEST_TIMEOUT, self(), {request_timeout, PacketId}), + Ref = make_ref(), {reply, {ok, Ref}, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {Ref, ReceiverPid}, Inflight)}}. @@ -155,35 +165,44 @@ handle_cast({send_metric_data, DeviceUUID, Measurement, Tags, Fields}, State = # Point = efka_point:new(Measurement, Tags, Fields, efka_util:timestamp()), Body = efka_point:normalized(Point), - MetricData = #{ + Packet = jiffy:encode(#{ <<"id">> => 0, <<"method">> => <<"metric_data">>, <<"params">> => #{ <<"device_uuid">> => DeviceUUID, <<"metric">> => Body } - }, - Packet = jiffy:encode(MetricData, [force_utf8]), + }, [force_utf8]), ok = gen_tcp:send(Socket, <>), {noreply, State}; %% done handle_cast({send_event, EventType, Body}, State = #state{socket = Socket}) -> - Event = #{ + Packet = jiffy:encode(#{ <<"id">> => 0, <<"method">> => <<"event">>, <<"params">> => #{ <<"event_type">> => EventType, <<"body">> => Body } - }, - - Packet = jiffy:encode(Event, [force_utf8]), + }, [force_utf8]), ok = gen_tcp:send(Socket, <>), {noreply, State}; +handle_cast({subscribe, Topic}, State = #state{socket = Socket}) -> + Packet = jiffy:encode(#{ + <<"id">> => 0, + <<"method">> => <<"subscribe">>, + <<"params">> => #{ + <<"topic">> => Topic + } + }, [force_utf8]), + + ok = gen_tcp:send(Socket, <>), + {noreply, State}; + handle_cast(_Info, State = #state{}) -> {noreply, State}. @@ -215,10 +234,20 @@ handle_info({tcp, Socket, <>}, State = #state{s end end; -%% 收到efka推送的参数设置 +%% 请求超时 +handle_info({timeout, _, {request_timeout, PacketId}}, State = #state{inflight = Inflight}) -> + case maps:take(PacketId, Inflight) of + error -> + {noreply, State}; + {{Ref, ReceiverPid}, NInflight} -> + ReceiverPid ! {response, Ref, {error, {-1, <<"request timeout">>}}}, + {noreply, State#state{inflight = NInflight}} + end; + +%% 收到efka推送的参数设置, 必须处理该消息;不设置超时 handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> case jiffy:decode(Packet, [return_maps]) of - #{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := #{<<"config">> := ConfigJson, <<"timeout">> := Timeout}} -> + #{<<"id">> := Id, <<"method">> := <<"push_config">>, <<"params">> := #{<<"config">> := ConfigJson}} -> Ref = make_ref(), ControllerPid ! {push_config, Ref, ConfigJson}, Reply = @@ -227,14 +256,12 @@ handle_info({tcp, Socket, <>}, State = #state{socke #{<<"id">> => Id, <<"result">> => <<"ok">>}; {push_config_reply, Ref, {error, Reason}} when is_binary(Reason) -> #{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}} - after Timeout * 1000 -> - #{<<"id">> => Id, <<"error">> => #{<<"code">> => -2, <<"message">> => <<"timeout">>}} end, Packet = jiffy:encode(Reply, [force_utf8]), ok = gen_tcp:send(Socket, Packet), {noreply, State}; - #{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload, <<"timeout">> := Timeout}} -> + #{<<"id">> := Id, <<"method">> := <<"invoke">>, <<"params">> := #{<<"payload">> := Payload}} -> Ref = make_ref(), ControllerPid ! {invoke, Ref, Payload}, Reply = @@ -243,14 +270,18 @@ handle_info({tcp, Socket, <>}, State = #state{socke #{<<"id">> => Id, <<"result">> => Result}; {invoke_reply, Ref, {error, Reason}} when is_binary(Reason) -> #{<<"id">> => Id, <<"error">> => #{<<"code">> => -1, <<"message">> => Reason}} - after Timeout * 1000 -> - #{<<"id">> => Id, <<"error">> => #{<<"code">> => -2, <<"message">> => <<"invoke timeout">>}} end, Packet = jiffy:encode(Reply, [force_utf8]), ok = gen_tcp:send(Socket, Packet), {noreply, State} end; +%% pub/sub的消息 +handle_info({tcp, Socket, <>}, State = #state{socket = Socket, controller_process = ControllerPid}) -> + #{<<"topic">> := Topic, <<"content">> := Content} = jiffy:decode(Packet, [return_maps]), + ControllerPid ! {pub, Topic, Content}, + {noreply, State}; + %% 其他消息为非法消息 handle_info({tcp, Socket, Packet}, State = #state{socket = Socket}) -> lager:debug("[efka_client] get unknown packet: ~p", [Packet]), diff --git a/apps/efka/src/efka_subscription.erl b/apps/efka/src/efka_subscription.erl index d43b3cb..a49237a 100644 --- a/apps/efka/src/efka_subscription.erl +++ b/apps/efka/src/efka_subscription.erl @@ -105,7 +105,7 @@ handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers} %% 不需要回复的消息体采用广播的信息 MatchedSubscribers = match_subscribers(Subscribers, Topic), lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) -> - SubscriberPid ! {topic_broadcast, Content} + SubscriberPid ! {topic_broadcast, Topic, Content} end, MatchedSubscribers), efka_logger:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, MatchedSubscribers]), diff --git a/apps/efka/src/efka_tcp_channel.erl b/apps/efka/src/efka_tcp_channel.erl index ddfd8dd..ddf45da 100644 --- a/apps/efka/src/efka_tcp_channel.erl +++ b/apps/efka/src/efka_tcp_channel.erl @@ -20,6 +20,8 @@ -define(SERVER, ?MODULE). +%% 最大的等待时间 +-define(PENDING_TIMEOUT, 10 * 1000). %% 消息类型 %% 服务注册 @@ -29,6 +31,8 @@ %% 上传数据 -define(PACKET_PUSH, 16#03). +-define(PACKET_PUB, 16#04). + -record(state, { packet_id = 1, socket :: gen_tcp:socket(), @@ -36,7 +40,7 @@ service_pid :: undefined | pid(), is_registered = false :: boolean(), - %% 请求响应的对应关系, #{packet_id => {ReceiverPid, Ref}} + %% 请求响应的对应关系, #{packet_id => {ReceiverPid, Ref}}; 自身的inflight需要超时逻辑处理 inflight = #{} }). @@ -93,17 +97,20 @@ handle_call(_Request, _From, State = #state{}) -> {stop, Reason :: term(), NewState :: #state{}}). %% 推送配置项目 handle_cast({push_config, Ref, ReceiverPid, ConfigJson}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"push_config">>, <<"params">> => #{<<"config">> => ConfigJson, <<"timeout">> => 5}}, + PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"push_config">>, <<"params">> => #{<<"config">> => ConfigJson}}, Packet = jiffy:encode(PushConfig, [force_utf8]), ok = gen_tcp:send(Socket, <>), + erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), {noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; +%% 远程调用 handle_cast({invoke, Ref, ReceiverPid, Payload}, State = #state{socket = Socket, packet_id = PacketId, inflight = Inflight}) -> - PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"invoke">>, <<"params">> => #{<<"payload">> => Payload, <<"timeout">> => 5}}, + PushConfig = #{<<"id">> => PacketId, <<"method">> => <<"invoke">>, <<"params">> => #{<<"payload">> => Payload}}, Packet = jiffy:encode(PushConfig, [force_utf8]), - ok = gen_tcp:send(Socket, <>), + + erlang:start_timer(?PENDING_TIMEOUT, self(), {pending_timeout, PacketId}), {noreply, State#state{packet_id = next_packet_id(PacketId), inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}; handle_cast(_Request, State = #state{}) -> @@ -127,10 +134,59 @@ handle_info({tcp, Socket, <>}, State = #state{so %% 处理client的响应 handle_info({tcp, Socket, <>}, State = #state{socket = Socket, inflight = Inflight}) -> - Response = jiffy:decode(Data, [return_maps]), - NInflight = reply(Response, Inflight), + Resp = jiffy:decode(Data, [return_maps]), + case Resp of + #{<<"id">> := Id, <<"result">> := Result} -> + case maps:take(Id, Inflight) of + error -> + lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), + {noreply, State}; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, {ok, Result}}; + false -> + lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) + end, + {noreply, State#state{inflight = NInflight}} + end; + #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}} -> + case maps:take(Id, Inflight) of + error -> + lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), + {noreply, State}; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, {error, Error}}; + false -> + lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) + end, + {noreply, State#state{inflight = NInflight}} + end + end; - {noreply, State#state{inflight = NInflight}}; +%% 超时逻辑处理 +handle_info({timeout, _, {pending_timeout, Id}}, State = #state{inflight = Inflight}) -> + case maps:take(Id, Inflight) of + error -> + {noreply, State}; + {{ReceiverPid, Ref}, NInflight} -> + case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of + true -> + ReceiverPid ! {channel_reply, Ref, {error, <<"timeout">>}}; + false -> + ok + end, + {noreply, State#state{inflight = NInflight}} + end; + +%% 订阅的消息 +handle_info({topic_broadcast, Topic, Content}, State = #state{socket = Socket}) -> + Packet = jiffy:encode(#{<<"topic">> => Topic, <<"content">> => Content}, [force_utf8]), + ok = gen_tcp:send(Socket, <>), + + {noreply, State}; handle_info(Info, State = #state{}) -> lager:debug("[tcp_channel] get info: ~p", [Info]), @@ -195,6 +251,11 @@ handle_request(#{<<"id">> := 0, <<"method">> := <<"metric_data">>, <<"params">> %% Event事件 handle_request(#{<<"id">> := 0, <<"method">> := <<"event">>, <<"params">> := #{<<"event_type">> := EventType, <<"body">> := Body}}, State = #state{service_pid = ServicePid, is_registered = true}) -> efka_service:send_event(ServicePid, EventType, Body), + {ok, State}; + +%% 订阅事件 +handle_request(#{<<"id">> := 0, <<"method">> := <<"subscribe">>, <<"params">> := #{<<"topic">> := Topic}}, State = #state{is_registered = true}) -> + efka_subscription:subscribe(Topic, self()), {ok, State}. %% 采用32位编码 @@ -221,34 +282,4 @@ json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(M <<"message">> => Message } }, - jiffy:encode(Response, [force_utf8]). - --spec reply(Resp :: map(), Inflight :: map()) -> NInflight :: map(). -reply(Resp = #{<<"id">> := Id, <<"result">> := Result}, Inflight) -> - case maps:take(Id, Inflight) of - error -> - lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), - Inflight; - {{ReceiverPid, Ref}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true -> - ReceiverPid ! {channel_reply, Ref, {ok, Result}}; - false -> - lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) - end, - NInflight - end; -reply(Resp = #{<<"id">> := Id, <<"error">> := #{<<"code">> := _Code, <<"message">> := Error}}, Inflight) -> - case maps:take(Id, Inflight) of - error -> - lager:warning("[tcp_channel] get unknown publish response message: ~p, packet_id: ~p", [Resp, Id]), - Inflight; - {{ReceiverPid, Ref}, NInflight} -> - case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of - true -> - ReceiverPid ! {channel_reply, Ref, {error, Error}}; - false -> - lager:warning("[tcp_channel] get publish response message: ~p, packet_id: ~p, but receiver_pid is deaded", [Resp, Id]) - end, - NInflight - end. \ No newline at end of file + jiffy:encode(Response, [force_utf8]). \ No newline at end of file