diff --git a/apps/efka/src/channel/upload_channel.erl b/apps/efka/src/channel/upload_channel.erl new file mode 100644 index 0000000..ec3769a --- /dev/null +++ b/apps/efka/src/channel/upload_channel.erl @@ -0,0 +1,79 @@ +%%%------------------------------------------------------------------- +%%% @author anlicheng +%%% @copyright (C) 2025, +%%% @doc +%%% +%%% @end +%%% Created : 12. 11月 2025 17:08 +%%%------------------------------------------------------------------- +-module(upload_channel). +-author("anlicheng"). + +%% API +-export([init/2]). + +init(Req0, Opts) -> + Method = binary_to_list(cowboy_req:method(Req0)), + lager:debug("[upload_channel] method is: ~p", [Method]), + + Headers = cowboy_req:headers(Req0), + lager:debug("headers is: ~p", [Headers]), + case maps:find(<<"content-type">>, Headers) of + {ok, <<"application/octet-stream">>} -> + Filename = maps:get(<<"x-filename">>, Headers), + case filename:extension(Filename) of + <<>> -> + Req = cowboy_req:reply(400, #{ + <<"Content-Type">> => <<"text/html;charset=utf-8">> + }, <<"Miss file extension">>, Req0), + {ok, Req, Opts}; + _ -> + Basename = filename:basename(Filename), + handle_raw_file(Req0, binary_to_list(Basename)), + Req2 = cowboy_req:reply(400, #{ + <<"Content-Type">> => <<"text/html;charset=utf-8">> + }, <<"ok">>, Req0), + {ok, Req2, Opts} + end; + {ok, ContentType} -> + lager:debug("[upload_channel] unexpect content-type: ~p", [ContentType]), + Req = cowboy_req:reply(400, #{ + <<"Content-Type">> => <<"text/html;charset=utf-8">> + }, <<"Expected application/octet-stream">>, Req0), + {ok, Req, Opts}; + error -> + Req = cowboy_req:reply(400, #{ + <<"Content-Type">> => <<"text/html;charset=utf-8">> + }, <<"Miss content-type header">>, Req0), + {ok, Req, Opts} + end. + +%% 读取请求体 +handle_raw_file(Req, Basename) -> + Filename = make_file(Basename), + {ok, IoDevice} = file:open(Filename, [write]), + ok = handle_raw_file0(Req, IoDevice), + ok = file:close(IoDevice). + +handle_raw_file0(Req, IoDevice) -> + case cowboy_req:read_body(Req) of + {ok, Data, Req1} -> + file:write(IoDevice, Data), + ok; + {more, Data, Req1} -> + file:write(IoDevice, Data), + handle_raw_file0(Req1, IoDevice) + end. + +make_file(Basename) when is_list(Basename) -> + {ok, UploadDir} = application:get_env(efka, upload_dir), + {{Y, M, D}, _} = calendar:local_time(), + DateDir = io_lib:format("~p-~p-~p", [Y, M, D]), + BaseDir = UploadDir ++ DateDir, + case filelib:is_dir(BaseDir) of + true -> + ok; + false -> + ok = file:make_dir(BaseDir) + end, + BaseDir ++ "/" ++ Basename. diff --git a/apps/efka/src/ws_channel.erl b/apps/efka/src/channel/ws_channel.erl similarity index 99% rename from apps/efka/src/ws_channel.erl rename to apps/efka/src/channel/ws_channel.erl index 2eec84b..b0dfa4a 100644 --- a/apps/efka/src/ws_channel.erl +++ b/apps/efka/src/channel/ws_channel.erl @@ -45,7 +45,7 @@ websocket_handle({text, Data}, State) -> websocket_handle(Info, State) -> lager:error("[ws_channel] get a unknown message: ~p, channel will closed", [Info]), - {stop, State}. + {ok, State}. %% 订阅的消息 websocket_info({topic_broadcast, Topic, Content}, State = #state{}) -> diff --git a/apps/efka/src/efka_app.erl b/apps/efka/src/efka_app.erl index e9fd753..2fb66df 100644 --- a/apps/efka/src/efka_app.erl +++ b/apps/efka/src/efka_app.erl @@ -11,9 +11,10 @@ start(_StartType, _StartArgs) -> io:setopts([{encoding, unicode}]), + ensure_upload_dir(), %% 加速内存的回收 erlang:system_flag(fullsweep_after, 16), - start_ws_server(), + start_http_server(), efka_sup:start_link(). @@ -21,15 +22,19 @@ stop(_State) -> ok. %% 微服务和efka之间通过websocket协议通讯 -start_ws_server() -> - {ok, Props} = application:get_env(efka, ws_server), +start_http_server() -> + {ok, Props} = application:get_env(efka, http_server), Acceptors = proplists:get_value(acceptors, Props, 50), MaxConnections = proplists:get_value(max_connections, Props, 10240), Backlog = proplists:get_value(backlog, Props, 1024), Port = proplists:get_value(port, Props), Dispatcher = cowboy_router:compile([ - {'_', [{"/ws", ws_channel, []}]} + {'_', [ + {"/ws", ws_channel, []}, + {"/files/[...]", cowboy_static, {dir, "/usr/local/code/downloads"}}, + {"/upload", upload_channel, []} + ]} ]), TransOpts = [ @@ -40,4 +45,13 @@ start_ws_server() -> ], {ok, Pid} = cowboy:start_clear(ws_listener, TransOpts, #{env => #{dispatch => Dispatcher}}), - lager:debug("[efka_app] websocket server start at: ~p, pid is: ~p", [Port, Pid]). \ No newline at end of file + lager:debug("[efka_app] websocket server start at: ~p, pid is: ~p", [Port, Pid]). + +ensure_upload_dir() -> + {ok, UploadDir} = application:get_env(efka, upload_dir), + case filelib:is_dir(UploadDir) of + true -> + ok; + false -> + ok = file:make_dir(UploadDir) + end. \ No newline at end of file diff --git a/apps/efka/src/efka_sup.erl b/apps/efka/src/efka_sup.erl index 37b1e52..31a37ca 100644 --- a/apps/efka/src/efka_sup.erl +++ b/apps/efka/src/efka_sup.erl @@ -89,17 +89,17 @@ init([]) -> shutdown => 2000, type => worker, modules => ['docker_manager'] - }, - - #{ - id => 'efka_remote_agent', - start => {'efka_remote_agent', start_link, []}, - restart => permanent, - shutdown => 2000, - type => worker, - modules => ['efka_remote_agent'] } + %#{ + % id => 'efka_remote_agent', + % start => {'efka_remote_agent', start_link, []}, + % restart => permanent, + % shutdown => 2000, + % type => worker, + % modules => ['efka_remote_agent'] + %} + ], {ok, {SupFlags, ChildSpecs}}. diff --git a/config/sys.config b/config/sys.config index 844c2e2..d6f8714 100644 --- a/config/sys.config +++ b/config/sys.config @@ -4,11 +4,13 @@ {dets_dir, "/usr/local/code/tmp/dets/"}, + {upload_dir, "/usr/local/code/tmp/upload/"}, + {tcp_server, [ {port, 18088} ]}, - {ws_server, [ + {http_server, [ {port, 18080}, {acceptors, 10}, {max_connections, 1024},