fix directive
This commit is contained in:
parent
e7e75b51ea
commit
c6879d5183
@ -15,6 +15,7 @@
|
|||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/0]).
|
-export([start_link/0]).
|
||||||
|
-export([mock/0, mock/1]).
|
||||||
|
|
||||||
%% gen_server callbacks
|
%% gen_server callbacks
|
||||||
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
-export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]).
|
||||||
@ -41,6 +42,25 @@
|
|||||||
%%% API
|
%%% API
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
|
mock() ->
|
||||||
|
LocationCode = <<"0001000290040150004002560">>,
|
||||||
|
mock(LocationCode).
|
||||||
|
|
||||||
|
mock(LocationCode) when is_binary(LocationCode) ->
|
||||||
|
Req = #{
|
||||||
|
<<"version">> => <<"1.0">>,
|
||||||
|
<<"ts">> => iot_util:current_time(),
|
||||||
|
<<"properties">> => #{
|
||||||
|
<<"type">> => <<"ctrl">>,
|
||||||
|
<<"stype">> => 0,
|
||||||
|
<<"ctype">> => 1,
|
||||||
|
<<"value">> => 1234,
|
||||||
|
<<"timestamp">> => iot_util:current_time()
|
||||||
|
},
|
||||||
|
<<"location_code">> => LocationCode
|
||||||
|
},
|
||||||
|
gen_server:call(?MODULE, {mock, Req}).
|
||||||
|
|
||||||
%% @doc Spawns the server and registers the local name (unique)
|
%% @doc Spawns the server and registers the local name (unique)
|
||||||
-spec(start_link() ->
|
-spec(start_link() ->
|
||||||
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
{ok, Pid :: pid()} | ignore | {error, Reason :: term()}).
|
||||||
@ -77,7 +97,8 @@ init([]) ->
|
|||||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||||
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
{stop, Reason :: term(), Reply :: term(), NewState :: #state{}} |
|
||||||
{stop, Reason :: term(), NewState :: #state{}}).
|
{stop, Reason :: term(), NewState :: #state{}}).
|
||||||
handle_call(_Info, _From, State = #state{conn_pid = _ConnPid}) ->
|
handle_call({mock, Request}, _From, State = #state{conn_pid = _ConnPid}) ->
|
||||||
|
publish_directive(Request),
|
||||||
{reply, ok, State}.
|
{reply, ok, State}.
|
||||||
|
|
||||||
%% @private
|
%% @private
|
||||||
@ -101,25 +122,10 @@ handle_info({disconnect, ReasonCode, Properties}, State) ->
|
|||||||
%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行
|
%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行
|
||||||
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
|
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
|
||||||
lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]),
|
lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]),
|
||||||
case catch jiffy:decode(Payload, [return_maps]) of
|
|
||||||
#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams} ->
|
Request = catch jiffy:decode(Payload, [return_maps]),
|
||||||
%% 通过LocationCode查找到主机和Device_uuid
|
publish_directive(Request),
|
||||||
case redis_client:hgetall(LocationCode) of
|
|
||||||
{ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} ->
|
|
||||||
case iot_host:get_pid(HostUUID) of
|
|
||||||
undefined ->
|
|
||||||
lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]);
|
|
||||||
Pid ->
|
|
||||||
ReceiverPid = self(),
|
|
||||||
spawn(fun() ->
|
|
||||||
DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT),
|
|
||||||
ReceiverPid ! {directive_reply, DirectiveResult}
|
|
||||||
end)
|
|
||||||
end;
|
|
||||||
_ ->
|
|
||||||
lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode])
|
|
||||||
end
|
|
||||||
end,
|
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
|
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
|
||||||
@ -144,7 +150,7 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
|
|||||||
{noreply, State#state{conn_pid = undefined}};
|
{noreply, State#state{conn_pid = undefined}};
|
||||||
|
|
||||||
handle_info({directive_reply, Reply}, State = #state{}) ->
|
handle_info({directive_reply, Reply}, State = #state{}) ->
|
||||||
lager:debug("[iot_zd_consumer] get directive_reply: ~p", [Reply]),
|
lager:debug("[iot_zd_consumer] get directive_reply: ~ts", [Reply]),
|
||||||
{noreply, State};
|
{noreply, State};
|
||||||
|
|
||||||
handle_info(Info, State = #state{}) ->
|
handle_info(Info, State = #state{}) ->
|
||||||
@ -182,6 +188,28 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
|||||||
%%% Internal functions
|
%%% Internal functions
|
||||||
%%%===================================================================
|
%%%===================================================================
|
||||||
|
|
||||||
|
publish_directive(#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams}) ->
|
||||||
|
%% 通过LocationCode查找到主机和Device_uuid
|
||||||
|
case redis_client:hgetall(LocationCode) of
|
||||||
|
{ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} ->
|
||||||
|
case iot_host:get_pid(HostUUID) of
|
||||||
|
undefined ->
|
||||||
|
lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]);
|
||||||
|
Pid ->
|
||||||
|
ReceiverPid = self(),
|
||||||
|
spawn(fun() ->
|
||||||
|
DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT),
|
||||||
|
ReceiverPid ! {directive_reply, DirectiveResult}
|
||||||
|
end)
|
||||||
|
end;
|
||||||
|
{ok, Map} ->
|
||||||
|
lager:notice("[iot_zd_consumer] location_code: ~p, redis data invalid: ~p", [LocationCode, Map]);
|
||||||
|
_ ->
|
||||||
|
lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode])
|
||||||
|
end;
|
||||||
|
publish_directive(Other) ->
|
||||||
|
lager:notice("[iot_zd_consumer] get a unknown directive", [Other]).
|
||||||
|
|
||||||
-spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}.
|
-spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}.
|
||||||
create_consumer(Props) when is_list(Props) ->
|
create_consumer(Props) when is_list(Props) ->
|
||||||
Node = atom_to_binary(node()),
|
Node = atom_to_binary(node()),
|
||||||
|
|||||||
@ -21,7 +21,8 @@
|
|||||||
|
|
||||||
%% API
|
%% API
|
||||||
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
|
-export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]).
|
||||||
-export([get_metric/1, publish_message/4, get_aes/1, get_status/1, publish_directive/6, send_directive/5]).
|
-export([get_metric/1, publish_message/4, get_aes/1, get_status/1]).
|
||||||
|
-export([publish_directive/6, send_directive/5]).
|
||||||
-export([create_session/2, attach_channel/2]).
|
-export([create_session/2, attach_channel/2]).
|
||||||
-export([reload_device/2, delete_device/2, activate_device/3]).
|
-export([reload_device/2, delete_device/2, activate_device/3]).
|
||||||
-export([heartbeat/1]).
|
-export([heartbeat/1]).
|
||||||
@ -124,9 +125,9 @@ publish_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams, Time
|
|||||||
case gen_statem:call(Pid, {publish_directive, self(), Directive}) of
|
case gen_statem:call(Pid, {publish_directive, self(), Directive}) of
|
||||||
{ok, Ref} ->
|
{ok, Ref} ->
|
||||||
receive
|
receive
|
||||||
{directive_response, Ref} ->
|
{ws_response, Ref} ->
|
||||||
ok;
|
ok;
|
||||||
{directive_response, Ref, Response} ->
|
{ws_response, Ref, Response} ->
|
||||||
{ok, Response}
|
{ok, Response}
|
||||||
after Timeout ->
|
after Timeout ->
|
||||||
{error, timeout}
|
{error, timeout}
|
||||||
@ -278,7 +279,7 @@ handle_event({call, From}, {publish_directive, ReceiverPid, Directive0}, ?STATE_
|
|||||||
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]),
|
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]),
|
||||||
Directive = iot_cipher_aes:encrypt(AES, Directive0),
|
Directive = iot_cipher_aes:encrypt(AES, Directive0),
|
||||||
%% 通过websocket发送请求
|
%% 通过websocket发送请求
|
||||||
Ref = ws_channel:publish_directive(ChannelPid, ReceiverPid, Directive),
|
Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<16:8, Directive/binary>>),
|
||||||
|
|
||||||
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
{keep_state, State, [{reply, From, {ok, Ref}}]};
|
||||||
|
|
||||||
@ -294,7 +295,7 @@ handle_event({call, From}, {send_directive, Directive0}, ?STATE_ACTIVATED,
|
|||||||
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]),
|
lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]),
|
||||||
Directive = iot_cipher_aes:encrypt(AES, Directive0),
|
Directive = iot_cipher_aes:encrypt(AES, Directive0),
|
||||||
%% 通过websocket发送请求
|
%% 通过websocket发送请求
|
||||||
ws_channel:send_directive(ChannelPid, Directive),
|
ws_channel:send(ChannelPid, <<16:8, Directive/binary>>),
|
||||||
|
|
||||||
{keep_state, State, [{reply, From, ok}]};
|
{keep_state, State, [{reply, From, ok}]};
|
||||||
|
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user