fix upload format

This commit is contained in:
anlicheng 2023-06-20 17:20:20 +08:00
parent 6ed5067df4
commit d9fbd8b55f
2 changed files with 59 additions and 55 deletions

View File

@ -18,16 +18,11 @@
-define(TASK_STATUS_FAILED, 0). %% 线
-define(TASK_STATUS_OK, 1). %% 线
-record(router, {
router_id
}).
%%
-define(METHOD_CREATE_SESSION, 16#01).
-define(METHOD_DATA, 16#02).
-define(METHOD_PING, 16#03).
-define(METHOD_INFORM, 16#04).
-define(METHOD_FEEDBACK_STEP, 16#05).
-define(METHOD_FEEDBACK_RESULT, 16#06).
-define(METHOD_FEEDBACK_RESULT, 16#06).

View File

@ -236,45 +236,50 @@ handle_event(cast, need_auth, _StateName, State = #state{uuid = UUID}) ->
%% json格式然后再处理, host进程里面处理
%%
handle_event(cast, {handle, Payload}, _StateName, State) ->
lager:debug("[iot_host] get payload: ~p", [Payload]),
Message = catch jiffy:decode(Payload, [return_maps]),
lager:debug("[iot_host] get message: ~p", [Message]),
gen_statem:cast(self(), {handle_message, Message}),
{keep_state, State#state{is_answered = true}};
handle_event(cast, {handle, <<?METHOD_CREATE_SESSION:8, Params/binary>>}, denied, State = #state{uuid = UUID}) ->
case catch jiffy:decode(Params, [return_maps]) of
#{<<"pub_key">> := PubKey} ->
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
Reply = #{<<"a">> => false, <<"aes">> => <<"">>},
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, denied, State = #state{uuid = UUID}) ->
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
Reply = #{<<"a">> => false, <<"aes">> => <<"">>},
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1),
receive
{ok, Ref, PacketId} ->
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId])
after 10000 ->
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID])
end,
{keep_state, State};
handle_event(cast, {handle_message, #{<<"method">> := <<"create_session">>, <<"params">> := #{<<"pub_key">> := PubKey}}}, _StateName, State = #state{uuid = UUID, aes = Aes}) ->
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
Reply = #{<<"a">> => true, <<"aes">> => Aes},
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1),
receive
{ok, Ref, PacketId} ->
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]),
{next_state, session, State#state{pub_key = PubKey}}
after 10000 ->
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]),
{keep_state, State}
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1),
receive
{ok, Ref, PacketId} ->
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId])
after 10000 ->
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID])
end,
{keep_state, State#state{is_answered = true}};
Msg ->
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error: ~p", [UUID, Msg]),
{keep_state, State}
end;
handle_event(cast, {handle_message, #{<<"method">> := <<"data">>, <<"params">> := Data}}, session, State = #state{uuid = UUID, aes = AES}) ->
PlainData = iot_cipher_aes:decrypt(AES, base64:decode(Data)),
handle_event(cast, {handle, <<?METHOD_CREATE_SESSION:8, Params/binary>>}, _StateName, State = #state{uuid = UUID, aes = Aes}) ->
case catch jiffy:decode(Params, [return_maps]) of
#{<<"pub_key">> := PubKey} ->
lager:debug("[iot_host] host_id uuid: ~p, create_session", [UUID]),
Reply = #{<<"a">> => true, <<"aes">> => Aes},
EncReply = iot_cipher_rsa:encode(Reply, PubKey),
{ok, Ref} = iot_mqtt_publisher:publish(downstream_topic(UUID), <<10:8, EncReply/binary>>, 1),
receive
{ok, Ref, PacketId} ->
lager:debug("[iot_host] host_id uuid: ~p, packet_id: ~p, publish register reply success", [UUID, PacketId]),
{next_state, session, State#state{pub_key = PubKey, is_answered = true}}
after 10000 ->
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error is: timeout", [UUID]),
{keep_state, State#state{is_answered = true}}
end;
Msg ->
lager:debug("[iot_host] host_id uuid: ~p, publish register reply get error: ~p", [UUID, Msg]),
{keep_state, State}
end;
handle_event(cast, {handle, <<?METHOD_DATA:8, Data/binary>>}, session, State = #state{uuid = UUID, aes = AES}) ->
PlainData = iot_cipher_aes:decrypt(AES, Data),
case catch jiffy:decode(PlainData, [return_maps]) of
Infos when is_list(Infos) ->
lager:debug("[iot_host] the data is: ~p", [Infos]),
@ -292,19 +297,23 @@ handle_event(cast, {handle_message, #{<<"method">> := <<"data">>, <<"params">> :
poolboy:transaction(influx_pool, fun(Pid) -> influx_client:write(Pid, <<"iot">>, <<"iot">>, Precision, Points) end)
end, Infos);
Other ->
lager:debug("[iot_message_handler] the metric is invalid json: ~p", [Other])
lager:debug("[iot_message_handler] the data is invalid json: ~p", [Other])
end,
{keep_state, State};
{keep_state, State#state{is_answered = true}};
handle_event(cast, {handle_message, #{<<"method">> := <<"ping">>, <<"params">> := CipherMetric}}, session, State = #state{uuid = UUID, aes = AES}) ->
MetricsInfo = iot_cipher_aes:decrypt(AES, base64:decode(CipherMetric)),
Metrics = jiffy:decode(MetricsInfo, [return_maps]),
handle_event(cast, {handle, <<?METHOD_PING:8, CipherMetric/binary>>}, session, State = #state{uuid = UUID, aes = AES}) ->
MetricsInfo = iot_cipher_aes:decrypt(AES, CipherMetric),
case catch jiffy:decode(MetricsInfo, [return_maps]) of
Metrics when is_map(Metrics) ->
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
{keep_state, State#state{metrics = Metrics, is_answered = true}};
Other ->
lager:debug("[iot_message_handler] host_id: ~p, ping is invalid json: ~p", [UUID, Other]),
{keep_state, State#state{is_answered = true}}
end;
lager:debug("[iot_host] host_id uuid: ~p, get ping: ~p", [UUID, Metrics]),
{keep_state, State#state{metrics = Metrics}};
handle_event(cast, {handle_message, #{<<"method">> := <<"inform">>, <<"params">> := Info0}}, session, State = #state{host_id = HostId, aes = AES}) ->
Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)),
handle_event(cast, {handle, <<?METHOD_INFORM:8, Info0/binary>>}, session, State = #state{host_id = HostId, aes = AES}) ->
Info = iot_cipher_aes:decrypt(AES, Info0),
case catch jiffy:decode(Info, [return_maps]) of
#{<<"at">> := At, <<"services">> := ServiceInforms} ->
lists:foreach(fun(#{<<"props">> := Props, <<"name">> := Name, <<"version">> := Version, <<"version_copy">> := VersionCopy, <<"status">> := Status}) ->
@ -327,10 +336,10 @@ handle_event(cast, {handle_message, #{<<"method">> := <<"inform">>, <<"params">>
Error ->
lager:warning("[iot_host] inform get error: ~p", [Error])
end,
{keep_state, State};
{keep_state, State#state{is_answered = true}};
handle_event(cast, {handle_message, #{<<"method">> := <<"feedback_result">>, <<"params">> := Info0}}, session, State = #state{aes = AES}) ->
Info = iot_cipher_aes:decrypt(AES, base64:decode(Info0)),
handle_event(cast, {handle, <<?METHOD_FEEDBACK_RESULT:8, Info0/binary>>}, session, State = #state{aes = AES}) ->
Info = iot_cipher_aes:decrypt(AES, Info0),
case catch jiffy:decode(Info, [return_maps]) of
#{<<"task_id">> := TaskId, <<"time">> := Time, <<"code">> := Code, <<"reason">> := Reason, <<"error">> := Error, <<"type">> := Type} ->
scene_feedback:insert(#{