fix channel
This commit is contained in:
parent
d1e69a31f0
commit
5f66983633
@ -136,7 +136,7 @@ websocket_handle({binary, <<?PACKET_PUBLISH_RESPONSE, PacketId:32, Body/binary>>
|
|||||||
error ->
|
error ->
|
||||||
lager:warning("[ws_channel] get unknown publish response message: ~p, packet_id: ~p", [Body, PacketId]),
|
lager:warning("[ws_channel] get unknown publish response message: ~p, packet_id: ~p", [Body, PacketId]),
|
||||||
{ok, State};
|
{ok, State};
|
||||||
{{ReceiverPid, Ref}, NInflight} ->
|
{{ReceiverPid, Ref, _}, NInflight} ->
|
||||||
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
case is_pid(ReceiverPid) andalso is_process_alive(ReceiverPid) of
|
||||||
true when Body == <<>> ->
|
true when Body == <<>> ->
|
||||||
ReceiverPid ! {ws_response, Ref};
|
ReceiverPid ! {ws_response, Ref};
|
||||||
@ -164,7 +164,7 @@ websocket_info({publish, ReceiverPid, Ref, Msg, Timeout}, State = #state{packet_
|
|||||||
TTL = iot_util:timestamp() + Timeout,
|
TTL = iot_util:timestamp() + Timeout,
|
||||||
maps:put(PacketId, {ReceiverPid, Ref, TTL}, Inflight);
|
maps:put(PacketId, {ReceiverPid, Ref, TTL}, Inflight);
|
||||||
false ->
|
false ->
|
||||||
maps:put(PacketId, {ReceiverPid, Ref}, Inflight)
|
maps:put(PacketId, {ReceiverPid, Ref, 0}, Inflight)
|
||||||
end,
|
end,
|
||||||
{reply, {binary, <<?PACKET_PUBLISH, PacketId:32, Msg/binary>>}, State#state{packet_id = PacketId + 1, inflight = NInflight}};
|
{reply, {binary, <<?PACKET_PUBLISH, PacketId:32, Msg/binary>>}, State#state{packet_id = PacketId + 1, inflight = NInflight}};
|
||||||
|
|
||||||
@ -177,13 +177,8 @@ websocket_info({timeout, _, clean_ticker}, State=#state{inflight = Inflight}) ->
|
|||||||
clean_ticker(),
|
clean_ticker(),
|
||||||
|
|
||||||
Timestamp = iot_util:timestamp(),
|
Timestamp = iot_util:timestamp(),
|
||||||
NInflight = maps:filter(fun(_, Promise) ->
|
NInflight = maps:filter(fun(_, {_ReceiverPid, _Ref, TTL}) ->
|
||||||
case Promise of
|
TTL == 0 orelse (TTL > 0 andalso TTL < Timestamp)
|
||||||
{_ReceiverPid, _Ref, TTL} ->
|
|
||||||
TTL < Timestamp;
|
|
||||||
{_ReceiverPid, _Ref} ->
|
|
||||||
true
|
|
||||||
end
|
|
||||||
end, Inflight),
|
end, Inflight),
|
||||||
|
|
||||||
{ok, State#state{inflight = NInflight}};
|
{ok, State#state{inflight = NInflight}};
|
||||||
|
|||||||
Loading…
x
Reference in New Issue
Block a user