fix alarm
This commit is contained in:
parent
50fe31d374
commit
a836b16f15
@ -62,9 +62,11 @@
|
||||
retry_timeout :: integer(),
|
||||
|
||||
%% 数据定义
|
||||
metrics = #{} :: map(),
|
||||
|
||||
controls = #{} :: map()
|
||||
metrics = [] :: list(),
|
||||
%% 控制
|
||||
controls = [] :: list(),
|
||||
% 告警信息
|
||||
alarms = [] :: list()
|
||||
}).
|
||||
|
||||
-record(modbus_metric, {
|
||||
@ -82,38 +84,21 @@
|
||||
precision
|
||||
}).
|
||||
|
||||
-record(modbus_processor, {
|
||||
name :: binary(),
|
||||
input :: binary(),
|
||||
transform :: any(),
|
||||
|
||||
output :: []
|
||||
}).
|
||||
|
||||
-record(modbus_alarm, {
|
||||
name :: binary(),
|
||||
condition :: binary(),
|
||||
|
||||
%% 持续判定
|
||||
hold_time :: binary(),
|
||||
|
||||
%% 动作
|
||||
actions :: [],
|
||||
|
||||
%% 恢复动作
|
||||
recovery_actions = []
|
||||
hold_time :: binary()
|
||||
}).
|
||||
|
||||
-record(modbus_var, {
|
||||
address :: integer(),
|
||||
type :: atom(),
|
||||
scale = 1.0 :: float(),
|
||||
unit :: binary() | undefined,
|
||||
poll = true :: boolean()
|
||||
unit :: binary() | undefined
|
||||
}).
|
||||
|
||||
-record(ast, {
|
||||
modbus,
|
||||
devices = [],
|
||||
alarms = []
|
||||
devices = []
|
||||
}).
|
||||
@ -21,8 +21,6 @@
|
||||
-record(state, {
|
||||
parent_pid :: pid(),
|
||||
device :: #modbus_device{},
|
||||
%% 重新建立 #{Name => Metric}
|
||||
metrics_map = #{},
|
||||
%% #{Ref => MetricName}
|
||||
inflight = #{}
|
||||
}).
|
||||
@ -46,21 +44,19 @@ start_link(ParentPid, Device = #modbus_device{}) when is_pid(ParentPid) ->
|
||||
-spec(init(Args :: term()) ->
|
||||
{ok, State :: #state{}} | {ok, State :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term()} | ignore).
|
||||
init([ParentPid, Device = #modbus_device{metrics = Metrics0, poll_interval = PollInterval}]) ->
|
||||
init([ParentPid, Device = #modbus_device{metrics = Metrics, poll_interval = PollInterval}]) ->
|
||||
%% 处理采集项目
|
||||
MetricsMap = maps:from_list(lists:map(fun(Metric0 = #modbus_metric{name = Name}) -> {Name, Metric0} end, Metrics0)),
|
||||
lager:debug("[modbus_device] device metrics is: ~p", [MetricsMap]),
|
||||
%% 初步启动的过程中按照step提交任务
|
||||
Len = length(Metrics0),
|
||||
Len = length(Metrics),
|
||||
case Len > 0 of
|
||||
true ->
|
||||
Step = erlang:max(1, PollInterval div Len),
|
||||
lager:debug("[modbus_deivce] step is: ~p", [Step]),
|
||||
start_ticker(maps:keys(MetricsMap), Step);
|
||||
start_ticker(Metrics, Step);
|
||||
false ->
|
||||
ok
|
||||
end,
|
||||
{ok, #state{parent_pid = ParentPid, metrics_map = MetricsMap, device = Device}}.
|
||||
{ok, #state{parent_pid = ParentPid, device = Device}}.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling call messages
|
||||
@ -81,8 +77,20 @@ handle_call(_Request, _From, State = #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, State}.
|
||||
handle_cast({control, ControlName, Val0}, State = #state{parent_pid = ParentPid, device = #modbus_device{slave_id = SlaveId, controls = Controls}}) ->
|
||||
case lists:search(fun(#modbus_control{name = Name0}) -> Name0 =:= ControlName end, Controls) of
|
||||
{value, #modbus_control{address = Address, type = Type, precision = Precision}} ->
|
||||
%% 读取采集项目
|
||||
ReceiverPid = self(),
|
||||
Ref = make_ref(),
|
||||
Cnt = get_register_count(Type),
|
||||
Val = Val0 * Precision,
|
||||
ParentPid ! {control_device, ReceiverPid, Ref, SlaveId, Address, Val, Cnt},
|
||||
|
||||
{noreply, State};
|
||||
false ->
|
||||
{noreply, State}
|
||||
end.
|
||||
|
||||
%% @private
|
||||
%% @doc Handling all non call/cast messages
|
||||
@ -90,27 +98,28 @@ handle_cast(_Request, State = #state{}) ->
|
||||
{noreply, NewState :: #state{}} |
|
||||
{noreply, NewState :: #state{}, timeout() | hibernate} |
|
||||
{stop, Reason :: term(), NewState :: #state{}}).
|
||||
handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = ParentPid, inflight = Inflight, metrics_map = MetricsMap,
|
||||
device = #modbus_device{slave_id = SlaveId, poll_interval = PollInterval}}) ->
|
||||
handle_info({timeout, _, {poll_ticker, Name}}, State = #state{parent_pid = ParentPid, inflight = Inflight,
|
||||
device = #modbus_device{slave_id = SlaveId, metrics = Metrics, poll_interval = PollInterval}}) ->
|
||||
|
||||
%% 开启下一次的循环
|
||||
poll_ticker(PollInterval, Name),
|
||||
#modbus_metric{address = Address, type = Type} = get_metric(Name, Metrics),
|
||||
|
||||
#modbus_metric{address = Address, type = Type} = maps:get(Name, MetricsMap),
|
||||
%% 读取采集项目
|
||||
ReceiverPid = self(),
|
||||
Ref = make_ref(),
|
||||
Cnt = get_register_count(Type),
|
||||
ParentPid ! {request, ReceiverPid, Ref, SlaveId, Address, Cnt},
|
||||
ParentPid ! {request_device, ReceiverPid, Ref, SlaveId, Address, Cnt},
|
||||
|
||||
{noreply, State#state{inflight = maps:put(Ref, Name, Inflight)}};
|
||||
|
||||
handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, device = #modbus_device{name = DeviceName}, inflight = Inflight, metrics_map = MetricsMap}) ->
|
||||
handle_info({request_reply, Ref, Val}, State = #state{parent_pid = ParentPid, device = #modbus_device{name = DeviceName, metrics = Metrics}, inflight = Inflight}) ->
|
||||
case maps:take(Ref, Inflight) of
|
||||
error ->
|
||||
{noreply, State};
|
||||
{MetricName, NInflight} ->
|
||||
#modbus_metric{name = MetricName, unit = Uint, type = Type, scale = Scale} = maps:get(MetricName, MetricsMap),
|
||||
#modbus_metric{name = MetricName, unit = Uint, type = Type, scale = Scale} = get_metric(MetricName, Metrics),
|
||||
|
||||
lager:debug("[modbus_device] metric_name: ~p, get value: ~p", [MetricName, Val]),
|
||||
|
||||
RealVal = parse_value(Val, Type, Scale),
|
||||
@ -154,7 +163,7 @@ code_change(_OldVsn, State = #state{}, _Extra) ->
|
||||
-spec start_ticker(list(), Step :: integer()) -> no_return().
|
||||
start_ticker([], _) ->
|
||||
ok;
|
||||
start_ticker([Name|T], Step) ->
|
||||
start_ticker([#modbus_metric{name = Name}|T], Step) ->
|
||||
poll_ticker(Step, Name),
|
||||
start_ticker(T, Step + Step).
|
||||
|
||||
@ -194,4 +203,9 @@ parse_value(<<Val:32/unsigned-integer>>, <<"uint32">>, Scale) ->
|
||||
parse_value(<<Val:32/big-float>>, <<"float32">>, Scale) ->
|
||||
Val * Scale;
|
||||
parse_value(<<Val:64/big-float>>, <<"float32">>, Scale) ->
|
||||
Val * Scale.
|
||||
Val * Scale.
|
||||
|
||||
-spec get_metric(Name :: binary(), Metrics :: [#modbus_metric{}]) -> Metric :: #modbus_metric{}.
|
||||
get_metric(Name, Metrics) when is_list(Metrics) ->
|
||||
{value, Metric} = lists:search(fun(#modbus_metric{name = Name0}) -> Name0 =:= Name end, Metrics),
|
||||
Metric.
|
||||
@ -13,7 +13,7 @@
|
||||
-behaviour(gen_statem).
|
||||
|
||||
%% API
|
||||
-export([start_link/1]).
|
||||
-export([start_link/2]).
|
||||
-export([test/0, frame_delay/4]).
|
||||
|
||||
%% gen_statem callbacks
|
||||
@ -21,6 +21,7 @@
|
||||
|
||||
%% rtu指令
|
||||
-define(MODBUS_READ, 16#01).
|
||||
-define(MODBUS_CONTROL, 16#02).
|
||||
|
||||
%% 当前的状态
|
||||
-define(DISCONNECTED, disconnected).
|
||||
@ -38,6 +39,9 @@
|
||||
}).
|
||||
|
||||
-record(state, {
|
||||
%% 拥有者的pid
|
||||
owner_pid :: pid(),
|
||||
|
||||
ast :: #ast{},
|
||||
|
||||
%% 连接模式
|
||||
@ -66,13 +70,21 @@ test() ->
|
||||
{ok, Config} = file:read_file("/usr/local/code/cloudkit/modbus/modbus.conf"),
|
||||
%lager:debug("config is: ~ts", [Config]),
|
||||
{ok, AST} = modbus_parser:parse(Config),
|
||||
start_link(AST).
|
||||
|
||||
Pid = spawn(fun() ->
|
||||
receive
|
||||
Info ->
|
||||
lager:debug("[onwer] recv: ~p", [Info])
|
||||
end
|
||||
end),
|
||||
|
||||
start_link(Pid, AST).
|
||||
|
||||
%% @doc Creates a gen_statem process which calls Module:init/1 to
|
||||
%% initialize. To ensure a synchronized start-up procedure, this
|
||||
%% function does not return until Module:init/1 has returned.
|
||||
start_link(AST = #ast{}) ->
|
||||
gen_statem:start_link(?MODULE, [AST], []).
|
||||
start_link(OwnerPid, AST = #ast{}) when is_pid(OwnerPid) ->
|
||||
gen_statem:start_link(?MODULE, [OwnerPid, AST], []).
|
||||
|
||||
%%%===================================================================
|
||||
%%% gen_statem callbacks
|
||||
@ -82,7 +94,7 @@ start_link(AST = #ast{}) ->
|
||||
%% @doc Whenever a gen_statem is started using gen_statem:start/[3,4] or
|
||||
%% gen_statem:start_link/[3,4], this function is called by the new
|
||||
%% process to initialize.
|
||||
init([AST = #ast{modbus = Modbus = #modbus{transport = Transport, error_log = ErrorLog, access_log = AccessLog}, devices = Devices, alarms = Alarms}]) ->
|
||||
init([OwnerPid, AST = #ast{modbus = Modbus = #modbus{transport = Transport, error_log = ErrorLog, access_log = AccessLog}, devices = Devices, alarms = Alarms}]) ->
|
||||
%lager:debug("[modbus_service] ast is: ~p", [AST]),
|
||||
lager:debug("[modbus_service] devices is: ~p", [Devices]),
|
||||
|
||||
@ -93,11 +105,11 @@ init([AST = #ast{modbus = Modbus = #modbus{transport = Transport, error_log = Er
|
||||
case ConnectResult of
|
||||
{rtu, Port, DelayMs} ->
|
||||
|
||||
{ok, ?CONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
|
||||
{ok, ?CONNECTED, #state{owner_pid = OwnerPid, ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
|
||||
mode = #rtu_mode{port = Port, delay_ms = DelayMs},
|
||||
queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}};
|
||||
{tcp, Socket} ->
|
||||
{ok, ?CONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
|
||||
{ok, ?CONNECTED, #state{owner_pid = OwnerPid, ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
|
||||
mode = #tcp_mode{socket = Socket},
|
||||
queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}}
|
||||
end;
|
||||
@ -105,7 +117,7 @@ init([AST = #ast{modbus = Modbus = #modbus{transport = Transport, error_log = Er
|
||||
lager:warning("[modbus_serivce] connect get error: ~p", [Reason]),
|
||||
%% 5秒后重试
|
||||
retry_connect(),
|
||||
{ok, ?DISCONNECTED, #state{ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
|
||||
{ok, ?DISCONNECTED, #state{owner_pid = OwnerPid, ast = AST, access_log_pid = create_log_file(AccessLog), error_log_pid = create_log_file(ErrorLog),
|
||||
queue = queue:new(), packet_id = 1, devices_map = DevicesMap, alarm_pids = []}}
|
||||
end.
|
||||
|
||||
@ -121,11 +133,22 @@ callback_mode() ->
|
||||
%% process message, this function is called.
|
||||
|
||||
%% 请求的时候必须先加入队列,modbus需要控制请求的频率
|
||||
handle_event(info, {request, ReceiverPid, Ref, SlaveId, Address, Cnt}, StateName, State = #state{mode = #rtu_mode{}, packet_id = PacketId, queue = Q}) ->
|
||||
handle_event(info, {request_device, ReceiverPid, Ref, SlaveId, Address, Cnt}, StateName, State = #state{mode = #rtu_mode{}, packet_id = PacketId, queue = Q}) ->
|
||||
case StateName of
|
||||
?CONNECTED ->
|
||||
%% 基于队列处理, modbus不是全双工的模式
|
||||
NQ = queue:in({PacketId, ReceiverPid, Ref, SlaveId, Address, Cnt}, Q),
|
||||
NQ = queue:in({request, PacketId, ReceiverPid, Ref, SlaveId, Address, Cnt}, Q),
|
||||
{keep_state, State#state{queue = NQ, packet_id = PacketId + 1}, [{next_event, info, read_next}]};
|
||||
_ ->
|
||||
{keep_state, State}
|
||||
end;
|
||||
|
||||
%% 请求的时候必须先加入队列,modbus需要控制请求的频率
|
||||
handle_event(info, {control_device, ReceiverPid, Ref, SlaveId, Address, Val, Cnt}, StateName, State = #state{mode = #rtu_mode{}, packet_id = PacketId, queue = Q}) ->
|
||||
case StateName of
|
||||
?CONNECTED ->
|
||||
%% 基于队列处理, modbus不是全双工的模式
|
||||
NQ = queue:in({control, PacketId, ReceiverPid, Ref, SlaveId, Address, Val, Cnt}, Q),
|
||||
{keep_state, State#state{queue = NQ, packet_id = PacketId + 1}, [{next_event, info, read_next}]};
|
||||
_ ->
|
||||
{keep_state, State}
|
||||
@ -136,12 +159,18 @@ handle_event(info, read_next, ?CONNECTED, State = #state{mode = #rtu_mode{is_bus
|
||||
{keep_state, State};
|
||||
handle_event(info, read_next, ?CONNECTED, State = #state{mode = Mode = #rtu_mode{port = Port}, queue = Q, inflight = Inflight}) ->
|
||||
lager:debug("[modbus_service] read next, q: ~p, port is: ~p", [queue:to_list(Q), Port]),
|
||||
|
||||
case queue:out(Q) of
|
||||
{{value, {PacketId, ReceiverPid, Ref, SlaveId, Address, Cnt}}, Q2} ->
|
||||
ReadCmd = <<?MODBUS_READ:8, PacketId:32, SlaveId:8, Address:16, Cnt:16>>,
|
||||
Port ! {self(), {command, ReadCmd}},
|
||||
{keep_state, State#state{queue = Q2, mode = Mode#rtu_mode{is_busy = true}, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
|
||||
{{value, Item}, Q2} ->
|
||||
case Item of
|
||||
{request, PacketId, ReceiverPid, Ref, SlaveId, Address, Cnt} ->
|
||||
ReadCmd = <<?MODBUS_READ:8, PacketId:32, SlaveId:8, Address:16, Cnt:16>>,
|
||||
Port ! {self(), {command, ReadCmd}},
|
||||
{keep_state, State#state{queue = Q2, mode = Mode#rtu_mode{is_busy = true}, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}};
|
||||
{control, PacketId, ReceiverPid, Ref, SlaveId, Address, Val, Cnt} ->
|
||||
ReadCmd = <<?MODBUS_CONTROL:8, PacketId:32, SlaveId:8, Address:16, Val, Cnt:16>>,
|
||||
Port ! {self(), {command, ReadCmd}},
|
||||
{keep_state, State#state{queue = Q2, mode = Mode#rtu_mode{is_busy = true}, inflight = maps:put(PacketId, {ReceiverPid, Ref}, Inflight)}}
|
||||
end;
|
||||
{empty, Q1} ->
|
||||
{keep_state, State#state{queue = Q1}}
|
||||
end;
|
||||
@ -172,9 +201,15 @@ handle_event(info, {timeout, _, delay_locking}, ?CONNECTED, State = #state{mode
|
||||
lager:debug("[modbus_service] delay unlock trigger next"),
|
||||
{keep_state, State#state{mode = Mode#rtu_mode{is_busy = false}}, [{next_event, info, read_next}]};
|
||||
|
||||
%% todo 数据上报
|
||||
handle_event(info, {device_report, DeviceName, Name, Value, Uint}, ?CONNECTED, State) ->
|
||||
lager:debug("[modbus_service] get report_data device_name: ~p, name: ~p, value: ~p, uint: ~p", [DeviceName, Name, Value, Uint]),
|
||||
%% 数据上报
|
||||
handle_event(info, {device_report, DeviceName, MetricName, Value, Uint}, ?CONNECTED, State = #state{owner_pid = OwnerPid}) ->
|
||||
lager:debug("[modbus_service] get report_data device_name: ~p, name: ~p, value: ~p, uint: ~p", [DeviceName, MetricName, Value, Uint]),
|
||||
OwnerPid ! {metric_data, DeviceName, MetricName, Value, Uint},
|
||||
{keep_state, State};
|
||||
|
||||
handle_event(info, {device_alarm, AlarmName, HoldTime, Val}, ?CONNECTED, State = #state{owner_pid = OwnerPid}) ->
|
||||
lager:debug("[modbus_service] get alarm device_name: ~p, name: ~p, value: ~p, uint: ~p", [DeviceName, MetricName, Value, Uint]),
|
||||
% OwnerPid ! {metric_data, DeviceName, MetricName, Value, Uint},
|
||||
{keep_state, State};
|
||||
|
||||
%% 广播消息
|
||||
|
||||
31
modbus.conf
31
modbus.conf
@ -66,6 +66,17 @@ device_io t1 {
|
||||
}
|
||||
}
|
||||
|
||||
## 告警信息
|
||||
alarms {
|
||||
high_temperature {
|
||||
# 触发条件
|
||||
condition $temperature > 90.0;
|
||||
|
||||
# 持续判定
|
||||
hold_time 30s;
|
||||
}
|
||||
}
|
||||
|
||||
}
|
||||
|
||||
device boiler_controller {
|
||||
@ -126,18 +137,6 @@ device xyz {
|
||||
scale 0.01;
|
||||
unit "kPa";
|
||||
}
|
||||
|
||||
# 状态位(线圈)
|
||||
#alarm_status {
|
||||
# address 00001;
|
||||
# type bool;
|
||||
# bits {
|
||||
# 0 "overheat";
|
||||
# 1 "low_pressure";
|
||||
# 2 "pump_failure";
|
||||
# }
|
||||
# poll on;
|
||||
#}
|
||||
}
|
||||
|
||||
# 写入控制
|
||||
@ -158,12 +157,4 @@ device xyz {
|
||||
precision 0.1;
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
alarm high_temperature {
|
||||
# 触发条件
|
||||
condition $boiler_controller.temperature > 90.0;
|
||||
|
||||
# 持续判定
|
||||
hold_time 30s;
|
||||
}
|
||||
Loading…
x
Reference in New Issue
Block a user