This commit is contained in:
anlicheng 2025-05-07 17:44:42 +08:00
parent 7e3d522ecd
commit 3d460ce4e3
3 changed files with 1129 additions and 1247 deletions

View File

@ -23,8 +23,7 @@
-define('AUTH_REPLY_PB_H', true). -define('AUTH_REPLY_PB_H', true).
-record(auth_reply, -record(auth_reply,
{code = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits {code = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
message = <<>> :: unicode:chardata() | undefined, % = 2, optional message = <<>> :: unicode:chardata() | undefined % = 2, optional
repository_url = <<>> :: unicode:chardata() | undefined % = 3, optional
}). }).
-endif. -endif.
@ -35,28 +34,55 @@
}). }).
-endif. -endif.
-ifndef('SESSION_REQUEST_PB_H'). -ifndef('DEPLOY_PB_H').
-define('SESSION_REQUEST_PB_H', true). -define('DEPLOY_PB_H', true).
-record(session_request, -record(deploy,
{ {task_id = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits
service_id = <<>> :: unicode:chardata() | undefined, % = 2, optional
tar_url = <<>> :: unicode:chardata() | undefined % = 3, optional
}). }).
-endif. -endif.
-ifndef('SESSION_REPLY_PB_H'). -ifndef('EFKA_RESPONSE_PB_H').
-define('SESSION_REPLY_PB_H', true). -define('EFKA_RESPONSE_PB_H', true).
-record(session_reply, -record(efka_response,
{a = false :: boolean() | 0 | 1 | undefined % = 1, optional {code = 0 :: integer() | undefined, % = 1, optional, 32 bits
result = <<>> :: unicode:chardata() | undefined, % = 2, optional
message = <<>> :: unicode:chardata() | undefined % = 3, optional
}).
-endif.
-ifndef('TOPIC_MESSAGE_PB_H').
-define('TOPIC_MESSAGE_PB_H', true).
-record(topic_message,
{topic = <<>> :: unicode:chardata() | undefined, % = 1, optional
content = <<>> :: unicode:chardata() | undefined % = 2, optional
}).
-endif.
-ifndef('SERVICE_PARAMS_PB_H').
-define('SERVICE_PARAMS_PB_H', true).
-record(service_params,
{service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
params = <<>> :: unicode:chardata() | undefined, % = 2, optional
timeout = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits
}).
-endif.
-ifndef('SERVICE_METRICS_PB_H').
-define('SERVICE_METRICS_PB_H', true).
-record(service_metrics,
{service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
metrics = <<>> :: unicode:chardata() | undefined, % = 2, optional
timeout = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits
}). }).
-endif. -endif.
-ifndef('DATA_PB_H'). -ifndef('DATA_PB_H').
-define('DATA_PB_H', true). -define('DATA_PB_H', true).
-record(data, -record(data,
{device_uuid = <<>> :: unicode:chardata() | undefined, % = 1, optional {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
service_name = <<>> :: unicode:chardata() | undefined, % = 2, optional metric = <<>> :: unicode:chardata() | undefined % = 2, optional
at = 0 :: integer() | undefined, % = 3, optional, 32 bits
tags = [] :: [{unicode:chardata(), unicode:chardata()}] | undefined, % = 4
fields = [] :: [iodata()] | undefined % = 5, repeated
}). }).
-endif. -endif.
@ -75,66 +101,44 @@
cpu_temperature = 0.0 :: float() | integer() | infinity | '-infinity' | nan | undefined, % = 10, optional cpu_temperature = 0.0 :: float() | integer() | infinity | '-infinity' | nan | undefined, % = 10, optional
disk = [] :: [integer()] | undefined, % = 11, repeated, 32 bits disk = [] :: [integer()] | undefined, % = 11, repeated, 32 bits
memory = [] :: [integer()] | undefined, % = 12, repeated, 32 bits memory = [] :: [integer()] | undefined, % = 12, repeated, 32 bits
interfaces = [] :: [iodata()] | undefined % = 13, repeated interfaces = <<>> :: unicode:chardata() | undefined % = 13, optional
}). }).
-endif. -endif.
-ifndef('SERVICE_INFORM_PB_H'). -ifndef('SERVICE_INFORM_PB_H').
-define('SERVICE_INFORM_PB_H', true). -define('SERVICE_INFORM_PB_H', true).
-record(service_inform, -record(service_inform,
{name = <<>> :: unicode:chardata() | undefined, % = 1, optional {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
props = <<>> :: unicode:chardata() | undefined, % = 2, optional props = <<>> :: unicode:chardata() | undefined, % = 2, optional
version = <<>> :: unicode:chardata() | undefined, % = 3, optional status = 0 :: non_neg_integer() | undefined, % = 3, optional, 32 bits
version_copy = <<>> :: unicode:chardata() | undefined, % = 4, optional timestamp = 0 :: non_neg_integer() | undefined % = 4, optional, 32 bits
status = 0 :: non_neg_integer() | undefined, % = 5, optional, 32 bits
at = 0 :: non_neg_integer() | undefined % = 6, optional, 32 bits
}). }).
-endif. -endif.
-ifndef('FEEDBACK_STEP_PB_H'). -ifndef('FEEDBACK_PHASE_PB_H').
-define('FEEDBACK_STEP_PB_H', true). -define('FEEDBACK_PHASE_PB_H', true).
-record(feedback_step, -record(feedback_phase,
{task_id = <<>> :: unicode:chardata() | undefined, % = 1, optional {task_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
code = 0 :: non_neg_integer() | undefined % = 2, optional, 32 bits phase = <<>> :: unicode:chardata() | undefined, % = 2, optional
}). timestamp = 0 :: non_neg_integer() | undefined % = 3, optional, 32 bits
-endif.
-ifndef('FEEDBACK_RESULT_PB_H').
-define('FEEDBACK_RESULT_PB_H', true).
-record(feedback_result,
{task_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
task_type = <<>> :: unicode:chardata() | undefined, % = 2, optional
time = 0 :: non_neg_integer() | undefined, % = 3, optional, 32 bits
code = 0 :: non_neg_integer() | undefined, % = 4, optional, 32 bits
reason = <<>> :: unicode:chardata() | undefined, % = 5, optional
error = <<>> :: unicode:chardata() | undefined % = 6, optional
}). }).
-endif. -endif.
-ifndef('EVENT_PB_H'). -ifndef('EVENT_PB_H').
-define('EVENT_PB_H', true). -define('EVENT_PB_H', true).
-record(event, -record(event,
{event_type = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
params = <<>> :: iodata() | undefined % = 2, optional event_type = 0 :: non_neg_integer() | undefined, % = 2, optional, 32 bits
params = <<>> :: unicode:chardata() | undefined % = 3, optional
}). }).
-endif. -endif.
-ifndef('AI_EVENT_PB_H'). -ifndef('AI_EVENT_PB_H').
-define('AI_EVENT_PB_H', true). -define('AI_EVENT_PB_H', true).
-record(ai_event, -record(ai_event,
{event_type = 0 :: non_neg_integer() | undefined, % = 1, optional, 32 bits {service_id = <<>> :: unicode:chardata() | undefined, % = 1, optional
params = <<>> :: iodata() | undefined % = 2, optional event_type = 0 :: non_neg_integer() | undefined, % = 2, optional, 32 bits
}). params = <<>> :: unicode:chardata() | undefined % = 3, optional
-endif.
-ifndef('DIRECTIVE_PB_H').
-define('DIRECTIVE_PB_H', true).
-record(directive,
{device_uuid = <<>> :: unicode:chardata() | undefined, % = 1, optional
version = <<>> :: unicode:chardata() | undefined, % = 2, optional
directive_type = 0 :: non_neg_integer() | undefined, % = 3, optional, 32 bits
timeout = 0 :: non_neg_integer() | undefined, % = 4, optional, 32 bits
directive = <<>> :: iodata() | undefined % = 5, optional
}). }).
-endif. -endif.

