diff --git a/apps/iot/include/iot.hrl b/apps/iot/include/iot.hrl index 4d6264e..602cc29 100644 --- a/apps/iot/include/iot.hrl +++ b/apps/iot/include/iot.hrl @@ -57,6 +57,9 @@ %% ai相关的事件 -define(EVENT_AI, 16#03). +%% 指令相关 +-define(DIRECTIVE_ZD_CTRL, 16#01). + %% 缓存数据库表 -record(kv, { key :: binary(), diff --git a/apps/iot/src/consumer/iot_zd_consumer.erl b/apps/iot/src/consumer/iot_zd_consumer.erl index eae3ad5..40ce2a7 100644 --- a/apps/iot/src/consumer/iot_zd_consumer.erl +++ b/apps/iot/src/consumer/iot_zd_consumer.erl @@ -22,6 +22,9 @@ -define(SERVER, ?MODULE). -define(RETRY_INTERVAL, 5000). +%% 执行超时时间 +-define(EXECUTE_TIMEOUT, 10 * 1000). + %% 需要订阅的主题信息 -define(Topics,[ {<<"CET/NX/upload">>, 2} @@ -98,8 +101,27 @@ handle_info({disconnect, ReasonCode, Properties}, State) -> %% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行 handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) -> lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]), - %% 将消息分发到对应的host进程去处理 + case catch jiffy:decode(Payload, [return_maps]) of + #{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams} -> + %% 通过LocationCode查找到主机和Device_uuid + case redis_client:hgetall(LocationCode) of + {ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} -> + case iot_host:get_pid(HostUUID) of + undefined -> + lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]); + Pid -> + ReceiverPid = self(), + spawn(fun() -> + DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT), + ReceiverPid ! {directive_reply, DirectiveResult} + end) + end; + _ -> + lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode]) + end + end, {noreply, State}; + handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) -> lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]), {noreply, State}; @@ -121,6 +143,10 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) -> {noreply, State#state{conn_pid = undefined}}; +handle_info({directive_reply, Reply}, State = #state{}) -> + lager:debug("[iot_zd_consumer] get directive_reply: ~p", [Reply]), + {noreply, State}; + handle_info(Info, State = #state{}) -> lager:debug("[iot_zd_consumer] get info: ~p", [Info]), {noreply, State}. diff --git a/apps/iot/src/redis/redis_client.erl b/apps/iot/src/redis/redis_client.erl index bf0d341..ea29396 100755 --- a/apps/iot/src/redis/redis_client.erl +++ b/apps/iot/src/redis/redis_client.erl @@ -10,7 +10,7 @@ -author("aresei"). %% API --export([hget/2]). +-export([hget/2, hgetall/1]). %%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%% %% HashTable处理 @@ -18,4 +18,25 @@ -spec hget(Key :: binary(), Field :: binary()) -> {ok, Val :: any()} | {error, Reason :: binary()}. hget(Key, Field) when is_binary(Key), is_binary(Field) -> - poolboy:transaction(redis_pool, fun(Conn) -> eredis:q(Conn, ["HGET", Key, Field]) end). \ No newline at end of file + poolboy:transaction(redis_pool, fun(Conn) -> eredis:q(Conn, ["HGET", Key, Field]) end). + +-spec hgetall(Key :: binary()) -> {ok, Fields :: map()} | {error, Reason :: binary()}. +hgetall(Key) when is_binary(Key) -> + poolboy:transaction(redis_pool, fun(Conn) -> + case eredis:q(Conn, ["HGETALL", Key]) of + {ok, Items} -> + {ok, to_map(Items)}; + Error -> + Error + end + end). + + +to_map(Items) when is_list(Items), length(Items) rem 2 == 0 -> + to_map(Items, #{}). +to_map([], Target) -> + Target; +to_map([K, V|Tail], Target) -> + to_map(Tail, Target#{K => V}). + +