fix service

This commit is contained in:
anlicheng 2025-05-20 11:49:01 +08:00
parent c62a8bd0e1
commit 9418ae70bd
9 changed files with 174 additions and 128 deletions

View File

@ -37,6 +37,9 @@
-define(PACKET_ASYNC_CALL, 16#05).
-define(PACKET_ASYNC_CALL_REPLY, 16#06).
%% ping包
-define(PACKET_PING, 16#FF).
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%%%%
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View File

@ -67,125 +67,6 @@ handle_request("POST", "/host/delete", _, #{<<"uuid">> := UUID}) when is_binary(
{ok, 200, iot_util:json_error(404, <<"error">>)}
end;
%%
handle_request("POST", "/host/set_service_config", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"last_edit_user">> := LastEditUser})
when is_binary(ServiceId), is_binary(ConfigJson), is_integer(LastEditUser) ->
lager:debug("[service_handler] set_service_config service_id: ~p, config_json: ~p, last_edit_user:~p", [ServiceId, ConfigJson, LastEditUser]),
case iot_host:get_pid(UUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
Pid when is_pid(Pid) ->
case service_config_model:update(ServiceId, UUID, ConfigJson, LastEditUser) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[service_handler] set_config service_id: ~p, get error: ~p", [ServiceId, Reason]),
{ok, 200, iot_util:json_error(-1, <<"set service config failed">>)}
end
end;
%% config.json
handle_request("POST", "/host/push_service_config", _,
PostParams = #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0})
when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) ->
lager:debug("[http_host_handler] async_service_config body is: ~p", [PostParams]),
case iot_host:get_pid(UUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
Pid when is_pid(Pid) ->
Timeout = Timeout0 * 1000,
case iot_host:async_service_config(Pid, ServiceId, ConfigJson, Timeout) of
{ok, Ref} ->
case iot_host:await_reply(Ref, Timeout) of
{ok, Result} ->
{ok, 200, iot_util:json_data(Result)};
{error, Reason} ->
{ok, 200, iot_util:json_error(400, Reason)}
end;
{error, Reason} when is_binary(Reason) ->
{ok, 200, iot_util:json_error(400, Reason)}
end
end;
handle_request("GET", "/host/get_service_config", #{<<"service_id">> := ServiceId}, _) when is_binary(ServiceId) ->
case service_config_model:get_config(ServiceId) of
error ->
{ok, 200, iot_util:json_error(-1, <<"service config not found">>)};
{ok, Config} ->
{ok, 200, iot_util:json_data(service_config_model:as_map(Config))}
end;
%%
handle_request("POST", "/host/delete_service_config", _, #{<<"service_id">> := ServiceId}) when is_binary(ServiceId) ->
case service_config_model:delete(ServiceId) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[service_handler] delete config of service_id: ~p, error: ~p", [ServiceId, Reason]),
{ok, 200, iot_util:json_error(-1, <<"delete service config errror">>)}
end;
handle_request("POST", "/host/async_deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl})
when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) ->
case iot_host:get_pid(UUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
Pid when is_pid(Pid) ->
case iot_host:async_deploy(Pid, TaskId, ServiceId, TarUrl) of
{ok, Ref} ->
case iot_host:await_reply(Ref, 5000) of
{ok, Result} ->
{ok, 200, iot_util:json_data(Result)};
{error, Reason} ->
{ok, 200, iot_util:json_error(400, Reason)}
end;
{error, Reason} when is_binary(Reason) ->
{ok, 200, iot_util:json_error(400, Reason)}
end
end;
handle_request("POST", "/host/async_invoke", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"payload">> := Payload, <<"timeout">> := Timeout0})
when is_binary(UUID), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout0) ->
case iot_host:get_pid(UUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
Pid when is_pid(Pid) ->
Timeout = Timeout0 * 1000,
case iot_host:async_invoke(Pid, ServiceId, Payload, Timeout) of
{ok, Ref} ->
case iot_host:await_reply(Ref, Timeout) of
{ok, Result} ->
{ok, 200, iot_util:json_data(Result)};
{error, Reason} ->
{ok, 200, iot_util:json_error(400, Reason)}
end;
{error, Reason} when is_binary(Reason) ->
{ok, 200, iot_util:json_error(400, Reason)}
end
end;
handle_request("POST", "/host/async_task_log", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId}) when is_binary(UUID), is_integer(TaskId) ->
case iot_host:get_pid(UUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
Pid when is_pid(Pid) ->
case iot_host:async_task_log(Pid, TaskId) of
{ok, Ref} ->
case iot_host:await_reply(Ref, 5000) of
{ok, Result} ->
{ok, 200, iot_util:json_data(Result)};
{error, Reason} ->
{ok, 200, iot_util:json_error(400, Reason)}
end;
{error, Reason} when is_binary(Reason) ->
{ok, 200, iot_util:json_error(400, Reason)}
end
end;
%%
handle_request("POST", "/host/activate", _, #{<<"uuid">> := UUID, <<"auth">> := true}) when is_binary(UUID) ->
case iot_host_sup:ensured_host_started(UUID) of

View File

@ -24,6 +24,7 @@ start() ->
Dispatcher = cowboy_router:compile([
{'_', [
{"/host/[...]", ?MODULE, [host_handler]},
{"/service/[...]", ?MODULE, [service_handler]},
{"/device/[...]", ?MODULE, [device_handler]}
]}
]),

View File

@ -0,0 +1,141 @@
%%%-------------------------------------------------------------------
%%% @author licheng5
%%% @copyright (C) 2020, <COMPANY>
%%% @doc
%%%
%%% @end
%%% Created : 26. 4 2020 3:36
%%%-------------------------------------------------------------------
-module(service_handler).
-author("licheng5").
-include("iot.hrl").
%% API
-export([handle_request/4]).
%%
handle_request("POST", "/service/set_config", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"last_edit_user">> := LastEditUser})
when is_binary(ServiceId), is_binary(ConfigJson), is_integer(LastEditUser) ->
lager:debug("[service_handler] set_service_config service_id: ~p, config_json: ~p, last_edit_user:~p", [ServiceId, ConfigJson, LastEditUser]),
case iot_host:get_pid(UUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
Pid when is_pid(Pid) ->
case service_config_model:update(ServiceId, UUID, ConfigJson, LastEditUser) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[service_handler] set_config service_id: ~p, get error: ~p", [ServiceId, Reason]),
{ok, 200, iot_util:json_error(-1, <<"set service config failed">>)}
end
end;
%% config.json
handle_request("POST", "/service/push_config", _,
PostParams = #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"config_json">> := ConfigJson, <<"timeout">> := Timeout0})
when is_binary(UUID), is_binary(ServiceId), is_binary(ConfigJson), is_integer(Timeout0) ->
lager:debug("[http_host_handler] async_service_config body is: ~p", [PostParams]),
case iot_host:get_pid(UUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
Pid when is_pid(Pid) ->
Timeout = Timeout0 * 1000,
case iot_host:async_service_config(Pid, ServiceId, ConfigJson, Timeout) of
{ok, Ref} ->
case iot_host:await_reply(Ref, Timeout) of
{ok, Result} ->
{ok, 200, iot_util:json_data(Result)};
{error, Reason} ->
{ok, 200, iot_util:json_error(400, Reason)}
end;
{error, Reason} when is_binary(Reason) ->
{ok, 200, iot_util:json_error(400, Reason)}
end
end;
handle_request("GET", "/service/get_config", #{<<"service_id">> := ServiceId}, _) when is_binary(ServiceId) ->
case service_config_model:get_config(ServiceId) of
error ->
{ok, 200, iot_util:json_error(-1, <<"service config not found">>)};
{ok, Config} ->
{ok, 200, iot_util:json_data(service_config_model:as_map(Config))}
end;
%%
handle_request("POST", "/service/delete_config", _, #{<<"service_id">> := ServiceId}) when is_binary(ServiceId) ->
case service_config_model:delete(ServiceId) of
ok ->
{ok, 200, iot_util:json_data(<<"success">>)};
{error, Reason} ->
lager:debug("[service_handler] delete config of service_id: ~p, error: ~p", [ServiceId, Reason]),
{ok, 200, iot_util:json_error(-1, <<"delete service config errror">>)}
end;
handle_request("POST", "/service/deploy", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId, <<"service_id">> := ServiceId, <<"tar_url">> := TarUrl})
when is_binary(UUID), is_integer(TaskId), is_binary(ServiceId), is_binary(TarUrl) ->
case iot_host:get_pid(UUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
Pid when is_pid(Pid) ->
case iot_host:async_deploy(Pid, TaskId, ServiceId, TarUrl) of
{ok, Ref} ->
case iot_host:await_reply(Ref, 5000) of
{ok, Result} ->
{ok, 200, iot_util:json_data(Result)};
{error, Reason} ->
{ok, 200, iot_util:json_error(400, Reason)}
end;
{error, Reason} when is_binary(Reason) ->
{ok, 200, iot_util:json_error(400, Reason)}
end
end;
handle_request("POST", "/service/invoke", _, #{<<"uuid">> := UUID, <<"service_id">> := ServiceId, <<"payload">> := Payload, <<"timeout">> := Timeout0})
when is_binary(UUID), is_binary(ServiceId), is_binary(Payload), is_integer(Timeout0) ->
case iot_host:get_pid(UUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
Pid when is_pid(Pid) ->
Timeout = Timeout0 * 1000,
case iot_host:async_invoke(Pid, ServiceId, Payload, Timeout) of
{ok, Ref} ->
case iot_host:await_reply(Ref, Timeout) of
{ok, Result} ->
{ok, 200, iot_util:json_data(Result)};
{error, Reason} ->
{ok, 200, iot_util:json_error(400, Reason)}
end;
{error, Reason} when is_binary(Reason) ->
{ok, 200, iot_util:json_error(400, Reason)}
end
end;
handle_request("POST", "/service/task_log", _, #{<<"uuid">> := UUID, <<"task_id">> := TaskId}) when is_binary(UUID), is_integer(TaskId) ->
case iot_host:get_pid(UUID) of
undefined ->
{ok, 200, iot_util:json_error(404, <<"host not found">>)};
Pid when is_pid(Pid) ->
case iot_host:async_task_log(Pid, TaskId) of
{ok, Ref} ->
case iot_host:await_reply(Ref, 5000) of
{ok, Result} ->
{ok, 200, iot_util:json_data(Result)};
{error, Reason} ->
{ok, 200, iot_util:json_error(400, Reason)}
end;
{error, Reason} when is_binary(Reason) ->
{ok, 200, iot_util:json_error(400, Reason)}
end
end;
handle_request(_, Path, _, _) ->
Path1 = list_to_binary(Path),
{ok, 200, iot_util:json_error(-1, <<"url: ", Path1/binary, " not found">>)}.
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% helper methods
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%

View File

@ -30,10 +30,27 @@ stop(_State) ->
%%
start_mnesia() ->
ok = ensure_mnesia_schema(),
%%
ok = mnesia:start(),
Tables = mnesia:system_info(tables),
lager:debug("[iot_app] tables: ~p", [Tables]),
%%
not lists:member(service_config, Tables) andalso service_config_model:create_table(),
ok.
ok.
-spec ensure_mnesia_schema() -> any().
ensure_mnesia_schema() ->
case mnesia:system_info(use_dir) of
true ->
ok;
false ->
mnesia:stop(),
case mnesia:create_schema([node()]) of
ok -> ok;
{error, {_, {already_exists, _}}} -> ok;
Error ->
lager:debug("[iot_app] create mnesia schema failed with error: ~p", [Error]),
throw({init_schema, Error})
end
end.

View File

@ -453,7 +453,8 @@ report_event(UUID, NewStatus) when is_binary(UUID), is_integer(NewStatus) ->
<<"name">> => <<"主机状态"/utf8>>,
<<"timestamp">> => Timestamp
}],
iot_router:route_uuid(UUID, FieldsList, Timestamp),
%% todo
% iot_router:route_uuid(UUID, FieldsList, Timestamp),
lager:debug("[iot_host] host_uuid: ~p, route fields: ~p", [UUID, FieldsList]).
%% state转换成map

View File

@ -31,7 +31,6 @@ hgetall(Key) when is_binary(Key) ->
end
end).
to_map(Items) when is_list(Items), length(Items) rem 2 == 0 ->
to_map(Items, #{}).
to_map([], Target) ->

View File

@ -112,20 +112,20 @@ handle_info({tcp, Socket, <<?PACKET_REQUEST, PacketId:32, ?METHOD_AUTH:8, AuthRe
%% host的monitor
erlang:monitor(process, HostPid),
AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 0, message = <<"ok">>}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, AuthReplyBin/binary>>),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH:8, AuthReplyBin/binary>>),
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
{denied, Reason} when is_binary(Reason) ->
erlang:monitor(process, HostPid),
AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 1, message = Reason}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, AuthReplyBin/binary>>),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH:8, AuthReplyBin/binary>>),
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
{noreply, State#state{uuid = UUID, host_pid = HostPid}};
{error, Reason} when is_binary(Reason) ->
AuthReplyBin = message_pb:encode_msg(#auth_reply{code = 2, message = Reason}),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, 0:8, AuthReplyBin/binary>>),
Transport:send(Socket, <<?PACKET_RESPONSE, PacketId:32, ?METHOD_AUTH:8, AuthReplyBin/binary>>),
lager:debug("[ws_channel] uuid: ~p, attach channel get error: ~p, stop channel", [UUID, Reason]),
{stop, State}
@ -174,6 +174,10 @@ handle_info({tcp, Socket, <<?PACKET_ASYNC_CALL_REPLY, PacketId:32, ResponseBin/b
{noreply, State#state{inflight = NInflight}}
end;
%% efka的ping包
handle_info({tcp, Socket, <<?PACKET_PING>>}, State = #state{socket = Socket}) ->
{noreply, State};
handle_info({tcp_error, Sock, Reason}, State = #state{socket = Sock}) ->
lager:notice("[sdlan_channel] tcp_error: ~p", [Reason]),
{stop, normal, State};

View File

@ -54,8 +54,7 @@
{connect_mode, synchronous},
{keep_alive, true},
{password, "r3a-7Qrh#3Q"},
{database, "nannong"},
{queries, [<<"set names utf8">>]}
{database, "nannong_demo"}
]
},
@ -63,7 +62,7 @@
{redis_pool,
[{size, 10}, {max_overflow, 20}, {worker_module, eredis}],
[
{host, "39.98.184.67"},
{host, "127.0.0.1"},
{port, 26379},
{database, 1}
]