fix
This commit is contained in:
parent
e7044534a5
commit
e7e75b51ea
@ -57,6 +57,9 @@
|
||||
%% ai相关的事件
|
||||
-define(EVENT_AI, 16#03).
|
||||
|
||||
%% 指令相关
|
||||
-define(DIRECTIVE_ZD_CTRL, 16#01).
|
||||
|
||||
%% 缓存数据库表
|
||||
-record(kv, {
|
||||
key :: binary(),
|
||||
|
||||
@ -22,6 +22,9 @@
|
||||
-define(SERVER, ?MODULE).
|
||||
-define(RETRY_INTERVAL, 5000).
|
||||
|
||||
%% 执行超时时间
|
||||
-define(EXECUTE_TIMEOUT, 10 * 1000).
|
||||
|
||||
%% 需要订阅的主题信息
|
||||
-define(Topics,[
|
||||
{<<"CET/NX/upload">>, 2}
|
||||
@ -98,8 +101,27 @@ handle_info({disconnect, ReasonCode, Properties}, State) ->
|
||||
%% 必须要做到消息的快速分发,数据的json反序列需要在host进程进行
|
||||
handle_info({publish, #{packet_id := _PacketId, payload := Payload, qos := Qos, topic := Topic}}, State) ->
|
||||
lager:debug("[iot_zd_consumer] Recv a topic: ~p, publish packet: ~ts, qos: ~p", [Topic, Payload, Qos]),
|
||||
%% 将消息分发到对应的host进程去处理
|
||||
case catch jiffy:decode(Payload, [return_maps]) of
|
||||
#{<<"version">> := Version, <<"location_code">> := LocationCode, <<"properties">> := DirectiveParams} ->
|
||||
%% 通过LocationCode查找到主机和Device_uuid
|
||||
case redis_client:hgetall(LocationCode) of
|
||||
{ok, #{<<"host_uuid">> := HostUUID, <<"device_uuid">> := DeviceUUID}} ->
|
||||
case iot_host:get_pid(HostUUID) of
|
||||
undefined ->
|
||||
lager:notice("[iot_zd_consumer] host uuid: ~p, not found", [HostUUID]);
|
||||
Pid ->
|
||||
ReceiverPid = self(),
|
||||
spawn(fun() ->
|
||||
DirectiveResult = iot_host:publish_directive(Pid, DeviceUUID, ?DIRECTIVE_ZD_CTRL, Version, DirectiveParams, ?EXECUTE_TIMEOUT),
|
||||
ReceiverPid ! {directive_reply, DirectiveResult}
|
||||
end)
|
||||
end;
|
||||
_ ->
|
||||
lager:notice("[iot_zd_consumer] location_code: ~p, not found in redis", [LocationCode])
|
||||
end
|
||||
end,
|
||||
{noreply, State};
|
||||
|
||||
handle_info({puback, Packet = #{packet_id := _PacketId}}, State = #state{}) ->
|
||||
lager:debug("[iot_zd_consumer] receive puback packet: ~p", [Packet]),
|
||||
{noreply, State};
|
||||
@ -121,6 +143,10 @@ handle_info({'EXIT', ConnPid, Reason}, State = #state{conn_pid = ConnPid}) ->
|
||||
|
||||
{noreply, State#state{conn_pid = undefined}};
|
||||
|
||||
handle_info({directive_reply, Reply}, State = #state{}) ->
|
||||
lager:debug("[iot_zd_consumer] get directive_reply: ~p", [Reply]),
|
||||
{noreply, State};
|
||||
|
||||
handle_info(Info, State = #state{}) ->
|
||||
lager:debug("[iot_zd_consumer] get info: ~p", [Info]),
|
||||
{noreply, State}.
|
||||
|
||||
@ -10,7 +10,7 @@
|
||||
-author("aresei").
|
||||
|
||||
%% API
|
||||
-export([hget/2]).
|
||||
-export([hget/2, hgetall/1]).
|
||||
|
||||
%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%%
|
||||
%% HashTable处理
|
||||
@ -19,3 +19,24 @@
|
||||
-spec hget(Key :: binary(), Field :: binary()) -> {ok, Val :: any()} | {error, Reason :: binary()}.
|
||||
hget(Key, Field) when is_binary(Key), is_binary(Field) ->
|
||||
poolboy:transaction(redis_pool, fun(Conn) -> eredis:q(Conn, ["HGET", Key, Field]) end).
|
||||
|
||||
-spec hgetall(Key :: binary()) -> {ok, Fields :: map()} | {error, Reason :: binary()}.
|
||||
hgetall(Key) when is_binary(Key) ->
|
||||
poolboy:transaction(redis_pool, fun(Conn) ->
|
||||
case eredis:q(Conn, ["HGETALL", Key]) of
|
||||
{ok, Items} ->
|
||||
{ok, to_map(Items)};
|
||||
Error ->
|
||||
Error
|
||||
end
|
||||
end).
|
||||
|
||||
|
||||
to_map(Items) when is_list(Items), length(Items) rem 2 == 0 ->
|
||||
to_map(Items, #{}).
|
||||
to_map([], Target) ->
|
||||
Target;
|
||||
to_map([K, V|Tail], Target) ->
|
||||
to_map(Tail, Target#{K => V}).
|
||||
|
||||
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user