simple host code
This commit is contained in:
parent
069e4cac02
commit
39a3a91023
@ -14,7 +14,7 @@
|
||||
|
||||
%% API
|
||||
-export([start_link/2, get_name/1, get_pid/1, handle/2, reload/1, activate/2]).
|
||||
-export([get_metric/1, publish_message/4, get_aes/1, rsa_encode/3]).
|
||||
-export([get_metric/1, publish_message/4, get_aes/1]).
|
||||
-export([has_session/1, create_session/2, attach_channel/2]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
@ -24,11 +24,7 @@
|
||||
host_id :: integer(),
|
||||
%% 从数据库里面读取到的数据
|
||||
uuid :: binary(),
|
||||
%% 当前的状态
|
||||
status :: integer(),
|
||||
|
||||
%% rsa公钥
|
||||
pub_key = <<>> :: binary(),
|
||||
%% aes的key, 后续通讯需要基于这个加密
|
||||
aes = <<>> :: binary(),
|
||||
|
||||
@ -87,12 +83,6 @@ attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
|
||||
create_session(Pid, PubKey) when is_pid(Pid), is_binary(PubKey) ->
|
||||
gen_statem:call(Pid, {create_session, PubKey}).
|
||||
|
||||
%% 基于rsa加密的指令都是不需要会话存在的
|
||||
-spec rsa_encode(Pid :: pid(), CommandType :: integer(), PlainText :: binary()) ->
|
||||
{ok, EncText :: binary()} | {error, Reason :: binary()}.
|
||||
rsa_encode(Pid, CommandType, PlainText) when is_pid(Pid), is_integer(CommandType), is_binary(PlainText) ->
|
||||
gen_statem:call(Pid, {rsa_encode, CommandType, PlainText}).
|
||||
|
||||
%% 这里占用的的调用进程的时间
|
||||
-spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) ->
|
||||
ok | {ok, Response :: binary()} | {error, Reason :: any()}.
|
||||
@ -138,7 +128,7 @@ init([UUID]) ->
|
||||
{ok, _} = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
|
||||
activated
|
||||
end,
|
||||
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes, status = Status}};
|
||||
{ok, StateName, #state{host_id = HostId, uuid = UUID, aes = Aes}};
|
||||
undefined ->
|
||||
lager:warning("[iot_host] host uuid: ~p, loaded from mysql failed", [UUID]),
|
||||
ignore
|
||||
@ -174,14 +164,7 @@ handle_event({call, From}, has_session, StateName, State) ->
|
||||
HasSession = (StateName =:= session) orelse false,
|
||||
{keep_state, State, [{reply, From, HasSession}]};
|
||||
|
||||
%% 基于rsa加密
|
||||
handle_event({call, From}, {rsa_encode, CommandType, PlainText}, session, State = #state{pub_key = PubKey}) ->
|
||||
Reply = iot_cipher_rsa:encode(PlainText, PubKey),
|
||||
{keep_state, State, [{reply, From, {ok, <<CommandType:8, Reply/binary>>}}]};
|
||||
handle_event({call, From}, {rsa_encode, _, _}, _, State) ->
|
||||
{keep_state, State, [{reply, From, {error, <<"会话未建立"/utf8>>}}]};
|
||||
|
||||
%% 发送普通格式的消息, 激活的时候,会话时创建不成功的
|
||||
%% 发送普通格式的消息, 激活的时候,会话时创建不成功的; 发送aes类型的命令的时候,必须要求session是存在的
|
||||
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Command0}}, session, State = #state{aes = AES, channel_pid = ChannelPid}) ->
|
||||
Command = iot_cipher_aes:encrypt(AES, Command0),
|
||||
%% 通过websocket发送请求
|
||||
@ -189,6 +172,7 @@ handle_event({call, From}, {publish_message, ReceiverPid, CommandType, {aes, Com
|
||||
|
||||
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||||
|
||||
%% 只要channel存在,就负责将消息推送到边缘端主机
|
||||
handle_event({call, From}, {publish_message, ReceiverPid, CommandType, Command}, _, State = #state{channel_pid = ChannelPid}) when is_binary(Command), is_pid(ChannelPid) ->
|
||||
%% 通过websocket发送请求
|
||||
lager:debug("[iot_host] will publish_message: ~p", [Command]),
|
||||
@ -206,9 +190,9 @@ handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) ->
|
||||
lager:debug("[iot_host] reload host uuid: ~p, successed", [Host]),
|
||||
case StateName == denied andalso Status =/= ?HOST_STATUS_INACTIVE of
|
||||
true ->
|
||||
{next_state, activated, State#state{status = Status}, [{reply, From, ok}]};
|
||||
{next_state, activated, State, [{reply, From, ok}]};
|
||||
false ->
|
||||
{keep_state, State#state{status = Status}, [{reply, From, ok}]}
|
||||
{keep_state, State, [{reply, From, ok}]}
|
||||
end;
|
||||
undefined ->
|
||||
lager:debug("[iot_host] reload host uuid: ~p, failed", [UUID]),
|
||||
@ -217,7 +201,7 @@ handle_event({call, From}, reload, StateName, State = #state{uuid = UUID}) ->
|
||||
|
||||
%% 关闭授权
|
||||
handle_event({call, From}, {activate, false}, _, State) ->
|
||||
{next_state, denied, State#state{pub_key = <<>>}, [{reply, From, ok}]};
|
||||
{next_state, denied, State, [{reply, From, ok}]};
|
||||
%% 开启授权
|
||||
handle_event({call, From}, {activate, true}, denied, State) ->
|
||||
{next_state, activated, State, [{reply, From, ok}]};
|
||||
@ -252,7 +236,7 @@ handle_event({call, From}, {create_session, PubKey}, _StateName, State = #state{
|
||||
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
|
||||
{ok, _} = host_bo:change_status(UUID, ?HOST_STATUS_ONLINE),
|
||||
|
||||
{next_state, session, State#state{status = ?HOST_STATUS_ONLINE}, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||
{next_state, session, State, [{reply, From, {ok, <<10:8, EncReply/binary>>}}]};
|
||||
|
||||
%% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理, 数据带有props,服务端暂时未用到
|
||||
handle_event(cast, {handle, {data, Data}}, session, State = #state{uuid = UUID, aes = AES}) ->
|
||||
@ -294,7 +278,7 @@ handle_event(cast, {handle, {ping, CipherMetric}}, session, State = #state{uuid
|
||||
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
|
||||
{keep_state, State#state{metrics = Metrics}};
|
||||
Other ->
|
||||
lager:debug("[iot_host] host_id: ~p, ping is invalid json: ~p", [UUID, Other]),
|
||||
lager:warning("[iot_host] host_id: ~p, ping is invalid json: ~p", [UUID, Other]),
|
||||
{keep_state, State}
|
||||
end;
|
||||
|
||||
@ -366,9 +350,9 @@ handle_event(info, {'DOWN', Ref, process, ChannelPid, Reason}, StateName, State
|
||||
%% 会话状态如果链接丢失,需要切换到activated状态,其他情况保持不变
|
||||
case StateName =:= session of
|
||||
true ->
|
||||
{next_state, activated, State#state{status = ?HOST_STATUS_OFFLINE, channel_pid = undefined}};
|
||||
{next_state, activated, State#state{channel_pid = undefined}};
|
||||
false ->
|
||||
{keep_state, State#state{status = ?HOST_STATUS_OFFLINE, channel_pid = undefined}}
|
||||
{keep_state, State#state{channel_pid = undefined}}
|
||||
end;
|
||||
|
||||
handle_event(EventType, EventContent, StateName, State) ->
|
||||
@ -381,10 +365,9 @@ handle_event(EventType, EventContent, StateName, State) ->
|
||||
%% terminate. It should be the opposite of Module:init/1 and do any
|
||||
%% necessary cleaning up. When it returns, the gen_statem terminates with
|
||||
%% Reason. The return value is ignored.
|
||||
terminate(Reason, _StateName, _State = #state{uuid = UUID}) ->
|
||||
lager:debug("[iot_host] terminate with reason: ~p", [Reason]),
|
||||
ChangeResult = host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
|
||||
lager:debug("[iot_host] change host: ~p, status result is: ~p", [UUID, ChangeResult]),
|
||||
terminate(Reason, StateName, _State = #state{uuid = UUID}) ->
|
||||
lager:debug("[iot_host] host: ~p, terminate with reason: ~p, state_name: ~p", [UUID, Reason, StateName]),
|
||||
host_bo:change_status(UUID, ?HOST_STATUS_OFFLINE),
|
||||
ok.
|
||||
|
||||
%% @private
|
||||
@ -411,6 +394,4 @@ router_uuid(_, UUID) ->
|
||||
with_device_uuid(Tags, #{<<"device_uuid">> := DeviceUUID}) when DeviceUUID /= <<>> ->
|
||||
Tags#{<<"device_uuid">> => DeviceUUID};
|
||||
with_device_uuid(Tags, _) ->
|
||||
Tags.
|
||||
|
||||
|
||||
Tags.
|
||||
Loading…
x
Reference in New Issue
Block a user