first commit

This commit is contained in:
gqc 2025-12-10 14:24:36 +08:00
parent 07f6d8036f
commit e83a8eef26
10 changed files with 19 additions and 48 deletions

View File

@ -6,8 +6,6 @@ import (
"os" "os"
) )
// SensorType 传感器类型
// SensorConfig 传感器配置 // SensorConfig 传感器配置
type SensorConfig struct { type SensorConfig struct {
HostUUID string `json:"host_uuid"` HostUUID string `json:"host_uuid"`

View File

@ -35,7 +35,6 @@ func NewDataCollector(config *ModbusConfig) *DataCollector {
// ReadSensorData 读取单个传感器数据使用新的Modbus客户端 // ReadSensorData 读取单个传感器数据使用新的Modbus客户端
func (dc *DataCollector) ReadSensorData(sensorConfig SensorConfig) (*SensorData, error) { func (dc *DataCollector) ReadSensorData(sensorConfig SensorConfig) (*SensorData, error) {
// 确保通信间隔
dc.ensureCommunicationInterval() dc.ensureCommunicationInterval()
// 获取寄存器配置(只包含地址和数量) // 获取寄存器配置(只包含地址和数量)
@ -44,7 +43,6 @@ func (dc *DataCollector) ReadSensorData(sensorConfig SensorConfig) (*SensorData,
return nil, fmt.Errorf("Failed to get register configuration: %v", err) return nil, fmt.Errorf("Failed to get register configuration: %v", err)
} }
// 关键:直接把 facility 当作 Modbus 从站地址SlaveID
slaveID, err := strconv.Atoi(sensorConfig.Facility) slaveID, err := strconv.Atoi(sensorConfig.Facility)
if err != nil || slaveID < 1 || slaveID > 247 { if err != nil || slaveID < 1 || slaveID > 247 {
return nil, fmt.Errorf("Invalid facility value, cannot be used as SlaveID: %s (DeviceUUID: %s)", return nil, fmt.Errorf("Invalid facility value, cannot be used as SlaveID: %s (DeviceUUID: %s)",
@ -52,19 +50,16 @@ func (dc *DataCollector) ReadSensorData(sensorConfig SensorConfig) (*SensorData,
} }
slaveIDByte := byte(slaveID) slaveIDByte := byte(slaveID)
// 日志:用真实的 slaveID来自 facility
logger.Logger.Printf("Reading sensor data - Device: %s, SlaveID: %d (from facility=%s), Addr: %d, Count: %d", logger.Logger.Printf("Reading sensor data - Device: %s, SlaveID: %d (from facility=%s), Addr: %d, Count: %d",
sensorConfig.DeviceUUID, slaveID, sensorConfig.Facility, sensorConfig.DeviceUUID, slaveID, sensorConfig.Facility,
registerConfig.RegisterAddress, registerConfig.RegisterCount) registerConfig.RegisterAddress, registerConfig.RegisterCount)
// 使用 facility 作为 SlaveID 创建客户端
client, err := NewDebugModbusClient(dc.modbusConfig, slaveIDByte) client, err := NewDebugModbusClient(dc.modbusConfig, slaveIDByte)
if err != nil { if err != nil {
return nil, fmt.Errorf("Failed to create Modbus client: %v", err) return nil, fmt.Errorf("Failed to create Modbus client: %v", err)
} }
defer client.Close() defer client.Close()
// 读取寄存器数据
rawData, err := client.ReadHoldingRegisters( rawData, err := client.ReadHoldingRegisters(
registerConfig.RegisterAddress, registerConfig.RegisterAddress,
registerConfig.RegisterCount, registerConfig.RegisterCount,
@ -95,7 +90,7 @@ func (dc *DataCollector) ReadSensorData(sensorConfig SensorConfig) (*SensorData,
return sensorData, nil return sensorData, nil
} }
// ensureCommunicationInterval 确保通信间隔 // 确保通信间隔
func (dc *DataCollector) ensureCommunicationInterval() { func (dc *DataCollector) ensureCommunicationInterval() {
elapsed := time.Since(dc.lastReadTime) elapsed := time.Since(dc.lastReadTime)
if elapsed < dc.modbusConfig.SensorInterval { if elapsed < dc.modbusConfig.SensorInterval {
@ -105,7 +100,7 @@ func (dc *DataCollector) ensureCommunicationInterval() {
} }
} }
// BatchReadSensorData 批量读取所有传感器数据 // 批量读取所有传感器数据
func (dc *DataCollector) BatchReadSensorData(sensorConfigs []SensorConfig) ([]*SensorData, []error) { func (dc *DataCollector) BatchReadSensorData(sensorConfigs []SensorConfig) ([]*SensorData, []error) {
var results []*SensorData var results []*SensorData
var errors []error var errors []error
@ -119,7 +114,6 @@ func (dc *DataCollector) BatchReadSensorData(sensorConfigs []SensorConfig) ([]*S
if err != nil { if err != nil {
errors = append(errors, err) errors = append(errors, err)
// 如果连续错误,增加等待时间
if len(errors) > 2 { if len(errors) > 2 {
logger.Logger.Printf("Detected consecutive communication errors, increasing bus stabilization time") logger.Logger.Printf("Detected consecutive communication errors, increasing bus stabilization time")
time.Sleep(dc.modbusConfig.BusResetDelay * 2) time.Sleep(dc.modbusConfig.BusResetDelay * 2)
@ -133,7 +127,7 @@ func (dc *DataCollector) BatchReadSensorData(sensorConfigs []SensorConfig) ([]*S
return results, errors return results, errors
} }
// groupSensorsByType 按传感器类型分组 // 按传感器类型分组
func groupSensorsByType(configs []SensorConfig) map[SensorType][]SensorConfig { func groupSensorsByType(configs []SensorConfig) map[SensorType][]SensorConfig {
groups := make(map[SensorType][]SensorConfig) groups := make(map[SensorType][]SensorConfig)

View File

@ -47,13 +47,12 @@ func (dp *DataProcessor) ProcessSensorData(sensorData *SensorData) (*ProcessedSe
switch sensorData.SensorType { switch sensorData.SensorType {
case SensorTypeTemperatureHumidity: case SensorTypeTemperatureHumidity:
// 温湿度传感器2个寄存器分别表示温度和湿度
if len(sensorData.RawData) >= 4 { if len(sensorData.RawData) >= 4 {
humidity := float64(binary.BigEndian.Uint16(sensorData.RawData[0:2])) humidity := float64(binary.BigEndian.Uint16(sensorData.RawData[0:2]))
temperature := float64(binary.BigEndian.Uint16(sensorData.RawData[2:4])) temperature := float64(binary.BigEndian.Uint16(sensorData.RawData[2:4]))
// 转换为实际值(根据传感器规格调整转换公式)
temperature = temperature / 10.0 temperature = temperature / 10.0
humidity = humidity / 10.0 humidity = humidity / 10.0

View File

@ -10,7 +10,7 @@ type ReportData struct {
type ReportParams struct { type ReportParams struct {
RouteKey string `json:"route_key"` RouteKey string `json:"route_key"`
Metric interface{} `json:"metric"` // 使用interface{}来支持不同类型 Metric interface{} `json:"metric"`
} }
// ReportMetricData 数据上报指标 // ReportMetricData 数据上报指标

View File

@ -130,7 +130,6 @@ func (fd *FaultDetector) CollectAndClearPendingEvents() (alarms []EventData, onl
timestamp := strconv.FormatInt(current.Unix(), 10) timestamp := strconv.FormatInt(current.Unix(), 10)
for deviceUUID, status := range fd.deviceStatuses { for deviceUUID, status := range fd.deviceStatuses {
// 检查是否有需要上报的离线恢复(原来逻辑是实时上报的,这里改为攒着批量发)
if !status.IsOnline && status.ErrorCount >= 3 && status.LastReportTime.Before(current.Add(-5*time.Second)) { if !status.IsOnline && status.ErrorCount >= 3 && status.LastReportTime.Before(current.Add(-5*time.Second)) {
// 防止重复发,简单用时间判断 // 防止重复发,简单用时间判断
alarms = append(alarms, EventData{ alarms = append(alarms, EventData{
@ -140,21 +139,21 @@ func (fd *FaultDetector) CollectAndClearPendingEvents() (alarms []EventData, onl
DeviceUUID: deviceUUID, DeviceUUID: deviceUUID,
Level: "alarm", Level: "alarm",
LastSuccessfulTime: strconv.FormatInt(status.LastSuccessTime.Unix(), 10), LastSuccessfulTime: strconv.FormatInt(status.LastSuccessTime.Unix(), 10),
SensorType: "未知", // 后面你采集时会带 sensorType这里先占位 SensorType: "未知",
Recommendation: "Check device connection and communication line", Recommendation: "Check device connection and communication line",
}) })
status.LastReportTime = current // 标记已处理 status.LastReportTime = current // 标记已处理
} }
if status.IsOnline && status.LastSuccessTime.After(status.LastReportTime) { if status.IsOnline && status.LastSuccessTime.After(status.LastReportTime) {
// 如果最近一次成功采集时间 > 上次上报时间,说明刚刚恢复,需要发 online
onlines = append(onlines, EventData{ onlines = append(onlines, EventData{
Description: "Sensor online", Description: "Sensor online",
Timestamp: timestamp, Timestamp: timestamp,
HostUUID: fd.hostUUID, HostUUID: fd.hostUUID,
DeviceUUID: deviceUUID, DeviceUUID: deviceUUID,
Level: "online", Level: "online",
SensorType: "未知", // 同上 SensorType: "未知",
}) })
status.LastReportTime = current status.LastReportTime = current
} }

View File

@ -16,7 +16,6 @@ func TriggerBuzzer(seconds int) {
seconds = 10 seconds = 10
} }
// 必须用同步!异步 goroutine 在某些系统里会被静默丢弃
fmt.Printf("蜂鸣器开始响铃 %d 秒...\n", seconds) fmt.Printf("蜂鸣器开始响铃 %d 秒...\n", seconds)
gpioPath := fmt.Sprintf("/sys/class/gpio/gpio%d", BuzzerGPIO) gpioPath := fmt.Sprintf("/sys/class/gpio/gpio%d", BuzzerGPIO)

View File

@ -49,7 +49,7 @@ func NewBusinessManager(sensorConfigs []SensorConfig) (*BusinessManager, error)
reportManager: reportManager, reportManager: reportManager,
sensorConfigs: sensorConfigs, sensorConfigs: sensorConfigs,
hostUUID: hostUUID, hostUUID: hostUUID,
isFirstCollect: true, // 初始化第一次采集标志 isFirstCollect: true,
}, nil }, nil
} }
@ -112,7 +112,6 @@ func (bm *BusinessManager) updateDeviceStatuses(sensorDataList []*SensorData, re
// 创建错误映射表 // 创建错误映射表
errorMap := make(map[string]string) errorMap := make(map[string]string)
for _, err := range readErrors { for _, err := range readErrors {
// 从错误信息中提取deviceUUID需要根据实际错误格式调整
errorMap["unknown"] = err.Error() errorMap["unknown"] = err.Error()
} }
@ -154,9 +153,9 @@ func (bm *BusinessManager) updateDeviceStatuses(sensorDataList []*SensorData, re
} }
// reportFirstCollectionStatus 上报第一次采集的系统状态 // reportFirstCollectionStatus 上报第一次采集的系统状态
// reportFirstCollectionStatus 上报第一次采集的系统状态(新协议:批量发送所有在线设备的 online 事件)
func (bm *BusinessManager) reportFirstCollectionStatus(_ []*SensorData, _ []error, processedDataList []*ProcessedSensorData) { func (bm *BusinessManager) reportFirstCollectionStatus(_ []*SensorData, _ []error, processedDataList []*ProcessedSensorData) {
// 只使用 processedDataList 就够了,另外两个参数我们不需要但必须保留(调用方传了),用 _ 忽略
successCount := len(processedDataList) successCount := len(processedDataList)
totalCount := len(bm.sensorConfigs) totalCount := len(bm.sensorConfigs)
@ -201,12 +200,11 @@ func (bm *BusinessManager) reportFirstCollectionStatus(_ []*SensorData, _ []erro
metric := map[string]interface{}{ metric := map[string]interface{}{
"type": "event", "type": "event",
"args": events, // 批量数组! "args": events,
} }
routeKey := fmt.Sprintf("/dthjjc/device/%s/event", bm.hostUUID) routeKey := fmt.Sprintf("/dthjjc/device/%s/event", bm.hostUUID)
// 打印完整 JSON 方便你看
fullMsg := map[string]interface{}{ fullMsg := map[string]interface{}{
"method": "metric_data", "method": "metric_data",
"params": map[string]interface{}{ "params": map[string]interface{}{

View File

@ -53,11 +53,11 @@ func DefaultModbusConfig() *ModbusConfig {
DataBits: 8, DataBits: 8,
StopBits: 1, StopBits: 1,
Parity: "N", Parity: "N",
Timeout: 3 * time.Second, // 缩短超时时间 Timeout: 3 * time.Second,
RetryCount: 2, // 减少重试次数 RetryCount: 2,
RetryDelay: 100 * time.Millisecond, RetryDelay: 100 * time.Millisecond,
SensorInterval: 200 * time.Millisecond, // 传感器间读取间隔 SensorInterval: 200 * time.Millisecond,
BusResetDelay: 50 * time.Millisecond, // 总线稳定时间 BusResetDelay: 50 * time.Millisecond,
} }
} }

View File

@ -43,13 +43,11 @@ func (rm *ReportManager) ReportMetricData(processedData []*ProcessedSensorData,
deviceDataList = append(deviceDataList, deviceData) deviceDataList = append(deviceDataList, deviceData)
} }
// 构建数据上报的metric内容
metricData := map[string]interface{}{ metricData := map[string]interface{}{
"type": "data", "type": "data",
"args": deviceDataList, "args": deviceDataList,
} }
// 外层route_key用于WebSocket路由
outerRouteKey := fmt.Sprintf("/dthjjc/device/%s/data", rm.hostUUID) outerRouteKey := fmt.Sprintf("/dthjjc/device/%s/data", rm.hostUUID)
// 记录要发送的详细内容 // 记录要发送的详细内容
@ -104,15 +102,11 @@ func (rm *ReportManager) ReportMetricData(processedData []*ProcessedSensorData,
// ReportEventData 上报事件数据 // ReportEventData 上报事件数据
func (rm *ReportManager) ReportEventData(event *EventData) error { func (rm *ReportManager) ReportEventData(event *EventData) error {
// 新协议:我们不再单个发,而是偷偷攒进一个全局批量队列,统一用新格式发
// 但为了不影响其他调用处,我们保留这个函数,只是内部实现变了
// 特殊处理:如果 event.Level == "system_start",直接忽略(取消这个事件)
if event.Level == "system_start" { if event.Level == "system_start" {
return nil return nil
} }
// 转换为新批量格式(复用 EventData 的字段)
item := map[string]interface{}{ item := map[string]interface{}{
"description": event.Description, "description": event.Description,
"timestamp": event.Timestamp, "timestamp": event.Timestamp,
@ -123,8 +117,7 @@ func (rm *ReportManager) ReportEventData(event *EventData) error {
if event.DeviceUUID != "" { if event.DeviceUUID != "" {
item["device_uuid"] = event.DeviceUUID // 修复字段名 item["device_uuid"] = event.DeviceUUID // 修复字段名
// 从全局 sensorConfigs 里找对应的 name你本来就传进来了 for _, cfg := range rm.sensorConfigs {
for _, cfg := range rm.sensorConfigs { // ← 这里我们马上加一个字段
if cfg.DeviceUUID == event.DeviceUUID { if cfg.DeviceUUID == event.DeviceUUID {
item["name"] = cfg.Name item["name"] = cfg.Name
break break
@ -141,13 +134,12 @@ func (rm *ReportManager) ReportEventData(event *EventData) error {
item["recommendation"] = event.Recommendation item["recommendation"] = event.Recommendation
} }
// 关键:复用原来的 metric_data 通道发送,但 args 是数组
var events []map[string]interface{} var events []map[string]interface{}
events = append(events, item) events = append(events, item)
metric := map[string]interface{}{ metric := map[string]interface{}{
"type": "event", "type": "event",
"args": events, // 关键这里是数组哪怕只有1条 "args": events,
} }
routeKey := fmt.Sprintf("/dthjjc/device/%s/event", rm.hostUUID) routeKey := fmt.Sprintf("/dthjjc/device/%s/event", rm.hostUUID)
@ -198,7 +190,7 @@ func (rm *ReportManager) convertToDeviceData(data *ProcessedSensorData, sensorCo
"temp": fmt.Sprintf("%.1f", data.Values[0]), "temp": fmt.Sprintf("%.1f", data.Values[0]),
"humidity": fmt.Sprintf("%.1f", data.Values[1]), "humidity": fmt.Sprintf("%.1f", data.Values[1]),
} }
deviceData.Value = valueMap // 直接赋值对象,不是数组 deviceData.Value = valueMap
} else { } else {
return DeviceData{}, fmt.Errorf("Temperature and humidity data is incomplete.: %v", data.Values) return DeviceData{}, fmt.Errorf("Temperature and humidity data is incomplete.: %v", data.Values)
} }

View File

@ -15,7 +15,6 @@ import (
) )
func main() { func main() {
// 解析命令行参数
debugMode := flag.Bool("debug", false, "Enable debug mode") debugMode := flag.Bool("debug", false, "Enable debug mode")
slaveID := flag.String("slave", "2", "Slave address (debug mode only)") slaveID := flag.String("slave", "2", "Slave address (debug mode only)")
sensorType := flag.String("type", "1", "Sensor type 1-Temperature/Humidity, 2-Water Leak, 3-Smoke (debug mode only)") sensorType := flag.String("type", "1", "Sensor type 1-Temperature/Humidity, 2-Water Leak, 3-Smoke (debug mode only)")
@ -68,7 +67,6 @@ func setupGracefulShutdown() {
sig := <-sigChan sig := <-sigChan
logger.Logger.Printf("Received signal: %v, starting shutdown", sig) logger.Logger.Printf("Received signal: %v, starting shutdown", sig)
// 关闭 WebSocket 连接
connect.Close() connect.Close()
logger.Logger.Printf("WebSocket connection closed") logger.Logger.Printf("WebSocket connection closed")
@ -81,7 +79,6 @@ func setupGracefulShutdown() {
func runNormalMode() { func runNormalMode() {
logger.Logger.Printf("=== Environmental Detection System Starting ===") logger.Logger.Printf("=== Environmental Detection System Starting ===")
// 原有的正常模式代码...
// 2. 初始化 WebSocket 连接 // 2. 初始化 WebSocket 连接
logger.Logger.Printf("Initializing WebSocket connection...") logger.Logger.Printf("Initializing WebSocket connection...")
if err := connect.InitWSChannel(); err != nil { if err := connect.InitWSChannel(); err != nil {
@ -233,9 +230,4 @@ func handleAlertMessage(params *connect.PublishParams) {
return return
} }
// 终极兜底:只要包含 type 和 1 就响
//if strings.Contains(raw, "type") && strings.Contains(raw, "1") {
// logger.Logger.Printf("暴力识别告警指令 → 强制响 3 秒")
// business.TriggerBuzzer(3)
//}
} }