增加stream处理结果

This commit is contained in:
anlicheng 2025-10-31 13:38:31 +08:00
parent 616cad6daf
commit a500d75af2
3 changed files with 10 additions and 10 deletions

View File

@ -41,7 +41,7 @@ receiver_events(TaskId, Req) ->
ok = cowboy_req:stream_body(Body, nofin, Req), ok = cowboy_req:stream_body(Body, nofin, Req),
receiver_events(TaskId, Req); receiver_events(TaskId, Req);
{stream_close, TaskId} -> {stream_close, TaskId, Reason} ->
CloseFrame = iolist_to_binary([<<"event: close\n">>, <<"data: bye\n">>, <<"\n">>]), CloseFrame = iolist_to_binary([<<"event: close\n">>, <<"data: ", Reason/binary, "\n">>, <<"\n">>]),
ok = cowboy_req:stream_body(CloseFrame, fin, Req) ok = cowboy_req:stream_body(CloseFrame, fin, Req)
end. end.

View File

@ -13,7 +13,7 @@
%% API %% API
-export([start_link/0]). -export([start_link/0]).
-export([add_listener/2, stream_data/3, stream_close/1]). -export([add_listener/2, stream_data/3, stream_close/2]).
%% gen_server callbacks %% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
@ -36,9 +36,9 @@ add_listener(ListenerPid, TaskId) when is_pid(ListenerPid), is_integer(TaskId) -
stream_data(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) -> stream_data(TaskId, Type, Stream) when is_integer(TaskId), is_binary(Type), is_binary(Stream) ->
gen_server:cast(?SERVER, {stream_data, TaskId, Type, Stream}). gen_server:cast(?SERVER, {stream_data, TaskId, Type, Stream}).
-spec stream_close(TaskId :: integer()) -> no_return(). -spec stream_close(TaskId :: integer(), Reason :: binary()) -> no_return().
stream_close(TaskId) when is_integer(TaskId) -> stream_close(TaskId, Reason) when is_integer(TaskId), is_binary(Reason) ->
gen_server:cast(?SERVER, {stream_close, TaskId}). gen_server:cast(?SERVER, {stream_close, TaskId, Reason}).
%% @doc Spawns the server and registers the local name (unique) %% @doc Spawns the server and registers the local name (unique)
-spec(start_link() -> -spec(start_link() ->
@ -86,12 +86,12 @@ handle_cast({stream_data, TaskId, Type, Stream}, State = #state{listeners = List
is_process_alive(ListenerPid) andalso ListenerPid ! {stream_data, TaskId, Type, Stream} is_process_alive(ListenerPid) andalso ListenerPid ! {stream_data, TaskId, Type, Stream}
end, end,
{noreply, State}; {noreply, State};
handle_cast({stream_close, TaskId}, State = #state{listeners = Listeners}) -> handle_cast({stream_close, TaskId, Reason}, State = #state{listeners = Listeners}) ->
case maps:find(TaskId, Listeners) of case maps:find(TaskId, Listeners) of
error -> error ->
ok; ok;
{ok, ListenerPid} -> {ok, ListenerPid} ->
is_process_alive(ListenerPid) andalso ListenerPid ! {stream_close, TaskId} is_process_alive(ListenerPid) andalso ListenerPid ! {stream_close, TaskId, Reason}
end, end,
{noreply, State}. {noreply, State}.

View File

@ -139,8 +139,8 @@ handle_info({tcp, Socket, <<?PACKET_CAST, CastBin/binary>>}, State = #state{sock
iot_host:handle(HostPid, {data, Data}); iot_host:handle(HostPid, {data, Data});
#event{} = Event -> #event{} = Event ->
iot_host:handle(HostPid, {event, Event}); iot_host:handle(HostPid, {event, Event});
#task_event_stream{task_id = TaskId, type = <<"close">>, stream = <<>>} -> #task_event_stream{task_id = TaskId, type = <<"close">>, stream = Reason} ->
iot_event_stream_observer:stream_close(TaskId); iot_event_stream_observer:stream_close(TaskId, Reason);
#task_event_stream{task_id = TaskId, type = Type, stream = Stream} -> #task_event_stream{task_id = TaskId, type = Type, stream = Stream} ->
lager:debug("[tcp_channel] get task_id: ~p, type: ~ts, stream: ~ts", [TaskId, Type, Stream]), lager:debug("[tcp_channel] get task_id: ~p, type: ~ts, stream: ~ts", [TaskId, Type, Stream]),
iot_event_stream_observer:stream_data(TaskId, Type, Stream) iot_event_stream_observer:stream_data(TaskId, Type, Stream)