diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/consumer/iot_zd_consumer.erl index 40ce2a7..1db6399 100644 --- a/apps/iot/src/consumer/iot_zd_consumer.erl +++ b/apps/iot/src/consumer/iot_zd_consumer.erl @@ -15,6 +15,7 @@ %% API -export([start_link/0]). +-export([mock/0, mock/1]). %% gen_server callbacks -export([init/1, handle_call/3, handle_cast/2, handle_info/2, terminate/2, code_change/3]). @@ -41,6 +42,25 @@ %%% API %%%=================================================================== +mock() -> + LocationCode = <<"0001000290040150004002560">>, + mock(LocationCode). + +mock(LocationCode) when is_binary(LocationCode) -> + Req = #{ + <<"version">> => <<"1.0">>, + <<"ts">> => iot_util:current_time(), + <<"properties">> => #{ + <<"type">> => <<"ctrl">>, + <<"stype">> => 0, + <<"ctype">> => 1, + <<"value">> => 1234, + <<"timestamp">> => iot_util:current_time() + }, + <<"location_code">> => LocationCode + }, + gen_server:call(?MODULE, {mock, Req}). + %% @doc Spawns the server and registers the local name (unique) -spec(start_link() -> {ok, Pid :: pid()} | ignore | {error, Reason :: term()}). @@ -77,7 +97,8 @@ init([]) -> {noreply, NewState :: #state{}, timeout() | hibernate} | {stop, Reason :: term(), Reply :: term(), NewState :: #state{}} | {stop, Reason :: term(), NewState :: #state{}}). -handle_call(_Info, _From, State = #state{conn_pid = _ConnPid}) -> +handle_call({mock, Request}, _From, State = #state{conn_pid = _ConnPid}) -> + publish_directive(Request), {reply, ok, State}. %% @private @@ -101,25 +122,10 @@ handle_info({disconnect, ReasonCode, Properties}, State) -> %% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]), - case catch jiffy:decode(Payload, [return_maps]) of - #{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams} -> - %% 通过LocationCode查找到主机和Device_uuid - case redis_client:hgetall(LocationCode) of - {ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} -> - case iot_host:get_pid(HostUUID) of - undefined -> - lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]); - Pid -> - ReceiverPid = self(), - spawn(fun() -> - DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT), - ReceiverPid ! {directive_reply, DirectiveResult} - end) - end; - _ -> - lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode]) - end - end, + + Request = catch jiffy:decode(Payload, [return_maps]), + publish_directive(Request), + {noreply, State}; handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> @@ -144,7 +150,7 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> {noreply, State#state{conn_pid = undefined}}; handle_info({directive_reply, Reply}, State = #state{}) -> - lager:debug("[iot_zd_consumer] get directive_reply: ~p", [Reply]), + lager:debug("[iot_zd_consumer] get directive_reply: ~ts", [Reply]), {noreply, State}; handle_info(Info, State = #state{}) -> @@ -182,6 +188,28 @@ code_change(_OldVsn, State = #state{}, _Extra) -> %%% Internal functions %%%=================================================================== +publish_directive(#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams}) -> + %% 通过LocationCode查找到主机和Device_uuid + case redis_client:hgetall(LocationCode) of + {ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} -> + case iot_host:get_pid(HostUUID) of + undefined -> + lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]); + Pid -> + ReceiverPid = self(), + spawn(fun() -> + DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT), + ReceiverPid ! {directive_reply, DirectiveResult} + end) + end; + {ok, Map} -> + lager:notice("[iot_zd_consumer] location_code: ~p, redis data invalid: ~p", [LocationCode, Map]); + _ -> + lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode]) + end; +publish_directive(Other) -> + lager:notice("[iot_zd_consumer] get a unknown directive", [Other]). + -spec create_consumer(Props :: list()) -> {ok, ConnPid :: pid()} | {error, Reason :: any()}. create_consumer(Props) when is_list(Props) -> Node = atom_to_binary(node()), diff --git a/apps/iot/src/iot_host.erl b/apps/iot/src/iot_host.erl index 0764b96..914e6a4 100644 --- a/apps/iot/src/iot_host.erl +++ b/apps/iot/src/iot_host.erl @@ -21,7 +21,8 @@ %% API -export([start_link/2, get_name/1, get_alias_name/1, get_pid/1, handle/2, activate/2]). --export([get_metric/1, publish_message/4, get_aes/1, get_status/1, publish_directive/6, send_directive/5]). +-export([get_metric/1, publish_message/4, get_aes/1, get_status/1]). +-export([publish_directive/6, send_directive/5]). -export([create_session/2, attach_channel/2]). -export([reload_device/2, delete_device/2, activate_device/3]). -export([heartbeat/1]). @@ -124,9 +125,9 @@ publish_directive(Pid, DeviceUUID, DirectiveType, Version, DirectiveParams, Time case gen_statem:call(Pid, {publish_directive, self(), Directive}) of {ok, Ref} -> receive - {directive_response, Ref} -> + {ws_response, Ref} -> ok; - {directive_response, Ref, Response} -> + {ws_response, Ref, Response} -> {ok, Response} after Timeout -> {error, timeout} @@ -278,7 +279,7 @@ handle_event({call, From}, {publish_directive, ReceiverPid, Directive0}, ?STATE_ lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]), Directive = iot_cipher_aes:encrypt(AES, Directive0), %% 通过websocket发送请求 - Ref = ws_channel:publish_directive(ChannelPid, ReceiverPid, Directive), + Ref = ws_channel:publish(ChannelPid, ReceiverPid, <<16:8, Directive/binary>>), {keep_state, State, [{reply, From, {ok, Ref}}]}; @@ -294,7 +295,7 @@ handle_event({call, From}, {send_directive, Directive0}, ?STATE_ACTIVATED, lager:debug("[iot_host] host: ~p, will publish_directive: ~p", [UUID, Directive0]), Directive = iot_cipher_aes:encrypt(AES, Directive0), %% 通过websocket发送请求 - ws_channel:send_directive(ChannelPid, Directive), + ws_channel:send(ChannelPid, <<16:8, Directive/binary>>), {keep_state, State, [{reply, From, ok}]};