From d7e2cbf5c41b8a28b1aad832ca222cd9d3a83560 Mon Sep 17 00:00:00 2001 From: anlicheng <244108715@qq.com> Date: Fri, 19 Sep 2025 15:43:49 +0800 Subject: [PATCH] fix --- backup/endpoint_mnesia.erl | 97 --------------- backup/endpoint_mysql.erl | 214 -------------------------------- backup/service_config_model.erl | 121 ------------------ backup/service_handler.erl | 176 -------------------------- rebar.config | 1 - rebar.lock | 99 --------------- 6 files changed, 708 deletions(-) delete mode 100644 backup/endpoint_mnesia.erl delete mode 100644 backup/endpoint_mysql.erl delete mode 100644 backup/service_config_model.erl delete mode 100644 backup/service_handler.erl delete mode 100644 rebar.lock diff --git a/backup/endpoint_mnesia.erl b/backup/endpoint_mnesia.erl deleted file mode 100644 index a1abbdb..0000000 --- a/backup/endpoint_mnesia.erl +++ /dev/null @@ -1,97 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author anlicheng -%%% @copyright (C) 2025, -%%% @doc -%%% -%%% @end -%%% Created : 12. 8月 2025 15:12 -%%%------------------------------------------------------------------- --module(endpoint_mnesia). --author("aresei"). --include("endpoint.hrl"). --include_lib("stdlib/include/qlc.hrl"). - --define(TAB, endpoint). - -%% API --export([create_table/0]). --export([insert/1, delete/1, check_name/1]). --export([get_endpoint/1]). --export([as_map/1]). - -create_table() -> - %% id生成器 - mnesia:create_table(endpoint, [ - {attributes, record_info(fields, endpoint)}, - {record_name, endpoint}, - {disc_copies, [node()]}, - {type, ordered_set} - ]). - --spec check_name(Name :: binary()) -> boolean() | {error, Reason :: any()}. -check_name(Name) when is_binary(Name) -> - Fun = fun() -> - Q = qlc:q([E || E <- mnesia:table(?TAB), E#endpoint.name =:= Name]), - case qlc:e(Q) of - [] -> - false; - [_|_] -> - true - end - end, - case mnesia:transaction(Fun) of - {'atomic', Res} -> - Res; - {'aborted', Reason} -> - {error, Reason} - end. - --spec get_endpoint(Id :: integer()) -> error | {ok, Endpoint :: #endpoint{}}. -get_endpoint(Id) when is_integer(Id) -> - case mnesia:dirty_read(?TAB, Id) of - [] -> - error; - [Endpoint | _] -> - {ok, Endpoint} - end. - --spec insert(Endpoint :: #endpoint{}) -> ok | {error, Reason :: term()}. -insert(Endpoint = #endpoint{}) -> - case mnesia:transaction(fun() -> mnesia:write(?TAB, Endpoint, write) end) of - {'atomic', ok} -> - ok; - {'aborted', Reason} -> - {error, Reason} - end. - --spec delete(Id :: integer()) -> ok | {error, Reason :: any()}. -delete(Id) when is_integer(Id) -> - case mnesia:transaction(fun() -> mnesia:delete(?TAB, Id, write) end) of - {'atomic', ok} -> - ok; - {'aborted', Reason} -> - {error, Reason} - end. - --spec as_map(Endpoint :: #endpoint{}) -> map(). -as_map(#endpoint{id = Id, name = Name, title = Title, config = Config, updated_at = UpdateTs, created_at = CreateTs}) -> - {ConfigKey, ConfigMap} = - case Config of - #http_endpoint{url = Url, pool_size = PoolSize} -> - {<<"http">>, #{<<"url">> => Url, <<"pool_size">> => PoolSize}}; - #mqtt_endpoint{host = Host, port = Port, client_id = ClientId, username = Username, password = Password, topic = Topic, qos = Qos} -> - {<<"mqtt">>, #{<<"host">> => Host, <<"port">> => Port, <<"client_id">> => ClientId, <<"username">> => Username, <<"password">> => Password, <<"topic">> => Topic, <<"qos">> => Qos}}; - #kafka_endpoint{username = Username, password = Password, bootstrap_servers = BootstrapServers, topic = Topic} -> - {<<"kafka">>, #{<<"username">> => Username, <<"password">> => Password, <<"bootstrap_servers">> => BootstrapServers, <<"topic">> => Topic}}; - #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database, table_name = TableName} -> - {<<"mysql">>, #{<<"host">> => Host, <<"port">> => Port, <<"username">> => Username, <<"password">> => Password, <<"database">> => Database, <<"table_name">> => TableName}} - end, - - Map = #{ - <<"id">> => Id, - <<"name">> => Name, - <<"title">> => Title, - <<"update_ts">> => UpdateTs, - <<"create_ts">> => CreateTs - }, - Map#{ConfigKey => ConfigMap}. \ No newline at end of file diff --git a/backup/endpoint_mysql.erl b/backup/endpoint_mysql.erl deleted file mode 100644 index c9b8a93..0000000 --- a/backup/endpoint_mysql.erl +++ /dev/null @@ -1,214 +0,0 @@ - -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 06. 7月 2023 12:02 -%%%------------------------------------------------------------------- --module(endpoint_mysql). - --include("endpoint.hrl"). --behaviour(gen_server). - -%% API --export([start_link/3]). - -%% gen_server callbacks --export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). - -%% 消息重发间隔 --define(RETRY_INTERVAL, 5000). - --define(DISCONNECTED, disconnected). --define(CONNECTED, connected). - --record(state, { - endpoint :: #endpoint{}, - buffer :: endpoint_buffer:buffer(), - pool_pid :: undefined | pid(), - - status = ?DISCONNECTED -}). - -%%%=================================================================== -%%% API -%%%=================================================================== - -%% @doc Creates a gen_statem process which calls Module:init/1 to -%% initialize. To ensure a synchronized start-up procedure, this -%% function does not return until Module:init/1 has returned. -start_link(LocalName, AliasName, Endpoint = #endpoint{}) when is_atom(LocalName), is_atom(AliasName) -> - gen_statem:start_link({local, LocalName}, ?MODULE, [AliasName, Endpoint], []). - -%%%=================================================================== -%%% gen_statem callbacks -%%%=================================================================== - -%% @private -%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or -%% gen_statem:start_link/[3,4], this function is called by the new -%% process to initialize. -init([AliasName, Endpoint]) -> - iot_name_server:register(AliasName, self()), - erlang:process_flag(trap_exit, true), - %% 创建转发器, 避免阻塞当前进程的创建,因此采用了延时初始化的机制 - erlang:start_timer(0, self(), create_postman), - %% 初始化存储 - Buffer = endpoint_buffer:new(Endpoint, 10), - - {ok, #state{endpoint = Endpoint, buffer = Buffer, status = ?DISCONNECTED}}. - -%% @private -%% @doc Handling call messages --spec(handle_call(Request :: term(), From :: {pid(), Tag :: term()}, - State :: #state{}) -> - {reply, Reply :: term(), NewState :: #state{}} | - {reply, Reply :: term(), NewState :: #state{}, timeout() | hibernate} | - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_call(get_stat, _From, State = #state{buffer = Buffer}) -> - Stat = endpoint_buffer:stat(Buffer), - {reply, {ok, Stat}, State}. - -%% @private -%% @doc Handling cast messages --spec(handle_cast(Request :: term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_cast({forward, ServiceId, Format, Metric}, State = #state{buffer = Buffer}) -> - NBuffer = endpoint_buffer:append({ServiceId, Format, Metric}, Buffer), - {noreply, State#state{buffer = NBuffer}}; - -handle_cast(cleanup, State = #state{buffer = Buffer}) -> - endpoint_buffer:cleanup(Buffer), - {noreply, State}. - -%% @private -%% @doc Handling all non call/cast messages --spec(handle_info(Info :: timeout() | term(), State :: #state{}) -> - {noreply, NewState :: #state{}} | - {noreply, NewState :: #state{}, timeout() | hibernate} | - {stop, Reason :: term(), NewState :: #state{}}). -handle_info({timeout, _, create_postman}, State = #state{status = ?DISCONNECTED, buffer = Buffer, - endpoint = #endpoint{title = Title, config = #mysql_endpoint{host = Host, port = Port, username = Username, password = Password, database = Database}}}) -> - - lager:debug("[iot_endpoint] endpoint: ~p, create postman", [Title]), - WorkerArgs = [ - {host, binary_to_list(Host)}, - {port, Port}, - {user, binary_to_list(Username)}, - {password, binary_to_list(Password)}, - {keep_alive, true}, - {database, binary_to_list(Database)}, - {queries, [<<"set names utf8">>]} - ], - - %% 启动工作的线程池 - PoolSize = 5, - case poolboy:start_link([{size, PoolSize}, {max_overflow, PoolSize}, {worker_module, mysql}], WorkerArgs) of - {ok, PoolPid} -> - NBuffer = endpoint_buffer:trigger_n(Buffer), - {noreply, State#state{pool_pid = PoolPid, buffer = NBuffer, status = ?CONNECTED}}; - ignore -> - retry_connect(), - {noreply, State}; - {error, Reason} -> - lager:warning("[mqtt_postman] start connect pool, get error: ~p", [Reason]), - retry_connect(), - {noreply, State} - end; - -%% 离线时,忽略数据发送逻辑 -handle_info({next_data, _Id, _Tuple}, State = #state{status = ?DISCONNECTED}) -> - {noreply, State}; -%% 发送数据到mqtt服务器 -handle_info({next_data, Id, {ServiceId, Metric}}, State = #state{status = ?CONNECTED, pool_pid = PoolPid, buffer = Buffer, - endpoint = #endpoint{title = Title, config = #mysql_endpoint{table_name = Table, fields_map = FieldsMap}}}) -> - - case insert_sql(Table, ServiceId, FieldsMap, Metric) of - {ok, InsertSql, Values} -> - case poolboy:transaction(PoolPid, fun(ConnPid) -> mysql:query(ConnPid, InsertSql, Values) end) of - ok -> - NBuffer = endpoint_buffer:ack(Id, Buffer), - {noreply, State#state{buffer = NBuffer}}; - Error -> - lager:warning("[endpoint_mysql] endpoint: ~p, insert mysql get error: ~p", [Title, Error]), - {noreply, State} - end; - error -> - lager:debug("[endpoint_mysql] endpoint: ~p, make sql error", [Title]), - {noreply, State} - end; - -%% postman进程挂掉时,重新建立新的 -handle_info({'EXIT', PoolPid, Reason}, State = #state{endpoint = #endpoint{title = Title}, pool_pid = PoolPid}) -> - lager:warning("[enpoint_mqtt] endpoint: ~p, conn pid exit with reason: ~p", [Title, Reason]), - retry_connect(), - {noreply, disconnected, State#state{pool_pid = undefined, status = ?DISCONNECTED}}; - -handle_info(Info, State = #state{status = Status}) -> - lager:warning("[iot_endpoint] unknown message: ~p, status: ~p", [Info, Status]), - {noreply, State}. - -%% @private -%% @doc This function is called by a gen_server when it is about to -%% terminate. It should be the opposite of Module:init/1 and do any -%% necessary cleaning up. When it returns, the gen_server terminates -%% with Reason. The return value is ignored. --spec(terminate(Reason :: (normal | shutdown | {shutdown, term()} | term()), - State :: #state{}) -> term()). -terminate(Reason, #state{endpoint = #endpoint{title = Title}, buffer = Buffer}) -> - lager:debug("[iot_endpoint] endpoint: ~p, terminate with reason: ~p", [Title, Reason]), - endpoint_buffer:cleanup(Buffer), - ok. - -%% @private -%% @doc Convert process state when code is changed --spec(code_change(OldVsn :: term() | {down, term()}, State :: #state{}, - Extra :: term()) -> - {ok, NewState :: #state{}} | {error, Reason :: term()}). -code_change(_OldVsn, State = #state{}, _Extra) -> - {ok, State}. - -%%%=================================================================== -%%% Internal functions -%%%=================================================================== - -retry_connect() -> - erlang:start_timer(?RETRY_INTERVAL, self(), create_postman). - --spec insert_sql(Table :: binary(), ServiceId :: binary(), FieldsMap :: map(), Metric :: binary()) -> - error | {ok, Sql :: binary(), Values :: list()}. -insert_sql(Table, ServiceId, FieldsMap, Metric) when is_binary(Table), is_binary(ServiceId), is_binary(Metric) -> - case line_format:parse(Metric) of - error -> - error; - {ok, #{<<"measurement">> := Measurement, <<"tags">> := Tags, <<"fields">> := Fields, <<"timestamp">> := Timestamp}} -> - Map = maps:merge(Tags, Fields), - NMap = Map#{<<"measurement">> => Measurement, <<"timestamp">> => Timestamp}, - TableFields = lists:flatmap(fun({TableField, F}) -> - case maps:find(F, NMap) of - error -> - []; - {ok, Val} -> - [{TableField, Val}] - end - end, maps:to_list(FieldsMap)), - - {Keys, Values} = kvs(TableFields), - FieldSql = iolist_to_binary(lists:join(<<", ">>, Keys)), - Placeholders = lists:duplicate(length(Keys), <<"?">>), - ValuesPlaceholder = iolist_to_binary(lists:join(<<", ">>, Placeholders)), - - {ok, <<"INSERT INTO ", Table/binary, "(", FieldSql/binary, ") VALUES(", ValuesPlaceholder/binary, ")">>, Values} - end. - --spec kvs(Fields :: list()) -> {Keys :: list(), Values :: list()}. -kvs(Fields) when is_list(Fields) -> - {Keys0, Values0} = lists:foldl(fun({K, V}, {Acc0, Acc1}) -> {[K|Acc0], [V|Acc1]} end, {[], []}, Fields), - {lists:reverse(Keys0), lists:reverse(Values0)}. \ No newline at end of file diff --git a/backup/service_config_model.erl b/backup/service_config_model.erl deleted file mode 100644 index f37fa2b..0000000 --- a/backup/service_config_model.erl +++ /dev/null @@ -1,121 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author aresei -%%% @copyright (C) 2023, -%%% @doc -%%% -%%% @end -%%% Created : 04. 7月 2023 12:31 -%%%------------------------------------------------------------------- --module(service_config_model). --author("aresei"). --include("iot_tables.hrl"). --include_lib("stdlib/include/qlc.hrl"). - --define(TAB, service_config). - -%% API --export([create_table/0]). --export([insert/4, update/4, get_config/1, delete/1]). --export([as_map/1]). - -create_table() -> - %% id生成器 - mnesia:create_table(service_config, [ - {attributes, record_info(fields, service_config)}, - {record_name, service_config}, - {disc_copies, [node()]}, - {type, ordered_set} - ]). - --spec insert(ServiceId :: binary(), HostUUID :: binary(), ConfigJson :: binary(), LastEditUser :: integer()) -> ok | {error, Reason :: term()}. -insert(ServiceId, HostUUID, ConfigJson, LastEditUser) when is_binary(ServiceId), is_binary(HostUUID), is_binary(ConfigJson), is_integer(LastEditUser) -> - ServiceConfig = #service_config{ - service_id = ServiceId, - host_uuid = HostUUID, - config_json = ConfigJson, - last_config_json = <<>>, - last_edit_user = LastEditUser, - create_ts = iot_util:current_time(), - update_ts = iot_util:current_time() - }, - case mnesia:transaction(fun() -> mnesia:write(?TAB, ServiceConfig, write) end) of - {'atomic', ok} -> - ok; - {'aborted', Reason} -> - {error, Reason} - end. - --spec update(ServiceId :: binary(), HostUUID :: binary(), ConfigJson :: binary(), LastEditUser :: integer()) -> ok | {error, Reason :: term()}. -update(ServiceId, HostUUID, ConfigJson, LastEditUser) when is_binary(ServiceId), is_binary(HostUUID), is_binary(ConfigJson), is_integer(LastEditUser) -> - Fun = fun() -> - case mnesia:read(?TAB, ServiceId, write) of - [] -> - ServiceConfig = #service_config{ - service_id = ServiceId, - host_uuid = HostUUID, - config_json = ConfigJson, - last_config_json = <<>>, - last_edit_user = LastEditUser, - create_ts = iot_util:current_time(), - update_ts = iot_util:current_time() - }, - mnesia:write(?TAB, ServiceConfig, write); - [ServiceConfig0 = #service_config{config_json = OldConfigJson}] -> - NServiceConfig = ServiceConfig0#service_config{ - config_json = ConfigJson, - last_config_json = OldConfigJson, - last_edit_user = LastEditUser, - update_ts = iot_util:current_time() - }, - mnesia:write(?TAB, NServiceConfig, write) - end - end, - - case mnesia:transaction(Fun) of - {'atomic', ok} -> - ok; - {'aborted', Reason} -> - {error, Reason} - end. - --spec get_config(ServiceId :: any()) -> error | {ok, Config :: #service_config{}}. -get_config(ServiceId) when is_binary(ServiceId) -> - case mnesia:dirty_read(?TAB, ServiceId) of - [] -> - error; - [Config] -> - {ok, Config} - end. - --spec delete(ServiceId :: binary()) -> ok | {error, Reason :: any()}. -delete(ServiceId) when is_binary(ServiceId) -> - Fun = fun() -> - case mnesia:read(?TAB, ServiceId, write) of - [] -> - ok; - [ServiceConfig0 = #service_config{config_json = OldConfigJson}] -> - NServiceConfig = ServiceConfig0#service_config{ - config_json = <<"">>, - last_config_json = OldConfigJson, - update_ts = iot_util:current_time() - }, - mnesia:write(?TAB, NServiceConfig, write) - end - end, - case mnesia:transaction(Fun) of - {'atomic', ok} -> - ok; - {'aborted', Reason} -> - {error, Reason} - end. - --spec as_map(ServiceConfig :: #service_config{}) -> map(). -as_map(#service_config{service_id = ServiceId, config_json = ConfigJson, last_config_json = LastConfigJson, last_edit_user = LastEditUser, update_ts = UpdateTs, create_ts = CreateTs}) -> - #{ - <<"service_id">> => ServiceId, - <<"config_json">> => ConfigJson, - <<"last_config_json">> => LastConfigJson, - <<"last_edit_user">> => LastEditUser, - <<"update_ts">> => UpdateTs, - <<"create_ts">> => CreateTs - }. \ No newline at end of file diff --git a/backup/service_handler.erl b/backup/service_handler.erl deleted file mode 100644 index f37bb55..0000000 --- a/backup/service_handler.erl +++ /dev/null @@ -1,176 +0,0 @@ -%%%------------------------------------------------------------------- -%%% @author licheng5 -%%% @copyright (C) 2020, -%%% @doc -%%% -%%% @end -%%% Created : 26. 4月 2020 3:36 下午 -%%%------------------------------------------------------------------- --module(service_handler). --author("licheng5"). --include("iot.hrl"). - -%% API --export([handle_request/4]). - -%% 下发config.json, 微服务接受后,保存服务配置 -handle_request("POST", "/service/push_config", _, - #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"last_edit_user">> := LastEditUser, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0}) - when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) -> - - %% 检查ConfigJson是否是合法的json字符串 - case iot_util:is_json(ConfigJson) of - true -> - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(-1, <<"host not found">>)}; - Pid when is_pid(Pid) -> - Timeout = Timeout0 * 1000, - case iot_host:async_service_config(Pid, ServiceId, ConfigJson, Timeout) of - {ok, Ref} -> - case iot_host:await_reply(Ref, Timeout) of - {ok, Result} -> - %% 更新配置信息到数据库 - case service_config_model:update(ServiceId, UUID, ConfigJson, LastEditUser) of - ok -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - lager:debug("[service_handler] set_config service_id: ~p, get error: ~p", [ServiceId, Reason]), - {ok, 200, iot_util:json_error(-1, <<"set service config failed">>)} - end; - {error, Reason} -> - {ok, 200, iot_util:json_error(-1, Reason)} - end; - {error, Reason} when is_binary(Reason) -> - {ok, 200, iot_util:json_error(-1, Reason)} - end - end; - false -> - {ok, 200, iot_util:json_error(-1, <<"config is invalid json">>)} - end; - -%% 获取服务配置信息 -handle_request("GET", "/service/get_config", #{<<"service_id">> := ServiceId}, _) when is_binary(ServiceId) -> - case service_config_model:get_config(ServiceId) of - error -> - {ok, 200, iot_util:json_error(-1, <<"service config not found">>)}; - {ok, Config} -> - {ok, 200, iot_util:json_data(service_config_model:as_map(Config))} - end; - -%% 删除对应的主机信息 -handle_request("POST", "/service/delete_config", _, #{<<"service_id">> := ServiceId}) when is_binary(ServiceId) -> - case service_config_model:delete(ServiceId) of - ok -> - {ok, 200, iot_util:json_data(<<"success">>)}; - {error, Reason} -> - lager:debug("[service_handler] delete config of service_id: ~p, error: ~p", [ServiceId, Reason]), - {ok, 200, iot_util:json_error(-1, <<"delete service config errror">>)} - end; - -%% 部署微服务 -handle_request("POST", "/service/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl}) - when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) -> - - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"host not found">>)}; - Pid when is_pid(Pid) -> - case iot_host:deploy_service(Pid, TaskId, ServiceId, TarUrl) of - {ok, Ref} -> - case iot_host:await_reply(Ref, 5000) of - {ok, Result} -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - {ok, 200, iot_util:json_error(400, Reason)} - end; - {error, Reason} when is_binary(Reason) -> - {ok, 200, iot_util:json_error(400, Reason)} - end - end; - -%% 启动服务 -handle_request("POST", "/service/start", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) -> - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"host not found">>)}; - Pid when is_pid(Pid) -> - case iot_host:start_service(Pid, ServiceId) of - {ok, Ref} -> - case iot_host:await_reply(Ref, 5000) of - {ok, Result} -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - {ok, 200, iot_util:json_error(400, Reason)} - end; - {error, Reason} when is_binary(Reason) -> - {ok, 200, iot_util:json_error(400, Reason)} - end - end; - -%% 停止服务 -handle_request("POST", "/service/stop", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId}) when is_binary(UUID), is_binary(ServiceId) -> - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"host not found">>)}; - Pid when is_pid(Pid) -> - case iot_host:stop_service(Pid, ServiceId) of - {ok, Ref} -> - case iot_host:await_reply(Ref, 5000) of - {ok, Result} -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - {ok, 200, iot_util:json_error(400, Reason)} - end; - {error, Reason} when is_binary(Reason) -> - {ok, 200, iot_util:json_error(400, Reason)} - end - end; - -%% 远程调用微服务, 返回值的格式为json -handle_request("POST", "/service/invoke", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"payload">> := Payload, <<"timeout">> := Timeout0}) - when is_binary(UUID), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout0) -> - - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"host not found">>)}; - Pid when is_pid(Pid) -> - Timeout = Timeout0 * 1000, - case iot_host:invoke_service(Pid, ServiceId, Payload, Timeout) of - {ok, Ref} -> - case iot_host:await_reply(Ref, Timeout) of - {ok, Result} -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - {ok, 200, iot_util:json_error(400, Reason)} - end; - {error, Reason} when is_binary(Reason) -> - {ok, 200, iot_util:json_error(400, Reason)} - end - end; - -handle_request("POST", "/service/task_log", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId}) when is_binary(UUID), is_integer(TaskId) -> - case iot_host:get_pid(UUID) of - undefined -> - {ok, 200, iot_util:json_error(404, <<"host not found">>)}; - Pid when is_pid(Pid) -> - case iot_host:task_log(Pid, TaskId) of - {ok, Ref} -> - case iot_host:await_reply(Ref, 5000) of - {ok, Result} -> - {ok, 200, iot_util:json_data(Result)}; - {error, Reason} -> - {ok, 200, iot_util:json_error(400, Reason)} - end; - {error, Reason} when is_binary(Reason) -> - {ok, 200, iot_util:json_error(400, Reason)} - end - end; - -handle_request(_, Path, _, _) -> - Path1 = list_to_binary(Path), - {ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}. - -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% -%% helper methods -%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% \ No newline at end of file diff --git a/rebar.config b/rebar.config index c87b4b2..9537196 100644 --- a/rebar.config +++ b/rebar.config @@ -9,7 +9,6 @@ {jiffy, ".*", {git, "https://github.com/davisp/jiffy.git", {tag, "1.1.1"}}}, {mysql, ".*", {git, "https://github.com/mysql-otp/mysql-otp", {tag, "1.8.0"}}}, {eredis, ".*", {git, "https://github.com/wooga/eredis.git", {tag, "v1.2.0"}}}, - {gpb, ".*", {git, "https://github.com/tomas-abrahamsson/gpb.git", {tag, "4.20.0"}}}, {emqtt, ".*", {git, "https://gitea.s5s8.com/anlicheng/emqtt.git", {branch, "main"}}}, {gproc, ".*", {git, "https://github.com/uwiger/gproc.git", {tag, "0.9.1"}}}, {parse_trans, ".*", {git, "https://github.com/uwiger/parse_trans", {tag, "3.0.0"}}}, diff --git a/rebar.lock b/rebar.lock deleted file mode 100644 index fecc206..0000000 --- a/rebar.lock +++ /dev/null @@ -1,99 +0,0 @@ -{"1.2.0", -[{<<"brod">>, - {git,"https://github.com/kafka4beam/brod.git", - {ref,"877852a175f6051b604ea7986bdb8da04ce19e76"}}, - 0}, - {<<"certifi">>,{pkg,<<"certifi">>,<<"2.5.2">>},1}, - {<<"cowboy">>, - {git,"https://github.com/ninenines/cowboy.git", - {ref,"9e600f6c1df3c440bc196b66ebbc005d70107217"}}, - 0}, - {<<"cowlib">>, - {git,"https://github.com/ninenines/cowlib", - {ref,"cc04201c1d0e1d5603cd1cde037ab729b192634c"}}, - 1}, - {<<"crc32cer">>,{pkg,<<"crc32cer">>,<<"1.0.3">>},2}, - {<<"emqtt">>, - {git,"https://gitea.s5s8.com/anlicheng/emqtt.git", - {ref,"5111914a9b1b92b0b497f825c77bdd365e3989b0"}}, - 0}, - {<<"eredis">>, - {git,"https://github.com/wooga/eredis.git", - {ref,"9ad91f149310a7d002cb966f62b7e2c3330abb04"}}, - 0}, - {<<"esockd">>, - {git,"https://github.com/emqx/esockd.git", - {ref,"d9ce4024cc42a65e9a05001997031e743442f955"}}, - 0}, - {<<"fs">>,{pkg,<<"fs">>,<<"6.1.1">>},1}, - {<<"goldrush">>,{pkg,<<"goldrush">>,<<"0.1.9">>},1}, - {<<"gpb">>, - {git,"https://github.com/tomas-abrahamsson/gpb.git", - {ref,"edda1006d863a09509673778c455d33d88e6edbc"}}, - 0}, - {<<"gproc">>, - {git,"https://github.com/uwiger/gproc.git", - {ref,"4ca45e0a97722a418a31eb1753f4e3b953f7fb1d"}}, - 0}, - {<<"hackney">>, - {git,"https://github.com/benoitc/hackney.git", - {ref,"f3e9292db22c807e73f57a8422402d6b423ddf5f"}}, - 0}, - {<<"idna">>,{pkg,<<"idna">>,<<"6.0.1">>},1}, - {<<"jiffy">>, - {git,"https://github.com/davisp/jiffy.git", - {ref,"9ea1b35b6e60ba21dfd4adbd18e7916a831fd7d4"}}, - 0}, - {<<"kafka_protocol">>,{pkg,<<"kafka_protocol">>,<<"4.2.7">>},1}, - {<<"lager">>, - {git,"https://github.com/erlang-lager/lager.git", - {ref,"459a3b2cdd9eadd29e5a7ce5c43932f5ccd6eb88"}}, - 0}, - {<<"metrics">>,{pkg,<<"metrics">>,<<"1.0.1">>},1}, - {<<"mimerl">>,{pkg,<<"mimerl">>,<<"1.2.0">>},1}, - {<<"mysql">>, - {git,"https://github.com/mysql-otp/mysql-otp", - {ref,"caf5ff96c677a8fe0ce6f4082bc036c8fd27dd62"}}, - 0}, - {<<"parse_trans">>, - {git,"https://github.com/uwiger/parse_trans", - {ref,"6f3645afb43c7c57d61b54ef59aecab288ce1013"}}, - 0}, - {<<"poolboy">>, - {git,"https://github.com/devinus/poolboy.git", - {ref,"3bb48a893ff5598f7c73731ac17545206d259fac"}}, - 0}, - {<<"ranch">>, - {git,"https://github.com/ninenines/ranch", - {ref,"a692f44567034dacf5efcaa24a24183788594eb7"}}, - 1}, - {<<"ssl_verify_fun">>,{pkg,<<"ssl_verify_fun">>,<<"1.1.6">>},1}, - {<<"sync">>, - {git,"https://github.com/rustyio/sync.git", - {ref,"f13e61a79623290219d7c10dff1dd94d91eee963"}}, - 0}, - {<<"unicode_util_compat">>,{pkg,<<"unicode_util_compat">>,<<"0.5.0">>},2}]}. -[ -{pkg_hash,[ - {<<"certifi">>, <<"B7CFEAE9D2ED395695DD8201C57A2D019C0C43ECAF8B8BCB9320B40D6662F340">>}, - {<<"crc32cer">>, <<"AD0E42BED8603F2C72DE2A00F1B5063FFE12D5988615CAD984096900431D1C1A">>}, - {<<"fs">>, <<"9D147B944D60CFA48A349F12D06C8EE71128F610C90870BDF9A6773206452ED0">>}, - {<<"goldrush">>, <<"F06E5D5F1277DA5C413E84D5A2924174182FB108DABB39D5EC548B27424CD106">>}, - {<<"idna">>, <<"1D038FB2E7668CE41FBF681D2C45902E52B3CB9E9C77B55334353B222C2EE50C">>}, - {<<"kafka_protocol">>, <<"6F53B15CD6F6A12C1D0010DB074B4A15985C71BC7F594BC2D67D9837B3B378A1">>}, - {<<"metrics">>, <<"25F094DEA2CDA98213CECC3AEFF09E940299D950904393B2A29D191C346A8486">>}, - {<<"mimerl">>, <<"67E2D3F571088D5CFD3E550C383094B47159F3EEE8FFA08E64106CDF5E981BE3">>}, - {<<"ssl_verify_fun">>, <<"CF344F5692C82D2CD7554F5EC8FD961548D4FD09E7D22F5B62482E5AEAEBD4B0">>}, - {<<"unicode_util_compat">>, <<"8516502659002CEC19E244EBD90D312183064BE95025A319A6C7E89F4BCCD65B">>}]}, -{pkg_hash_ext,[ - {<<"certifi">>, <<"3B3B5F36493004AC3455966991EAF6E768CE9884693D9968055AEEEB1E575040">>}, - {<<"crc32cer">>, <<"08FDCD5CE51ACD839A12E98742F0F0EDA19A2A679FC9FBFAF6AAB958310FB70E">>}, - {<<"fs">>, <<"EF94E95FFE79916860649FED80AC62B04C322B0BB70F5128144C026B4D171F8B">>}, - {<<"goldrush">>, <<"99CB4128CFFCB3227581E5D4D803D5413FA643F4EB96523F77D9E6937D994CEB">>}, - {<<"idna">>, <<"A02C8A1C4FD601215BB0B0324C8A6986749F807CE35F25449EC9E69758708122">>}, - {<<"kafka_protocol">>, <<"1D5E9597AD3C0776C86DC5E08D3BAAEA7DB805A52E5FD35E3F071AAD7789FC4C">>}, - {<<"metrics">>, <<"69B09ADDDC4F74A40716AE54D140F93BEB0FB8978D8636EADED0C31B6F099F16">>}, - {<<"mimerl">>, <<"F278585650AA581986264638EBF698F8BB19DF297F66AD91B18910DFC6E19323">>}, - {<<"ssl_verify_fun">>, <<"BDB0D2471F453C88FF3908E7686F86F9BE327D065CC1EC16FA4540197EA04680">>}, - {<<"unicode_util_compat">>, <<"D48D002E15F5CC105A696CF2F1BBB3FC72B4B770A184D8420C8DB20DA2674B38">>}]} -].