diff --git a/business/config_parser.go b/business/config_parser.go index e20763d..36f5a8c 100644 --- a/business/config_parser.go +++ b/business/config_parser.go @@ -6,8 +6,6 @@ import ( "os" ) -// SensorType 传感器类型 - // SensorConfig 传感器配置 type SensorConfig struct { HostUUID string `json:"host_uuid"` diff --git a/business/data_collector.go b/business/data_collector.go index 7472a13..9eade54 100644 --- a/business/data_collector.go +++ b/business/data_collector.go @@ -35,7 +35,6 @@ func NewDataCollector(config *ModbusConfig) *DataCollector { // ReadSensorData 读取单个传感器数据(使用新的Modbus客户端) func (dc *DataCollector) ReadSensorData(sensorConfig SensorConfig) (*SensorData, error) { - // 确保通信间隔 dc.ensureCommunicationInterval() // 获取寄存器配置(只包含地址和数量) @@ -44,7 +43,6 @@ func (dc *DataCollector) ReadSensorData(sensorConfig SensorConfig) (*SensorData, return nil, fmt.Errorf("Failed to get register configuration: %v", err) } - // 关键:直接把 facility 当作 Modbus 从站地址(SlaveID) slaveID, err := strconv.Atoi(sensorConfig.Facility) if err != nil || slaveID < 1 || slaveID > 247 { return nil, fmt.Errorf("Invalid facility value, cannot be used as SlaveID: %s (DeviceUUID: %s)", @@ -52,19 +50,16 @@ func (dc *DataCollector) ReadSensorData(sensorConfig SensorConfig) (*SensorData, } slaveIDByte := byte(slaveID) - // 日志:用真实的 slaveID(来自 facility) logger.Logger.Printf("Reading sensor data - Device: %s, SlaveID: %d (from facility=%s), Addr: %d, Count: %d", sensorConfig.DeviceUUID, slaveID, sensorConfig.Facility, registerConfig.RegisterAddress, registerConfig.RegisterCount) - // 使用 facility 作为 SlaveID 创建客户端 client, err := NewDebugModbusClient(dc.modbusConfig, slaveIDByte) if err != nil { return nil, fmt.Errorf("Failed to create Modbus client: %v", err) } defer client.Close() - // 读取寄存器数据 rawData, err := client.ReadHoldingRegisters( registerConfig.RegisterAddress, registerConfig.RegisterCount, @@ -95,7 +90,7 @@ func (dc *DataCollector) ReadSensorData(sensorConfig SensorConfig) (*SensorData, return sensorData, nil } -// ensureCommunicationInterval 确保通信间隔 +// 确保通信间隔 func (dc *DataCollector) ensureCommunicationInterval() { elapsed := time.Since(dc.lastReadTime) if elapsed < dc.modbusConfig.SensorInterval { @@ -105,7 +100,7 @@ func (dc *DataCollector) ensureCommunicationInterval() { } } -// BatchReadSensorData 批量读取所有传感器数据 +// 批量读取所有传感器数据 func (dc *DataCollector) BatchReadSensorData(sensorConfigs []SensorConfig) ([]*SensorData, []error) { var results []*SensorData var errors []error @@ -119,7 +114,6 @@ func (dc *DataCollector) BatchReadSensorData(sensorConfigs []SensorConfig) ([]*S if err != nil { errors = append(errors, err) - // 如果连续错误,增加等待时间 if len(errors) > 2 { logger.Logger.Printf("Detected consecutive communication errors, increasing bus stabilization time") time.Sleep(dc.modbusConfig.BusResetDelay * 2) @@ -133,7 +127,7 @@ func (dc *DataCollector) BatchReadSensorData(sensorConfigs []SensorConfig) ([]*S return results, errors } -// groupSensorsByType 按传感器类型分组 +// 按传感器类型分组 func groupSensorsByType(configs []SensorConfig) map[SensorType][]SensorConfig { groups := make(map[SensorType][]SensorConfig) diff --git a/business/data_processor.go b/business/data_processor.go index 011a680..8fbaa68 100644 --- a/business/data_processor.go +++ b/business/data_processor.go @@ -47,13 +47,12 @@ func (dp *DataProcessor) ProcessSensorData(sensorData *SensorData) (*ProcessedSe switch sensorData.SensorType { case SensorTypeTemperatureHumidity: - // 温湿度传感器:2个寄存器,分别表示温度和湿度 + if len(sensorData.RawData) >= 4 { humidity := float64(binary.BigEndian.Uint16(sensorData.RawData[0:2])) temperature := float64(binary.BigEndian.Uint16(sensorData.RawData[2:4])) - // 转换为实际值(根据传感器规格调整转换公式) temperature = temperature / 10.0 humidity = humidity / 10.0 diff --git a/business/data_types.go b/business/data_types.go index 7af7f84..e97bfeb 100644 --- a/business/data_types.go +++ b/business/data_types.go @@ -10,7 +10,7 @@ type ReportData struct { type ReportParams struct { RouteKey string `json:"route_key"` - Metric interface{} `json:"metric"` // 使用interface{}来支持不同类型 + Metric interface{} `json:"metric"` } // ReportMetricData 数据上报指标 diff --git a/business/fault_detector.go b/business/fault_detector.go index 7919f9e..b06772b 100644 --- a/business/fault_detector.go +++ b/business/fault_detector.go @@ -130,7 +130,6 @@ func (fd *FaultDetector) CollectAndClearPendingEvents() (alarms []EventData, onl timestamp := strconv.FormatInt(current.Unix(), 10) for deviceUUID, status := range fd.deviceStatuses { - // 检查是否有需要上报的离线恢复(原来逻辑是实时上报的,这里改为攒着批量发) if !status.IsOnline && status.ErrorCount >= 3 && status.LastReportTime.Before(current.Add(-5*time.Second)) { // 防止重复发,简单用时间判断 alarms = append(alarms, EventData{ @@ -140,21 +139,21 @@ func (fd *FaultDetector) CollectAndClearPendingEvents() (alarms []EventData, onl DeviceUUID: deviceUUID, Level: "alarm", LastSuccessfulTime: strconv.FormatInt(status.LastSuccessTime.Unix(), 10), - SensorType: "未知", // 后面你采集时会带 sensorType,这里先占位 + SensorType: "未知", Recommendation: "Check device connection and communication line", }) status.LastReportTime = current // 标记已处理 } if status.IsOnline && status.LastSuccessTime.After(status.LastReportTime) { - // 如果最近一次成功采集时间 > 上次上报时间,说明刚刚恢复,需要发 online + onlines = append(onlines, EventData{ Description: "Sensor online", Timestamp: timestamp, HostUUID: fd.hostUUID, DeviceUUID: deviceUUID, Level: "online", - SensorType: "未知", // 同上 + SensorType: "未知", }) status.LastReportTime = current } diff --git a/business/gpio_ctrl.go b/business/gpio_ctrl.go index 37f3184..dbab336 100644 --- a/business/gpio_ctrl.go +++ b/business/gpio_ctrl.go @@ -16,7 +16,6 @@ func TriggerBuzzer(seconds int) { seconds = 10 } - // 必须用同步!异步 goroutine 在某些系统里会被静默丢弃 fmt.Printf("蜂鸣器开始响铃 %d 秒...\n", seconds) gpioPath := fmt.Sprintf("/sys/class/gpio/gpio%d", BuzzerGPIO) diff --git a/business/main_business.go b/business/main_business.go index 8d503a5..fae1470 100644 --- a/business/main_business.go +++ b/business/main_business.go @@ -49,7 +49,7 @@ func NewBusinessManager(sensorConfigs []SensorConfig) (*BusinessManager, error) reportManager: reportManager, sensorConfigs: sensorConfigs, hostUUID: hostUUID, - isFirstCollect: true, // 初始化第一次采集标志 + isFirstCollect: true, }, nil } @@ -112,7 +112,6 @@ func (bm *BusinessManager) updateDeviceStatuses(sensorDataList []*SensorData, re // 创建错误映射表 errorMap := make(map[string]string) for _, err := range readErrors { - // 从错误信息中提取deviceUUID(需要根据实际错误格式调整) errorMap["unknown"] = err.Error() } @@ -154,9 +153,9 @@ func (bm *BusinessManager) updateDeviceStatuses(sensorDataList []*SensorData, re } // reportFirstCollectionStatus 上报第一次采集的系统状态 -// reportFirstCollectionStatus 上报第一次采集的系统状态(新协议:批量发送所有在线设备的 online 事件) + func (bm *BusinessManager) reportFirstCollectionStatus(_ []*SensorData, _ []error, processedDataList []*ProcessedSensorData) { - // 只使用 processedDataList 就够了,另外两个参数我们不需要但必须保留(调用方传了),用 _ 忽略 + successCount := len(processedDataList) totalCount := len(bm.sensorConfigs) @@ -201,12 +200,11 @@ func (bm *BusinessManager) reportFirstCollectionStatus(_ []*SensorData, _ []erro metric := map[string]interface{}{ "type": "event", - "args": events, // 批量数组! + "args": events, } routeKey := fmt.Sprintf("/dthjjc/device/%s/event", bm.hostUUID) - // 打印完整 JSON 方便你看 fullMsg := map[string]interface{}{ "method": "metric_data", "params": map[string]interface{}{ diff --git a/business/modbus_config.go b/business/modbus_config.go index 6ecbfe1..9eb8b3f 100644 --- a/business/modbus_config.go +++ b/business/modbus_config.go @@ -53,11 +53,11 @@ func DefaultModbusConfig() *ModbusConfig { DataBits: 8, StopBits: 1, Parity: "N", - Timeout: 3 * time.Second, // 缩短超时时间 - RetryCount: 2, // 减少重试次数 + Timeout: 3 * time.Second, + RetryCount: 2, RetryDelay: 100 * time.Millisecond, - SensorInterval: 200 * time.Millisecond, // 传感器间读取间隔 - BusResetDelay: 50 * time.Millisecond, // 总线稳定时间 + SensorInterval: 200 * time.Millisecond, + BusResetDelay: 50 * time.Millisecond, } } diff --git a/business/report_manager.go b/business/report_manager.go index 2e049a8..d30b07a 100644 --- a/business/report_manager.go +++ b/business/report_manager.go @@ -43,13 +43,11 @@ func (rm *ReportManager) ReportMetricData(processedData []*ProcessedSensorData, deviceDataList = append(deviceDataList, deviceData) } - // 构建数据上报的metric内容 metricData := map[string]interface{}{ "type": "data", "args": deviceDataList, } - // 外层route_key(用于WebSocket路由) outerRouteKey := fmt.Sprintf("/dthjjc/device/%s/data", rm.hostUUID) // 记录要发送的详细内容 @@ -104,15 +102,11 @@ func (rm *ReportManager) ReportMetricData(processedData []*ProcessedSensorData, // ReportEventData 上报事件数据 func (rm *ReportManager) ReportEventData(event *EventData) error { - // 新协议:我们不再单个发,而是偷偷攒进一个全局批量队列,统一用新格式发 - // 但为了不影响其他调用处,我们保留这个函数,只是内部实现变了 - // 特殊处理:如果 event.Level == "system_start",直接忽略(取消这个事件) if event.Level == "system_start" { return nil } - // 转换为新批量格式(复用 EventData 的字段) item := map[string]interface{}{ "description": event.Description, "timestamp": event.Timestamp, @@ -123,8 +117,7 @@ func (rm *ReportManager) ReportEventData(event *EventData) error { if event.DeviceUUID != "" { item["device_uuid"] = event.DeviceUUID // 修复字段名 - // 从全局 sensorConfigs 里找对应的 name(你本来就传进来了!) - for _, cfg := range rm.sensorConfigs { // ← 这里我们马上加一个字段 + for _, cfg := range rm.sensorConfigs { if cfg.DeviceUUID == event.DeviceUUID { item["name"] = cfg.Name break @@ -141,13 +134,12 @@ func (rm *ReportManager) ReportEventData(event *EventData) error { item["recommendation"] = event.Recommendation } - // 关键:复用原来的 metric_data 通道发送,但 args 是数组 var events []map[string]interface{} events = append(events, item) metric := map[string]interface{}{ "type": "event", - "args": events, // 关键:这里是数组,哪怕只有1条 + "args": events, } routeKey := fmt.Sprintf("/dthjjc/device/%s/event", rm.hostUUID) @@ -198,7 +190,7 @@ func (rm *ReportManager) convertToDeviceData(data *ProcessedSensorData, sensorCo "temp": fmt.Sprintf("%.1f", data.Values[0]), "humidity": fmt.Sprintf("%.1f", data.Values[1]), } - deviceData.Value = valueMap // 直接赋值对象,不是数组 + deviceData.Value = valueMap } else { return DeviceData{}, fmt.Errorf("Temperature and humidity data is incomplete.: %v", data.Values) } diff --git a/main.go b/main.go index 811b6bc..6ab5997 100644 --- a/main.go +++ b/main.go @@ -15,7 +15,6 @@ import ( ) func main() { - // 解析命令行参数 debugMode := flag.Bool("debug", false, "Enable debug mode") slaveID := flag.String("slave", "2", "Slave address (debug mode only)") sensorType := flag.String("type", "1", "Sensor type 1-Temperature/Humidity, 2-Water Leak, 3-Smoke (debug mode only)") @@ -68,7 +67,6 @@ func setupGracefulShutdown() { sig := <-sigChan logger.Logger.Printf("Received signal: %v, starting shutdown", sig) - // 关闭 WebSocket 连接 connect.Close() logger.Logger.Printf("WebSocket connection closed") @@ -81,7 +79,6 @@ func setupGracefulShutdown() { func runNormalMode() { logger.Logger.Printf("=== Environmental Detection System Starting ===") - // 原有的正常模式代码... // 2. 初始化 WebSocket 连接 logger.Logger.Printf("Initializing WebSocket connection...") if err := connect.InitWSChannel(); err != nil { @@ -233,9 +230,4 @@ func handleAlertMessage(params *connect.PublishParams) { return } - // 终极兜底:只要包含 type 和 1 就响 - //if strings.Contains(raw, "type") && strings.Contains(raw, "1") { - // logger.Logger.Printf("暴力识别告警指令 → 强制响 3 秒") - // business.TriggerBuzzer(3) - //} }