View File

@ -22,8 +22,8 @@
%% API %% API
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
-export([get_metric/1, publish_message/4, get_status/1]). -export([get_metric/1, get_status/1]).
-export([publish_directive/6, send_directive/5]). -export([publish_directive/4, send_directive/3]).
-export([attach_channel/2]). -export([attach_channel/2]).
-export([reload_device/2, delete_device/2, activate_device/3]). -export([reload_device/2, delete_device/2, activate_device/3]).
-export([heartbeat/1]). -export([heartbeat/1]).
@ -83,11 +83,10 @@ get_metric(Pid) when is_pid(Pid) ->
attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) -> attach_channel(Pid, ChannelPid) when is_pid(Pid), is_pid(ChannelPid) ->
gen_statem:call(Pid, {attach_channel, ChannelPid}). gen_statem:call(Pid, {attach_channel, ChannelPid}).
%% -spec publish_directive(Pid :: pid(), Topic :: binary(), Content :: binary(), Timeout :: integer()) ->
-spec publish_message(Pid :: pid(), CommandType :: integer(), Params :: binary() | {Encrypt :: atom(), Params :: binary()}, Timeout :: integer()) ->
ok | {ok, Response :: binary()} | {error, Reason :: any()}. ok | {ok, Response :: binary()} | {error, Reason :: any()}.
publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(CommandType), is_integer(Timeout) -> publish_directive(Pid, Topic, Content, Timeout) when is_pid(Pid), is_binary(Topic), is_binary(Content), is_integer(Timeout) ->
case gen_statem:call(Pid, {publish_message, self(), CommandType, Params}) of case gen_statem:call(Pid, {publish_directive, self(), Topic, Content}) of
{ok, Ref} -> {ok, Ref} ->
receive receive
{ws_response, Ref} -> {ws_response, Ref} ->
@ -101,47 +100,10 @@ publish_message(Pid, CommandType, Params, Timeout) when is_pid(Pid), is_integer(
{error, Reason} {error, Reason}
end. end.
-spec publish_directive(Pid :: pid(), DeviceUUID :: binary(), DirectiveType :: integer(), Version :: binary(), DirectiveParams :: binary() | map(), Timeout :: integer()) -> -spec send_directive(Pid :: pid(), Topic :: binary(), Content :: binary()) ->
ok | {ok, Response :: binary()} | {error, Reason :: any()}.
publish_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams, Timeout)
when is_pid(Pid), is_binary(DeviceUUID), is_integer(DirectiveType), is_binary(Version), is_binary(DirectiveParams); is_map(DirectiveParams), is_integer(Timeout) ->
Directive = #{
<<"device_uuid">> => DeviceUUID,
<<"version">> => Version,
<<"directive_type">> => DirectiveType,
<<"directive">> => DirectiveParams
},
JsonDirective = iolist_to_binary(jiffy:encode(Directive, [force_utf8])),
case gen_statem:call(Pid, {publish_directive, self(), JsonDirective}) of
{ok, Ref} ->
receive
{ws_response, Ref} ->
ok;
{ws_response, Ref, Response} ->
{ok, Response}
after Timeout ->
{error, timeout}
end;
{error, Reason} ->
{error, Reason}
end.
-spec send_directive(Pid :: pid(), DeviceUUID :: binary(), DirectiveType :: integer(), Version :: binary(), DirectiveParams :: binary() | map()) ->
ok | {error, Reason :: any()}. ok | {error, Reason :: any()}.
send_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams) send_directive(Pid, Topic, Content) when is_pid(Pid), is_binary(Topic), is_binary(Content) ->
when is_pid(Pid), is_binary(DeviceUUID), is_integer(DirectiveType), is_binary(Version), is_binary(DirectiveParams); is_map(DirectiveParams) -> gen_statem:call(Pid, {send_directive, Topic, Content}).
Directive = #{
<<"device_uuid">> => DeviceUUID,
<<"version">> => Version,
<<"directive_type">> => DirectiveType,
<<"directive">> => DirectiveParams
},
JsonDirective = iolist_to_binary(jiffy:encode(Directive, [force_utf8])),
gen_statem:call(Pid, {send_directive, DeviceUUID, JsonDirective}).
%% %%
@ -249,33 +211,38 @@ handle_event({call, From}, {publish_message, _, _, _}, _, State = #state{uuid =
{keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]}; {keep_state, State, [{reply, From, {error, <<"主机离线,发送命令失败"/utf8>>}}]};
%% , aes加密session是存在的 %% , aes加密session是存在的
handle_event({call, From}, {publish_directive, ReceiverPid, Directive}, ?STATE_ACTIVATED, handle_event({call, From}, {publish_directive, ReceiverPid, Topic, Content}, ?STATE_ACTIVATED,
State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) -> State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) ->
lager:debug("[iot_host] host: ~p, publish_directive to topic: ~p, content: ~p", [UUID, Topic, Content]),
BinTopicMessage = message_pb:encode_msg(#topic_message{topic = Topic, content = Content}),
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive]),
%% websocket发送请求 %% websocket发送请求
Ref = tcp_channel:publish(ChannelPid, ReceiverPid, <<16:8, Directive/binary>>), Ref = tcp_channel:publish(ChannelPid, ReceiverPid, <<16:8, BinTopicMessage/binary>>),
{keep_state, State, [{reply, From, {ok, Ref}}]}; {keep_state, State, [{reply, From, {ok, Ref}}]};
%% %%
handle_event({call, From}, {publish_directive, _, Directive}, _, State = #state{uuid = UUID}) -> handle_event({call, From}, {publish_directive, _, Topic, Content}, _, State = #state{uuid = UUID}) ->
lager:debug("[iot_host] uuid: ~p, publish_directive: ~p, invalid state: ~p", [UUID, Directive, state_map(State)]), lager:debug("[iot_host] uuid: ~p, publish_directive to topic: ~p, content: ~p, invalid state: ~p", [UUID, Topic, Content, state_map(State)]),
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}; {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};
%% !! device_uuid, json的格式 %% !! device_uuid, json的格式
handle_event({call, From}, {send_directive, DeviceUUID, Directive}, ?STATE_ACTIVATED, handle_event({call, From}, {send_directive, Topic, Content}, ?STATE_ACTIVATED,
State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) -> State = #state{uuid = UUID, channel_pid = ChannelPid, has_session = true}) ->
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive]), BinTopicMessage = message_pb:encode_msg(#topic_message{topic = Topic, content = Content}),
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Content]),
%% websocket发送请求 %% websocket发送请求
tcp_channel:send(ChannelPid, <<16:8, DeviceUUID/binary, Directive/binary>>), tcp_channel:send(ChannelPid, <<16:8, BinTopicMessage/binary>>),
{keep_state, State, [{reply, From, ok}]}; {keep_state, State, [{reply, From, ok}]};
%% %%
handle_event({call, From}, {send_directive, Directive}, _, State = #state{uuid = UUID}) -> handle_event({call, From}, {send_directive, Topic, Content}, _, State = #state{uuid = UUID}) ->
lager:debug("[iot_host] uuid: ~p, send_directive: ~p, invalid state: ~p", [UUID, Directive, state_map(State)]), lager:debug("[iot_host] host_uuid: ~p, send_directive to topic: ~p, content: ~p, invalid state: ~p",
[UUID, Topic, Content, state_map(State)]),
{keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]}; {keep_state, State, [{reply, From, {error, <<"主机离线,发送指令失败"/utf8>>}}]};
%% %%

File diff suppressed because it is too large Load Diff