This commit is contained in:
安礼成 2023-04-19 11:56:55 +08:00
parent 88f52d596f
commit 77f938e9df
5 changed files with 95 additions and 63 deletions

View File

@ -18,60 +18,79 @@
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
%% %%
handle_request("POST", "/host/create", _, handle_request("POST", "/host/create", _, HostInfo) ->
PostParams = #{<<"name">> := Name, <<"model">> := Model, <<"cell_id">> := CellId, <<"serial_number">> := SerialNumber}) -> lager:debug("[host_handler] create post params: ~p", [HostInfo]),
lager:debug("[host_handler] create post params: ~p", [PostParams]), case valid_host_info(HostInfo) of
HostId = iot_util:uuid(), true ->
Host = #host{ HostId = iot_util:uuid(),
host_id = HostId, #{<<"name">> := Name, <<"model">> := Model, <<"cell_id">> := CellId, <<"serial_number">> := SerialNumber} = HostInfo,
serial_number = SerialNumber, Host = #host{
name = Name, host_id = HostId,
model = Model, serial_number = SerialNumber,
cell_id = CellId, name = Name,
status = 1 model = Model,
}, cell_id = CellId,
case host_model:add_host(Host) of status = ?HOST_STATUS_INACTIVE
ok -> },
{ok, 200, iot_util:json_data(HostId)}; case host_model:add_host(Host) of
{error, Reason} -> ok ->
lager:warning("[host_handler] get a error: ~p", [Reason]), {ok, 200, iot_util:json_data(HostId)};
{ok, 200, iot_util:json_error(404, <<"database error">>)} {error, Reason} when is_binary(Reason) ->
lager:warning("[host_handler] get a error: ~p", [Reason]),
{ok, 200, iot_util:json_error(-1, Reason)};
{error, Reason} when is_binary(Reason) ->
{ok, 200, iot_util:json_error(-1, <<"database error">>)}
end;
false ->
Error = host_error(HostInfo),
{ok, 200, iot_util:json_error(-1, Error)}
end; end;
%% %%
handle_request("POST", "/host/batch_create", _, HostInfos) -> handle_request("POST", "/host/batch_create", _, HostInfos) ->
lager:debug("[host_handler] batch_create post params: ~p", [HostInfos]), lager:debug("[host_handler] batch_create post params: ~p", [HostInfos]),
case lists:all(fun valid_host_info/1, HostInfos) of %% serial_number是必填项
case lists:any(fun(Info) -> not is_map_key(<<"serial_number">>, Info) orelse maps:get(<<"serial_number">>, Info) == <<"">> end, HostInfos) of
true -> true ->
Result = lists:map(fun(#{<<"name">> := Name, <<"model">> := Model, <<"cell_id">> := CellId, <<"serial_number">> := SerialNumber}) -> {ok, 200, iot_util:json_error(-1, <<"serial_number missed">>)};
HostId = iot_util:uuid(),
Host = #host{
host_id = HostId,
serial_number = SerialNumber,
name = Name,
model = Model,
cell_id = CellId,
status = 1
},
case host_model:add_host(Host) of
ok ->
{SerialNumber, HostId};
{error, Reason} ->
lager:debug("[host_handler] add_host get error: ~p", [Reason]),
{SerialNumber, <<"failed">>}
end
end, HostInfos),
{ok, 200, iot_util:json_data(Result)};
false -> false ->
{ok, 200, iot_util:json_error(-1, <<"invalid arguments">>)} case lists:all(fun valid_host_info/1, HostInfos) of
true ->
Result = lists:map(fun(#{<<"name">> := Name, <<"model">> := Model, <<"cell_id">> := CellId, <<"serial_number">> := SerialNumber}) ->
HostId = iot_util:uuid(),
Host = #host{
host_id = HostId,
serial_number = SerialNumber,
name = Name,
model = Model,
cell_id = CellId,
status = ?HOST_STATUS_INACTIVE
},
case host_model:add_host(Host) of
ok ->
{SerialNumber, HostId};
{error, Reason} when is_binary(Reason) ->
lager:debug("[host_handler] add_host get error: ~p", [Reason]),
{SerialNumber, Reason};
{error, Reason} ->
{SerialNumber, <<"failed">>}
end
end, HostInfos),
{ok, 200, iot_util:json_data(Result)};
false ->
%%
InvalidHostInfos = lists:filter(fun(Info) -> not valid_host_info(Info) end, HostInfos),
ErrorInfos = lists:map(fun(Info = #{<<"serial_number">> := SerialNumber}) -> {SerialNumber, host_error(Info)} end, InvalidHostInfos),
{ok, 200, iot_util:json_error(-1, maps:from_list(ErrorInfos))}
end
end; end;
handle_request(_, "/host/list", Params, PostParams) -> handle_request(_, "/host/list", GetParams, PostParams) ->
Page0 = maps:get(<<"page">>, Params, <<"1">>), Page0 = maps:get(<<"page">>, GetParams, <<"1">>),
Size0 = maps:get(<<"size">>, Params, <<"10">>), Size0 = maps:get(<<"size">>, GetParams, <<"10">>),
Page = binary_to_integer(Page0), Page = binary_to_integer(Page0),
Size = binary_to_integer(Size0), Size = binary_to_integer(Size0),
@ -149,9 +168,17 @@ handle_request("POST", "/host/update", _, _Params) ->
%% %%
valid_host_info(#{<<"name">> := Name, <<"model">> := Model, <<"cell_id">> := CellId, <<"serial_number">> := SerialNumber}) -> valid_host_info(#{<<"name">> := Name, <<"model">> := Model, <<"cell_id">> := CellId, <<"serial_number">> := SerialNumber}) ->
Name =/= <<"">> andalso Model =/= <<"">> andalso CellId > 0 and SerialNumber =/= <<"">>. Name =/= <<"">> andalso Model =/= <<"">> andalso CellId > 0 andalso SerialNumber =/= <<"">>.
host_error(M = #{<<"name">> := <<>>}) when not is_map_key(<<"name">>, M) -> %%
host_error(M) when is_map(M) ->
host_error(M = #{<<"name">> := <<>>}) when not is_map_key(<<"name">>, M) -> if
ok. not is_map_key(<<"name">>, M) orelse map_get(<<"name">>, M) == <<>> ->
<<"name is empty">>;
not is_map_key(<<"model">>, M) orelse map_get(<<"model">>, M) == <<>> ->
<<"model is empty">>;
not is_map_key(<<"serial_number">>, M) orelse map_get(<<"serial_number">>, M) == <<>> ->
<<"serial_number is empty">>;
true ->
<<"unknown error">>
end.

View File

@ -66,7 +66,7 @@ init([]) ->
case emqtt:start_link(Opts) of case emqtt:start_link(Opts) of
{ok, ConnPid} -> {ok, ConnPid} ->
lager:debug("[iot_mqtt_publisher] connect success, pid: ~p", [ConnPid]), lager:debug("[iot_mqtt_publisher] connect success, pid: ~p", [ConnPid]),
{ok, _} = emqtt:connect(ConnPid), % {ok, _} = emqtt:connect(ConnPid),
{ok, #state{conn_pid = ConnPid}}; {ok, #state{conn_pid = ConnPid}};
ignore -> ignore ->
lager:debug("[iot_mqtt_publisher] connect emqx get ignore"), lager:debug("[iot_mqtt_publisher] connect emqx get ignore"),

View File

@ -75,15 +75,16 @@ init([]) ->
Opts = iot_config:emqt_opts(), Opts = iot_config:emqt_opts(),
case emqtt:start_link(Opts) of case emqtt:start_link(Opts) of
{ok, ConnPid} -> {ok, ConnPid} ->
lager:debug("[iot_mqtt_subscriber] connect success, pid: ~p", [ConnPid]), lager:debug("[iot_mqtt_subscriber] start connecting"),
%% host相关的全部事件 %% host相关的全部事件
{ok, _} = emqtt:connect(ConnPid), %{ok, _} = emqtt:connect(ConnPid),
lager:debug("[iot_mqtt_subscriber] connect success, pid: ~p", [ConnPid]),
Topics = [ Topics = [
{<<"$share/nodes_group//server/register">>, 1}, {<<"$share/nodes_group//server/register">>, 1},
{<<"$share/nodes_group//host/+/upstream">>, 1} {<<"$share/nodes_group//host/+/upstream">>, 1}
], ],
SubscribeResult = emqtt:subscribe(ConnPid, Topics), %SubscribeResult = emqtt:subscribe(ConnPid, Topics),
lager:debug("[iot_mqtt_subscriber] subscribe result is: ~p", [SubscribeResult]), % lager:debug("[iot_mqtt_subscriber] subscribe topics: ~p, result is: ~p", [Topics, SubscribeResult]),
{ok, #state{conn_pid = ConnPid}}; {ok, #state{conn_pid = ConnPid}};
ignore -> ignore ->

View File

@ -13,7 +13,7 @@
-export([timestamp/0, number_format/2, current_time/0]). -export([timestamp/0, number_format/2, current_time/0]).
-export([step/3, chunks/2, rand_bytes/1, uuid/0]). -export([step/3, chunks/2, rand_bytes/1, uuid/0]).
-export([json_data/1, json_error/2]). -export([json_data/1, json_error/2]).
-export([queue_limited_in/3, assert_exec/2]). -export([queue_limited_in/3]).
%% %%
timestamp() -> timestamp() ->
@ -81,9 +81,3 @@ queue_limited_in(Item, Q, Num) when is_integer(Num) ->
false -> false ->
queue:in(Item, Q) queue:in(Item, Q)
end. end.
-spec assert_exec(boolean(), any()) -> any().
assert_exec(true, Fun) when is_function(Fun, 0) ->
{ok, Fun()};
assert_exec(_, _) ->
aborted.

View File

@ -83,8 +83,18 @@ find_hosts(Pred, Start, Limit) when is_function(Pred, 1), is_integer(Limit), is_
end. end.
-spec add_host(Host :: #host{}) -> ok | {error, Reason :: any()}. -spec add_host(Host :: #host{}) -> ok | {error, Reason :: any()}.
add_host(Host = #host{}) -> add_host(Host = #host{serial_number = SerialNumber}) ->
case mnesia:transaction(fun() -> mnesia:write(host, Host, write) end) of Fun = fun() ->
Q = qlc:q([E || E <- mnesia:table(host), E#host.serial_number =:= SerialNumber]),
case qlc:e(Q) of
[_|_] ->
mnesia:abort(<<"serial_number exists">>);
[] ->
mnesia:write(host, Host, write)
end
end,
case mnesia:transaction(Fun) of
{atomic, _} -> {atomic, _} ->
ok; ok;
{aborted, Error} -> {aborted, Error} ->