diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 8fb6d7a..6be7269 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -18,16 +18,11 @@ -define(TASK_STATUS_FAILED, 0). %% 离线 -define(TASK_STATUS_OK, 1). %% 在线 --record(router, { - router_id -}). +%% 主机端上报数据类型标识 -define(METHOD_CREATE_SESSION, 16#01). -define(METHOD_DATA, 16#02). -define(METHOD_PING, 16#03). -define(METHOD_INFORM, 16#04). -define(METHOD_FEEDBACK_STEP, 16#05). --define(METHOD_FEEDBACK_RESULT, 16#06). - - - +-define(METHOD_FEEDBACK_RESULT, 16#06). \ No newline at end of file diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 67a403f..febd3eb 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -236,45 +236,50 @@ handle_event(cast, need_auth, _StateName, State = #state{uuid = UUID}) -> %% 需要将消息转换成json格式然后再处理, 需要在host进程里面处理 %% 收到消息则认为主机端已经发送了心跳包 -handle_event(cast, {handle, Payload}, _StateName, State) -> - lager:debug("[iot_host] get payload: ~p", [Payload]), - Message = catch jiffy:decode(Payload, [return_maps]), - lager:debug("[iot_host] get message: ~p", [Message]), - gen_statem:cast(self(), {handle_message, Message}), - {keep_state, State#state{is_answered = true}}; +handle_event(cast, {handle, <>}, denied, State = #state{uuid = UUID}) -> + case catch jiffy:decode(Params, [return_maps]) of + #{<<"pub_key">> := PubKey} -> + lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), + Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), -handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, denied, State = #state{uuid = UUID}) -> - lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), - Reply = #{<<"a">> => false, <<"aes">> => <<"">>}, - EncReply = iot_cipher_rsa:encode(Reply, PubKey), - - {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), - receive - {ok, Ref, PacketId} -> - lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]) - after 10000 -> - lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]) - end, - {keep_state, State}; - -handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, _StateName, State = #state{uuid = UUID, aes = Aes}) -> - lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), - Reply = #{<<"a">> => true, <<"aes">> => Aes}, - EncReply = iot_cipher_rsa:encode(Reply, PubKey), - - {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), - receive - {ok, Ref, PacketId} -> - lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]), - {next_state, session, State#state{pub_key = PubKey}} - after 10000 -> - lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]), - {keep_state, State} + {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), + receive + {ok, Ref, PacketId} -> + lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]) + after 10000 -> + lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]) + end, + {keep_state, State#state{is_answered = true}}; + Msg -> + lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error: ~p", [UUID, Msg]), + {keep_state, State} end; -handle_event(cast, {handle_message, #{<<"method">> := <<"data">>, <<"params">> := Data}}, session, State = #state{uuid = UUID, aes = AES}) -> - PlainData = iot_cipher_aes:decrypt(AES, base64:decode(Data)), +handle_event(cast, {handle, <>}, _StateName, State = #state{uuid = UUID, aes = Aes}) -> + case catch jiffy:decode(Params, [return_maps]) of + #{<<"pub_key">> := PubKey} -> + lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]), + Reply = #{<<"a">> => true, <<"aes">> => Aes}, + EncReply = iot_cipher_rsa:encode(Reply, PubKey), + + {ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1), + receive + {ok, Ref, PacketId} -> + lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]), + {next_state, session, State#state{pub_key = PubKey, is_answered = true}} + after 10000 -> + lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]), + {keep_state, State#state{is_answered = true}} + end; + Msg -> + lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error: ~p", [UUID, Msg]), + {keep_state, State} + end; + +handle_event(cast, {handle, <>}, session, State = #state{uuid = UUID, aes = AES}) -> + PlainData = iot_cipher_aes:decrypt(AES, Data), case catch jiffy:decode(PlainData, [return_maps]) of Infos when is_list(Infos) -> lager:debug("[iot_host] the data is: ~p", [Infos]), @@ -292,19 +297,23 @@ handle_event(cast, {handle_message, #{<<"method">> := <<"data">>, <<"params">> : poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end) end, Infos); Other -> - lager:debug("[iot_message_handler] the metric is invalid json: ~p", [Other]) + lager:debug("[iot_message_handler] the data is invalid json: ~p", [Other]) end, - {keep_state, State}; + {keep_state, State#state{is_answered = true}}; -handle_event(cast, {handle_message, #{<<"method">> := <<"ping">>, <<"params">> := CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) -> - MetricsInfo = iot_cipher_aes:decrypt(AES, base64:decode(CipherMetric)), - Metrics = jiffy:decode(MetricsInfo, [return_maps]), +handle_event(cast, {handle, <>}, session, State = #state{uuid = UUID, aes = AES}) -> + MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric), + case catch jiffy:decode(MetricsInfo, [return_maps]) of + Metrics when is_map(Metrics) -> + lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), + {keep_state, State#state{metrics = Metrics, is_answered = true}}; + Other -> + lager:debug("[iot_message_handler] host_id: ~p, ping is invalid json: ~p", [UUID, Other]), + {keep_state, State#state{is_answered = true}} + end; - lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]), - {keep_state, State#state{metrics = Metrics}}; - -handle_event(cast, {handle_message, #{<<"method">> := <<"inform">>, <<"params">> := Info0}}, session, State = #state{host_id = HostId, aes = AES}) -> - Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)), +handle_event(cast, {handle, <>}, session, State = #state{host_id = HostId, aes = AES}) -> + Info = iot_cipher_aes:decrypt(AES, Info0), case catch jiffy:decode(Info, [return_maps]) of #{<<"at">> := At, <<"services">> := ServiceInforms} -> lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) -> @@ -327,10 +336,10 @@ handle_event(cast, {handle_message, #{<<"method">> := <<"inform">>, <<"params">> Error -> lager:warning("[iot_host] inform get error: ~p", [Error]) end, - {keep_state, State}; + {keep_state, State#state{is_answered = true}}; -handle_event(cast, {handle_message, #{<<"method">> := <<"feedback_result">>, <<"params">> := Info0}}, session, State = #state{aes = AES}) -> - Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)), +handle_event(cast, {handle, <>}, session, State = #state{aes = AES}) -> + Info = iot_cipher_aes:decrypt(AES, Info0), case catch jiffy:decode(Info, [return_maps]) of #{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} -> scene_feedback:insert(#{