From 1746a025edfed666cd63f862d0c0ae5ae0f35f96 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Thu, 13 Nov 2025 16:58:21 +0800 Subject: [PATCH] fix file upload --- apps/efka/src/channel/ws_channel.erl | 95 +++++++++++++++- apps/efka/src/efka_stream.erl | 164 +++++++++++++++++++++++++++ 2 files changed, 258 insertions(+), 1 deletion(-) create mode 100644 apps/efka/src/efka_stream.erl diff --git a/apps/efka/src/channel/ws_channel.erl b/apps/efka/src/channel/ws_channel.erl index b0dfa4a..dcb6b2b 100644 --- a/apps/efka/src/channel/ws_channel.erl +++ b/apps/efka/src/channel/ws_channel.erl @@ -20,6 +20,11 @@ -record(state, { service_id :: undefined | binary(), service_pid :: undefined | pid(), + + stream_id = 1, + %% #{stream_id => {StreamPid, StreamRef}} + stream_map = #{}, + is_registered = false :: boolean() }). @@ -61,6 +66,41 @@ websocket_info({'DOWN', _Ref, process, ServicePid, Reason}, State = #state{servi lager:debug("[ws_channel] container_pid: ~p, exited: ~p", [ServicePid, Reason]), {stop, State#state{service_pid = undefined}}; +%% stream进程关闭 +websocket_info({'DOWN', _Ref, process, StreamPid, Reason}, State = #state{stream_map = StreamMap}) -> + case search_stream_id(StreamPid, StreamMap) of + error -> + {ok, State}; + {ok, StreamId} -> + case Reason of + normal -> + {ok, State#state{stream_map = maps:remove(StreamId, StreamMap)}}; + _ -> + PushReply = json_push(#{ + <<"stream_reply">> => #{ + <<"stream_id">> => StreamId, + <<"result">> => <<"task failed">> + } + }), + {reply, {text, PushReply}, State#state{stream_map = maps:remove(StreamId, StreamMap)}} + end + end; + +%% stream任务完成 +websocket_info({stream_reply, StreamPid, Reply}, State = #state{stream_map = StreamMap}) -> + case search_stream_id(StreamPid, StreamMap) of + error -> + {ok, State}; + {ok, StreamId} -> + PushReply = json_push(#{ + <<"stream_reply">> => #{ + <<"stream_id">> => StreamId, + <<"result">> => Reply + } + }), + {reply, {text, PushReply}, State} + end; + %% 处理关闭信号 websocket_info({stop, Reason}, State) -> lager:debug("[ws_channel] the channel will be closed with reason: ~p", [Reason]), @@ -122,6 +162,35 @@ handle_request(#{<<"id">> := Id, <<"method">> := <<"subscribe">>, <<"params">> : end, {reply, {text, Reply}, State}; +%% 文件上传 +handle_request(#{<<"id">> := Id, <<"method">> := <<"new_stream">>, + <<"params">> := #{<<"file_name">> := Filename0, <<"file_size">> := FileSize}}, State = #state{stream_id = StreamId, stream_map = StreamMap, is_registered = true}) -> + Filename = filename:basename(binary_to_list(Filename0)), + + {ok, {StreamPid, StreamRef}} = efka_stream:start_monitor(self()), + {ok, Path} = efka_stream:setup(StreamPid, Filename, FileSize), + + Reply = json_result(Id, #{ + <<"stream_id">> => StreamId, + <<"path">> => Path + }), + {reply, {text, Reply}, State#state{stream_id = StreamId + 1, stream_map = maps:put(StreamId, {StreamPid, StreamRef}, StreamMap)}}; + +handle_request(#{<<"method">> := <<"stream_chunk">>, + <<"params">> := #{<<"stream_id">> := StreamId, <<"chunk_data">> := ChunkData}}, State = #state{stream_map = StreamMap, is_registered = true}) -> + case maps:find(StreamId, StreamMap) of + error -> + {ok, State}; + {ok, StreamPid} -> + case ChunkData =:= <<>> of + true -> + efka_stream:finish(StreamPid); + false -> + efka_stream:data(StreamPid, ChunkData) + end, + {ok, State} + end; + %% 数据项 handle_request(#{<<"method">> := <<"metric_data">>, <<"params">> := #{<<"device_uuid">> := DeviceUUID, <<"route_key">> := RouteKey, <<"metric">> := Metric}}, State = #state{service_pid = ServicePid, is_registered = true}) -> @@ -147,4 +216,28 @@ json_error(Id, Code, Message) when is_integer(Id), is_integer(Code), is_binary(M <<"id">> => Id, <<"error">> => #{<<"code">> => Code, <<"message">> => Message} }, - jiffy:encode(Response, [force_utf8]). \ No newline at end of file + jiffy:encode(Response, [force_utf8]). + +-spec json_push(Result :: term()) -> binary(). +json_push(Result) -> + Response = #{ + <<"push">> => Result + }, + jiffy:encode(Response, [force_utf8]). + +-spec search_stream_id(StreamPid :: pid(), StreamMap :: map()) -> error | {ok, StreamId :: integer()}. +search_stream_id(StreamPid, StreamMap) when is_pid(StreamPid), is_map(StreamMap) -> + StreamIds = lists:filtermap(fun({StreamId, {StreamPid0, _}}) -> + case StreamPid0 =:= StreamPid of + true -> + {true, StreamId}; + false -> + false + end + end, maps:to_list(StreamMap)), + case StreamIds of + [] -> + error; + [StreamId|_] -> + {ok, StreamId} + end. diff --git a/apps/efka/src/efka_stream.erl b/apps/efka/src/efka_stream.erl new file mode 100644 index 0000000..b3541ae --- /dev/null +++ b/apps/efka/src/efka_stream.erl @@ -0,0 +1,164 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 13. 11月 2025 10:57 +%%%------------------------------------------------------------------- +-module(efka_stream). +-author("anlicheng"). + +-behaviour(gen_server). + +%% API +-export([start_monitor/1]). +-export([setup/3, data/2, finish/1]). + +%% gen_server callbacks +-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). + +-define(SERVER, ?MODULE). + +-record(state, { + parent_pid :: pid(), + ref :: reference(), + file_size = 0 :: integer(), + acc_size = 0 :: integer(), + real_file :: undefined | string(), + io_device :: undefined | file:fd() +}). + +%%%=================================================================== +%%% API +%%%=================================================================== + +-spec setup(StreamPid :: pid(), FileName :: string(), FileSize :: integer()) -> Path :: string(). +setup(StreamPid, FileName, FileSize) when is_pid(StreamPid), is_list(FileName), is_integer(FileSize) -> + gen_server:call(StreamPid, {setup, FileName, FileSize}). + +-spec data(StreamPid :: pid(), ChunkData :: binary()) -> no_return(). +data(StreamPid, ChunkData) when is_pid(StreamPid), is_binary(ChunkData) -> + gen_server:cast(StreamPid, {data, ChunkData}). + +-spec finish(StreamPid :: pid()) -> no_return(). +finish(StreamPid) when is_pid(StreamPid) -> + gen_server:cast(StreamPid, finish). + +%% @doc Spawns the server and registers the local name (unique) +-spec(start_monitor(ParentPid :: pid()) -> + {ok, {Pid :: pid(), MonRef :: reference()}} | ignore | {error, Reason :: term()}). +start_monitor(ParentPid) when is_pid(ParentPid) -> + gen_server:start_monitor(?MODULE, [ParentPid], []). + +%%%=================================================================== +%%% gen_server callbacks +%%%=================================================================== + +%% @private +%% @doc Initializes the server +-spec(init(Args :: term()) -> + {ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} | + {stop, Reason :: term()} | ignore). +init([ParentPid]) -> + Ref = erlang:monitor(process, ParentPid), + {ok, #state{parent_pid = ParentPid, ref = Ref}}. + +%% @private +%% @doc Handling call messages +-spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, + State :: #state{}) -> + {reply, Reply :: term(), NewState :: #state{}} | + {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_call({setup, FileName, FileSize}, _From, State = #state{}) -> + {RealFileName, Path} = make_file(filename:basename(FileName)), + {ok, IoDevice} = file:open(RealFileName, [write]), + + {reply, {ok, Path}, State#state{io_device = IoDevice, real_file = RealFileName, file_size = FileSize, acc_size = 0}}. + +%% @private +%% @doc Handling cast messages +-spec(handle_cast(Request :: term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_cast({data, ChunkData}, State = #state{io_device = IoDevice, acc_size = AccSize}) -> + Data = base64:decode(ChunkData), + Len = byte_size(Data), + + ok = file:write(IoDevice, Data), + {noreply, State#state{acc_size = AccSize + Len}}; +handle_cast(finish, State = #state{parent_pid = ParentPid, io_device = IoDevice, acc_size = AccSize, file_size = FileSize, real_file = RealFile}) -> + case AccSize == FileSize of + true -> + ok = file:close(IoDevice), + ParentPid ! {stream_reply, self(), ok}; + false -> + ok = file:close(IoDevice), + ok = file:delete(RealFile), + ParentPid ! {stream_reply, self(), invalid} + end, + {stop, normal, State}. + +%% @private +%% @doc Handling all non call/cast messages +-spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> + {noreply, NewState :: #state{}} | + {noreply, NewState :: #state{}, timeout() | hibernate} | + {stop, Reason :: term(), NewState :: #state{}}). +handle_info({'DOWN', Ref, process, Pid, normal}, State = #state{ref = Ref, parent_pid = Pid}) -> + {noreply, State}; +handle_info({'DOWN', Ref, process, Pid, Reason}, State = #state{ref = Ref, parent_pid = Pid, io_device = IoDevice, real_file = RealFile}) -> + lager:debug("[efka_stream] ws_channel close with reason: ~p", [Reason]), + case IoDevice =:= undefined of + true -> + ok; + false -> + ok = file:close(IoDevice), + RealFile /= undefined andalso file:delete(RealFile) + end, + {stop, normal, State}; +handle_info(_Info, State = #state{}) -> + {noreply, State}. + +%% @private +%% @doc This function is called by a gen_server when it is about to +%% terminate. It should be the opposite of Module:init/1 and do any +%% necessary cleaning up. When it returns, the gen_server terminates +%% with Reason. The return value is ignored. +-spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), + State :: #state{}) -> term()). +terminate(_Reason, _State = #state{}) -> + ok. + +%% @private +%% @doc Convert process state when code is changed +-spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, + Extra :: term()) -> + {ok, NewState :: #state{}} | {error, Reason :: term()}). +code_change(_OldVsn, State = #state{}, _Extra) -> + {ok, State}. + +%%%=================================================================== +%%% Internal functions +%%%=================================================================== + +-spec make_file(Basename :: string()) -> {string(), string()}. +make_file(Basename) when is_list(Basename) -> + {ok, UploadDir} = application:get_env(efka, upload_dir), + {{Y, M, D}, _} = calendar:local_time(), + DateDir = io_lib:format("~p-~p-~p", [Y, M, D]), + BaseDir = UploadDir ++ DateDir, + case filelib:is_dir(BaseDir) of + true -> + ok; + false -> + ok = file:make_dir(BaseDir) + end, + Path = DateDir ++ "/" ++ Basename, + + {UploadDir ++ Path, Path}. \ No newline at end of file