diff --git a/apps/efka/include/message.hrl b/apps/efka/include/message.hrl index 5bc6e2b..6673fa3 100644 --- a/apps/efka/include/message.hrl +++ b/apps/efka/include/message.hrl @@ -55,6 +55,7 @@ -record(pub, { topic :: binary(), + qos = 0 :: integer(), content :: binary() }). diff --git a/apps/efka/src/docker/docker_manager.erl b/apps/efka/src/docker/docker_manager.erl index a5aec54..fb359cc 100644 --- a/apps/efka/src/docker/docker_manager.erl +++ b/apps/efka/src/docker/docker_manager.erl @@ -192,7 +192,7 @@ handle_info({'DOWN', _Ref, process, TaskPid, Reason}, State = #state{task_map = Error0 -> Error = iolist_to_binary(io_lib:format("~p", [Error0])), efka_remote_agent:task_event_stream(TaskId, <<"error">>, <<"任务失败: "/utf8, Error/binary>>), - efka_remote_agent:close_task_event_stream(TaskId), + efka_remote_agent:close_task_event_stream(TaskId, <<"task exited">>), lager:notice("[docker_manager] task_id: ~p, exit with error: ~p", [TaskId, Error]), ok end, diff --git a/apps/efka/src/efka_remote_agent.erl b/apps/efka/src/efka_remote_agent.erl index 7c31c07..6485960 100644 --- a/apps/efka/src/efka_remote_agent.erl +++ b/apps/efka/src/efka_remote_agent.erl @@ -326,10 +326,10 @@ handle_event(info, {server_cast, #command{command_type = ?COMMAND_AUTH, command end; %% 处理Pub/Sub机制 -handle_event(info, {server_cast, #pub{topic = Topic, content = Content}}, ?STATE_ACTIVATED, State) -> - lager:debug("[efka_remote_agent] get pub topic: ~p, content: ~p", [Topic, Content]), +handle_event(info, {server_cast, #pub{topic = Topic, qos = Qos, content = Content}}, ?STATE_ACTIVATED, State) -> + lager:debug("[efka_remote_agent] get pub topic: ~p, qos: ~p, content: ~p", [Topic, Qos, Content]), %% 消息发送到订阅系统 - efka_subscription:publish(Topic, Content), + efka_subscription:publish(Topic, Qos, Content), {keep_state, State}; %% transport进程退出 diff --git a/apps/efka/src/efka_subscription.erl b/apps/efka/src/efka_subscription.erl index 1b0c244..88eaec6 100644 --- a/apps/efka/src/efka_subscription.erl +++ b/apps/efka/src/efka_subscription.erl @@ -13,7 +13,7 @@ %% API -export([start_link/0]). --export([subscribe/2, publish/2]). +-export([subscribe/2, publish/3, debug_info/0]). -export([match_components/2, is_valid_components/1, of_components/1]). %% gen_server callbacks @@ -34,7 +34,9 @@ }). -record(state, { - subscribers = [] + subscribers = [], + %% qos未1,并且未被消费的消息 + remaining_messages = [] }). %%%=================================================================== @@ -45,9 +47,13 @@ subscribe(Topic, SubscriberPid) when is_binary(Topic), is_pid(SubscriberPid) -> gen_server:call(?SERVER, {subscribe, Topic, SubscriberPid}). --spec publish(Topic :: binary(), Content :: binary()) -> no_return(). -publish(Topic, Content) when is_binary(Topic), is_binary(Content) -> - gen_server:cast(?SERVER, {publish, Topic, Content}). +-spec publish(Topic :: binary(), Qos :: integer(), Content :: binary()) -> no_return(). +publish(Topic, Qos, Content) when is_binary(Topic), is_integer(Qos), is_binary(Content) -> + gen_server:cast(?SERVER, {publish, Topic, Qos, Content}). + +-spec debug_info() -> {ok, Info :: map()}. +debug_info() -> + gen_server:call(?SERVER, debug_info). %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> @@ -78,18 +84,25 @@ init([]) -> {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). %% 同一个SubscriberPid只能订阅同一个topic一次 -handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers = Subscribers}) -> +handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers = Subscribers, remaining_messages = RemainingMessages}) -> Components = of_components(Topic), case is_valid_components(Components) of true -> Sub = #subscriber{topic = Topic, subscriber_pid = SubscriberPid, components = Components, order = order_num(Components)}, %% 建立到SubscriberPid的monitor,进程退出需要清理订阅 erlang:monitor(process, SubscriberPid), - - {reply, ok, State#state{subscribers = Subscribers ++ [Sub]}}; + %% 处理遗留的消息 + RestRemainingMessages = dispatch_remaining_messages(Sub, RemainingMessages), + {reply, ok, State#state{subscribers = Subscribers ++ [Sub], remaining_messages = RestRemainingMessages}}; false -> {reply, {error, <<"invalid topic name">>}, State} - end. + end; +handle_call(debug_info, _From, State = #state{subscribers = Subscribers, remaining_messages = RemainingMessages}) -> + Info = #{ + subscribes => Subscribers, + remaining_messages => RemainingMessages + }, + {reply, {ok, Info}, State}. %% @private %% @doc Handling cast messages @@ -98,14 +111,18 @@ handle_call({subscribe, Topic, SubscriberPid}, _From, State = #state{subscribers {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), NewState :: #state{}}). %% 发布消息 -handle_cast({publish, Topic, Content}, State = #state{subscribers = Subscribers}) -> +handle_cast({publish, Topic, Qos, Content}, State = #state{subscribers = Subscribers, remaining_messages = RemainingMessages}) -> MatchedSubscribers = match_subscribers(Subscribers, Topic), - lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) -> - SubscriberPid ! {topic_broadcast, Topic, Content} - end, MatchedSubscribers), - lager:debug("[efka_subscription] topic: ~p, content: ~p, match subscribers: ~p", [Topic, Content, MatchedSubscribers]), - {noreply, State}. + case length(MatchedSubscribers) > 0 of + true -> + broadcast(Topic, Content, MatchedSubscribers), + {noreply, State}; + false when Qos =:= 0 -> + {noreply, State}; + false -> + {noreply, State#state{remaining_messages = [{Topic, Content}|RemainingMessages]}} + end. %% @private %% @doc Handling all non call/cast messages @@ -198,4 +215,23 @@ order_num([<<$*>>|_]) -> order_num([<<$+>>|_]) -> 3; order_num([_|Tail]) -> - order_num(Tail). \ No newline at end of file + order_num(Tail). + +broadcast(Topic, Content, MatchedSubscribers) -> + lists:foreach(fun(#subscriber{subscriber_pid = SubscriberPid}) -> + SubscriberPid ! {topic_broadcast, Topic, Content} + end, MatchedSubscribers). + +-spec dispatch_remaining_messages(Subscriber :: #subscriber{}, RemainingMessages :: list()) -> RestRemainingMessages :: list(). +dispatch_remaining_messages(#subscriber{subscriber_pid = SubscriberPid, components = Components}, RemainingMessages) when is_list(RemainingMessages) -> + %% 处理遗留的消息 + lists:foldl(fun({Topic0, Content0}, Acc) -> + Components0 = of_components(Topic0), + case match_components(Components0, Components) of + true -> + SubscriberPid ! {topic_broadcast, Topic0, Content0}, + Acc; + false -> + [{Topic0, Content0}|Acc] + end + end, [], RemainingMessages). \ No newline at end of file diff --git a/apps/efka/src/message/message_codec.erl b/apps/efka/src/message/message_codec.erl index 3de252d..c4c6cc9 100644 --- a/apps/efka/src/message/message_codec.erl +++ b/apps/efka/src/message/message_codec.erl @@ -10,8 +10,10 @@ -author("anlicheng"). -include("message.hrl"). --define(I32, 1). --define(Bytes, 2). +-define(I8, 1). +-define(I16, 2). +-define(I32, 3). +-define(Bytes, 4). %% API -export([encode/2, decode/1]). @@ -43,9 +45,10 @@ encode0(#jsonrpc_reply{result = undefined, error = Error}) -> iolist_to_binary([ marshal(?Bytes, ResultBin) ]); -encode0(#pub{topic = Topic, content = Content}) -> +encode0(#pub{topic = Topic, qos = Qos, content = Content}) -> iolist_to_binary([ marshal(?Bytes, Topic), + marshal(?I8, Qos), marshal(?Bytes, Content) ]); encode0(#command{command_type = CommandType, command = Command}) -> @@ -90,8 +93,8 @@ decode0(?MESSAGE_JSONRPC_REPLY, [ReplyBin]) -> _ -> error end; -decode0(?MESSAGE_PUB, [Topic, Content]) -> - {ok, #pub{topic = Topic, content = Content}}; +decode0(?MESSAGE_PUB, [Topic, Qos, Content]) -> + {ok, #pub{topic = Topic, qos = Qos, content = Content}}; decode0(?MESSAGE_COMMAND, [CommandType, Command]) -> {ok, #command{command_type = CommandType, command = Command}}; decode0(?MESSAGE_AUTH_REPLY, [Code, Payload]) -> @@ -111,6 +114,10 @@ decode0(_, _) -> %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -spec marshal(Type :: integer(), Field :: any()) -> binary(). +marshal(?I8, Field) when is_integer(Field) -> + <>; +marshal(?I16, Field) when is_integer(Field) -> + <>; marshal(?I32, Field) when is_integer(Field) -> <>; marshal(?Bytes, Field) when is_binary(Field) -> @@ -122,6 +129,10 @@ unmarshal(Bin) when is_binary(Bin) -> unmarshal(Bin, []). unmarshal(<<>>, Acc) -> {ok, lists:reverse(Acc)}; +unmarshal(<>, Acc) -> + unmarshal(Rest, [F|Acc]); +unmarshal(<>, Acc) -> + unmarshal(Rest, [F|Acc]); unmarshal(<>, Acc) -> unmarshal(Rest, [F|Acc]); unmarshal(<>, Acc) ->