fix file upload

This commit is contained in:
anlicheng 2025-11-13 16:58:21 +08:00
parent 5c8bce6b4a
commit 1746a025ed
2 changed files with 258 additions and 1 deletions

View File

@ -20,6 +20,11 @@
-record(state, {
service_id :: undefined | binary(),
service_pid :: undefined | pid(),
stream_id = 1,
%% #{stream_id => {StreamPid, StreamRef}}
stream_map = #{},
is_registered = false :: boolean()
}).
@ -61,6 +66,41 @@ websocket_info({'DOWN', _Ref, process, ServicePid, Reason}, State = #state{servi
lager:debug("[ws_channel] container_pid: ~p, exited: ~p", [ServicePid, Reason]),
{stop, State#state{service_pid = undefined}};
%% stream进程关闭
websocket_info({'DOWN', _Ref, process, StreamPid, Reason}, State = #state{stream_map = StreamMap}) ->
case search_stream_id(StreamPid, StreamMap) of
error ->
{ok, State};
{ok, StreamId} ->
case Reason of
normal ->
{ok, State#state{stream_map = maps:remove(StreamId, StreamMap)}};
_ ->
PushReply = json_push(#{
<<"stream_reply">> => #{
<<"stream_id">> => StreamId,
<<"result">> => <<"task failed">>
}
}),
{reply, {text, PushReply}, State#state{stream_map = maps:remove(StreamId, StreamMap)}}
end
end;
%% stream任务完成
websocket_info({stream_reply, StreamPid, Reply}, State = #state{stream_map = StreamMap}) ->
case search_stream_id(StreamPid, StreamMap) of
error ->
{ok, State};
{ok, StreamId} ->
PushReply = json_push(#{
<<"stream_reply">> => #{
<<"stream_id">> => StreamId,
<<"result">> => Reply
}
}),
{reply, {text, PushReply}, State}
end;
%%
websocket_info({stop, Reason}, State) ->
lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]),
@ -122,6 +162,35 @@ handle_request(#{<<"id">> := Id, <<"method">> := <<"subscribe">>, <<"params">> :
end,
{reply, {text, Reply}, State};
%%
handle_request(#{<<"id">> := Id, <<"method">> := <<"new_stream">>,
<<"params">> := #{<<"file_name">> := Filename0, <<"file_size">> := FileSize}}, State = #state{stream_id = StreamId, stream_map = StreamMap, is_registered = true}) ->
Filename = filename:basename(binary_to_list(Filename0)),
{ok, {StreamPid, StreamRef}} = efka_stream:start_monitor(self()),
{ok, Path} = efka_stream:setup(StreamPid, Filename, FileSize),
Reply = json_result(Id, #{
<<"stream_id">> => StreamId,
<<"path">> => Path
}),
{reply, {text, Reply}, State#state{stream_id = StreamId + 1, stream_map = maps:put(StreamId, {StreamPid, StreamRef}, StreamMap)}};
handle_request(#{<<"method">> := <<"stream_chunk">>,
<<"params">> := #{<<"stream_id">> := StreamId, <<"chunk_data">> := ChunkData}}, State = #state{stream_map = StreamMap, is_registered = true}) ->
case maps:find(StreamId, StreamMap) of
error ->
{ok, State};
{ok, StreamPid} ->
case ChunkData =:= <<>> of
true ->
efka_stream:finish(StreamPid);
false ->
efka_stream:data(StreamPid, ChunkData)
end,
{ok, State}
end;
%%
handle_request(#{<<"method">> := <<"metric_data">>,
<<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) ->
@ -147,4 +216,28 @@ json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(M
<<"id">> => Id,
<<"error">> => #{<<"code">> => Code, <<"message">> => Message}
},
jiffy:encode(Response, [force_utf8]).
jiffy:encode(Response, [force_utf8]).
-spec json_push(Result :: term()) -> binary().
json_push(Result) ->
Response = #{
<<"push">> => Result
},
jiffy:encode(Response, [force_utf8]).
-spec search_stream_id(StreamPid :: pid(), StreamMap :: map()) -> error | {ok, StreamId :: integer()}.
search_stream_id(StreamPid, StreamMap) when is_pid(StreamPid), is_map(StreamMap) ->
StreamIds = lists:filtermap(fun({StreamId, {StreamPid0, _}}) ->
case StreamPid0 =:= StreamPid of
true ->
{true, StreamId};
false ->
false
end
end, maps:to_list(StreamMap)),
case StreamIds of
[] ->
error;
[StreamId|_] ->
{ok, StreamId}
end.

View File

@ -0,0 +1,164 @@
%%%-------------------------------------------------------------------
%%% @author anlicheng
%%% @copyright (C) 2025, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 13. 11 2025 10:57
%%%-------------------------------------------------------------------
-module(efka_stream).
-author("anlicheng").
-behaviour(gen_server).
%% API
-export([start_monitor/1]).
-export([setup/3, data/2, finish/1]).
%% gen_server callbacks
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
-define(SERVER, ?MODULE).
-record(state, {
parent_pid :: pid(),
ref :: reference(),
file_size = 0 :: integer(),
acc_size = 0 :: integer(),
real_file :: undefined | string(),
io_device :: undefined | file:fd()
}).
%%%===================================================================
%%% API
%%%===================================================================
-spec setup(StreamPid :: pid(), FileName :: string(), FileSize :: integer()) -> Path :: string().
setup(StreamPid, FileName, FileSize) when is_pid(StreamPid), is_list(FileName), is_integer(FileSize) ->
gen_server:call(StreamPid, {setup, FileName, FileSize}).
-spec data(StreamPid :: pid(), ChunkData :: binary()) -> no_return().
data(StreamPid, ChunkData) when is_pid(StreamPid), is_binary(ChunkData) ->
gen_server:cast(StreamPid, {data, ChunkData}).
-spec finish(StreamPid :: pid()) -> no_return().
finish(StreamPid) when is_pid(StreamPid) ->
gen_server:cast(StreamPid, finish).
%% @doc Spawns the server and registers the local name (unique)
-spec(start_monitor(ParentPid :: pid()) ->
{ok, {Pid :: pid(), MonRef :: reference()}} | ignore | {error, Reason :: term()}).
start_monitor(ParentPid) when is_pid(ParentPid) ->
gen_server:start_monitor(?MODULE, [ParentPid], []).
%%%===================================================================
%%% gen_server callbacks
%%%===================================================================
%% @private
%% @doc Initializes the server
-spec(init(Args :: term()) ->
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
{stop, Reason :: term()} | ignore).
init([ParentPid]) ->
Ref = erlang:monitor(process, ParentPid),
{ok, #state{parent_pid = ParentPid, ref = Ref}}.
%% @private
%% @doc Handling call messages
-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()},
State :: #state{}) ->
{reply, Reply :: term(), NewState :: #state{}} |
{reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} |
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_call({setup, FileName, FileSize}, _From, State = #state{}) ->
{RealFileName, Path} = make_file(filename:basename(FileName)),
{ok, IoDevice} = file:open(RealFileName, [write]),
{reply, {ok, Path}, State#state{io_device = IoDevice, real_file = RealFileName, file_size = FileSize, acc_size = 0}}.
%% @private
%% @doc Handling cast messages
-spec(handle_cast(Request :: term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_cast({data, ChunkData}, State = #state{io_device = IoDevice, acc_size = AccSize}) ->
Data = base64:decode(ChunkData),
Len = byte_size(Data),
ok = file:write(IoDevice, Data),
{noreply, State#state{acc_size = AccSize + Len}};
handle_cast(finish, State = #state{parent_pid = ParentPid, io_device = IoDevice, acc_size = AccSize, file_size = FileSize, real_file = RealFile}) ->
case AccSize == FileSize of
true ->
ok = file:close(IoDevice),
ParentPid ! {stream_reply, self(), ok};
false ->
ok = file:close(IoDevice),
ok = file:delete(RealFile),
ParentPid ! {stream_reply, self(), invalid}
end,
{stop, normal, State}.
%% @private
%% @doc Handling all non call/cast messages
-spec(handle_info(Info :: timeout() | term(), State :: #state{}) ->
{noreply, NewState :: #state{}} |
{noreply, NewState :: #state{}, timeout() | hibernate} |
{stop, Reason :: term(), NewState :: #state{}}).
handle_info({'DOWN', Ref, process, Pid, normal}, State = #state{ref = Ref, parent_pid = Pid}) ->
{noreply, State};
handle_info({'DOWN', Ref, process, Pid, Reason}, State = #state{ref = Ref, parent_pid = Pid, io_device = IoDevice, real_file = RealFile}) ->
lager:debug("[efka_stream] ws_channel close with reason: ~p", [Reason]),
case IoDevice =:= undefined of
true ->
ok;
false ->
ok = file:close(IoDevice),
RealFile /= undefined andalso file:delete(RealFile)
end,
{stop, normal, State};
handle_info(_Info, State = #state{}) ->
{noreply, State}.
%% @private
%% @doc This function is called by a gen_server when it is about to
%% terminate. It should be the opposite of Module:init/1 and do any
%% necessary cleaning up. When it returns, the gen_server terminates
%% with Reason. The return value is ignored.
-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()),
State :: #state{}) -> term()).
terminate(_Reason, _State = #state{}) ->
ok.
%% @private
%% @doc Convert process state when code is changed
-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{},
Extra :: term()) ->
{ok, NewState :: #state{}} | {error, Reason :: term()}).
code_change(_OldVsn, State = #state{}, _Extra) ->
{ok, State}.
%%%===================================================================
%%% Internal functions
%%%===================================================================
-spec make_file(Basename :: string()) -> {string(), string()}.
make_file(Basename) when is_list(Basename) ->
{ok, UploadDir} = application:get_env(efka, upload_dir),
{{Y, M, D}, _} = calendar:local_time(),
DateDir = io_lib:format("~p-~p-~p", [Y, M, D]),
BaseDir = UploadDir ++ DateDir,
case filelib:is_dir(BaseDir) of
true ->
ok;
false ->
ok = file:make_dir(BaseDir)
end,
Path = DateDir ++ "/" ++ Basename,
{UploadDir ++ Path, Path}.