diff --git a/apps/iot/src/http_handlers/event_stream_handler.erl b/apps/iot/src/http_handlers/event_stream_handler.erl index 5ca6bbb..048d3ad 100644 --- a/apps/iot/src/http_handlers/event_stream_handler.erl +++ b/apps/iot/src/http_handlers/event_stream_handler.erl @@ -41,7 +41,7 @@ receiver_events(TaskId, Req) -> ok = cowboy_req:stream_body(Body, nofin, Req), receiver_events(TaskId, Req); - {stream_close, TaskId} -> - CloseFrame = iolist_to_binary([<<"event: close\n">>, <<"data: bye\n">>, <<"\n">>]), + {stream_close, TaskId, Reason} -> + CloseFrame = iolist_to_binary([<<"event: close\n">>, <<"data: ", Reason/binary, "\n">>, <<"\n">>]), ok = cowboy_req:stream_body(CloseFrame, fin, Req) end. \ No newline at end of file diff --git a/apps/iot/src/iot_event_stream_observer.erl b/apps/iot/src/iot_event_stream_observer.erl index 45bc14d..043b90d 100644 --- a/apps/iot/src/iot_event_stream_observer.erl +++ b/apps/iot/src/iot_event_stream_observer.erl @@ -13,7 +13,7 @@ %% API -export([start_link/0]). --export([add_listener/2, stream_data/3, stream_close/1]). +-export([add_listener/2, stream_data/3, stream_close/2]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -36,9 +36,9 @@ add_listener(ListenerPid, TaskId) when is_pid(ListenerPid), is_integer(TaskId) - stream_data(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) -> gen_server:cast(?SERVER, {stream_data, TaskId, Type, Stream}). --spec stream_close(TaskId :: integer()) -> no_return(). -stream_close(TaskId) when is_integer(TaskId) -> - gen_server:cast(?SERVER, {stream_close, TaskId}). +-spec stream_close(TaskId :: integer(), Reason :: binary()) -> no_return(). +stream_close(TaskId, Reason) when is_integer(TaskId), is_binary(Reason) -> + gen_server:cast(?SERVER, {stream_close, TaskId, Reason}). %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> @@ -86,12 +86,12 @@ handle_cast({stream_data, TaskId, Type, Stream}, State = #state{listeners = List is_process_alive(ListenerPid) andalso ListenerPid ! {stream_data, TaskId, Type, Stream} end, {noreply, State}; -handle_cast({stream_close, TaskId}, State = #state{listeners = Listeners}) -> +handle_cast({stream_close, TaskId, Reason}, State = #state{listeners = Listeners}) -> case maps:find(TaskId, Listeners) of error -> ok; {ok, ListenerPid} -> - is_process_alive(ListenerPid) andalso ListenerPid ! {stream_close, TaskId} + is_process_alive(ListenerPid) andalso ListenerPid ! {stream_close, TaskId, Reason} end, {noreply, State}. diff --git a/apps/iot/src/tcp/tcp_channel.erl b/apps/iot/src/tcp/tcp_channel.erl index ec9921e..de9cdfc 100644 --- a/apps/iot/src/tcp/tcp_channel.erl +++ b/apps/iot/src/tcp/tcp_channel.erl @@ -139,8 +139,8 @@ handle_info({tcp, Socket, <>}, State = #state{sock iot_host:handle(HostPid, {data, Data}); #event{} = Event -> iot_host:handle(HostPid, {event, Event}); - #task_event_stream{task_id = TaskId, type = <<"close">>, stream = <<>>} -> - iot_event_stream_observer:stream_close(TaskId); + #task_event_stream{task_id = TaskId, type = <<"close">>, stream = Reason} -> + iot_event_stream_observer:stream_close(TaskId, Reason); #task_event_stream{task_id = TaskId, type = Type, stream = Stream} -> lager:debug("[tcp_channel] get task_id: ~p, type: ~ts, stream: ~ts", [TaskId, Type, Stream]), iot_event_stream_observer:stream_data(TaskId, Type, Stream)