package business

import (
	"encoding/json"
	"fmt"
	"strconv"

	"Dynamic_environmental_detection/connect"
	"Dynamic_environmental_detection/logger"
)

// ReportManager builds metric and event payloads for one host and hands them
// to the connect package for delivery.
type ReportManager struct {
	hostUUID      string         // host identifier embedded in every route key
	faultDetector *FaultDetector // decides whether a device status change produces an event
	sensorConfigs []SensorConfig // used to resolve a device name when reporting events
}

// NewReportManager creates a ReportManager for the given host.
//
// The sensorConfigs slice is stored as-is (not copied), so later changes made
// by the caller to its elements are visible to the manager.
func NewReportManager(hostUUID string, sensorConfigs []SensorConfig) *ReportManager {
	return &ReportManager{
		hostUUID:      hostUUID,
		faultDetector: NewFaultDetector(hostUUID),
		sensorConfigs: sensorConfigs,
	}
}

// ReportMetricData converts the processed sensor readings into the upload
// format and reports them in a single "data" metric message.
//
// Entries that are nil or cannot be converted (no matching configuration,
// incomplete values, unknown sensor type) are logged and skipped; they do not
// fail the call. An empty processedData slice is a no-op. The returned error
// wraps the error from connect.ReportMetricData.
func (rm *ReportManager) ReportMetricData(processedData []*ProcessedSensorData, sensorConfigs []SensorConfig) error {
	if len(processedData) == 0 {
		logger.Logger.Printf("There is no valid data to report")
		return nil
	}

	// Allocate a non-nil slice so that "args" is encoded as a JSON array ([])
	// rather than null even when every conversion fails.
	deviceDataList := make([]DeviceData, 0, len(processedData))
	// reported tracks the entries that actually made it into the payload so
	// the per-device log below does not claim success for skipped devices.
	reported := make([]*ProcessedSensorData, 0, len(processedData))
	for _, data := range processedData {
		if data == nil {
			logger.Logger.Printf("Skipping nil sensor data entry")
			continue
		}
		deviceData, err := rm.convertToDeviceData(data, sensorConfigs)
		if err != nil {
			logger.Logger.Printf("Failed to convert device data (Device: %s): %v", data.DeviceUUID, err)
			continue
		}
		deviceDataList = append(deviceDataList, deviceData)
		reported = append(reported, data)
	}

	// Metric content of the data report.
	metricData := map[string]interface{}{
		"type": "data",
		"args": deviceDataList,
	}

	// Outer route key (used for WebSocket routing).
	outerRouteKey := fmt.Sprintf("/dthjjc/device/%s/data", rm.hostUUID)

	rm.logOutboundMessage(outerRouteKey, metricData)
	logger.Logger.Printf("Prepare to submit indicator data and equipment count: %d", len(deviceDataList))

	if err := connect.ReportMetricData(outerRouteKey, metricData); err != nil {
		return fmt.Errorf("Failed to report indicator data: %w", err)
	}

	// Status 1 is logged as "alarm"; every other status is logged as "online".
	statusText := func(status int) string {
		if status == 1 {
			return "alarm"
		}
		return "online"
	}

	// Log the submitted snapshot of every device that was part of the payload.
	for _, data := range reported {
		switch data.SensorType {
		case SensorTypeTemperatureHumidity:
			if len(data.Values) >= 2 {
				logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Temp=%.1f°C, Humidity=%.1f%%)", data.DeviceUUID, data.Values[0], data.Values[1])
			}
		case SensorTypeWaterLeak:
			logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Water Immersion Sensor Status: %s)", data.DeviceUUID, statusText(data.Status))
		case SensorTypeSmoke:
			logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Smoke Sensor Status: %s)", data.DeviceUUID, statusText(data.Status))
		}
	}

	logger.Logger.Printf("Asynchronous reporting successful")
	return nil
}

// ReportEventData reports a single event through the same metric_data channel
// used for sensor data, as an "event" metric whose args is a one-element array.
//
// Events whose Level is "system_start" are deliberately dropped and reported
// as success. A nil event yields an error instead of a panic. Optional fields
// (device UUID and name, last successful time, sensor type, recommendation)
// are only included when they are non-empty.
func (rm *ReportManager) ReportEventData(event *EventData) error {
	if event == nil {
		return fmt.Errorf("event data is nil")
	}

	// The system_start event has been cancelled: ignore it.
	if event.Level == "system_start" {
		return nil
	}

	item := map[string]interface{}{
		"description": event.Description,
		"timestamp":   event.Timestamp,
		"host_uuid":   event.HostUUID,
		"level":       event.Level,
	}

	if event.DeviceUUID != "" {
		item["device_uuid"] = event.DeviceUUID
		// Attach the configured device name when the device is known.
		for i := range rm.sensorConfigs {
			if rm.sensorConfigs[i].DeviceUUID == event.DeviceUUID {
				item["name"] = rm.sensorConfigs[i].Name
				break
			}
		}
	}
	if event.LastSuccessfulTime != "" {
		item["last_successful_time"] = event.LastSuccessfulTime
	}
	if event.SensorType != "" {
		item["sensor_type"] = event.SensorType
	}
	if event.Recommendation != "" {
		item["recommendation"] = event.Recommendation
	}

	// args is always an array, even though it carries exactly one event.
	metric := map[string]interface{}{
		"type": "event",
		"args": []map[string]interface{}{item},
	}

	routeKey := fmt.Sprintf("/dthjjc/device/%s/event", rm.hostUUID)

	rm.logOutboundMessage(routeKey, metric)
	return connect.ReportMetricData(routeKey, metric)
}

// logOutboundMessage logs the JSON form of the metric_data message built from
// routeKey and metric. It is diagnostic only: a serialization failure is
// logged and does not prevent the caller from sending the metric.
func (rm *ReportManager) logOutboundMessage(routeKey string, metric map[string]interface{}) {
	messageJSON, err := json.Marshal(map[string]interface{}{
		"method": "metric_data",
		"params": map[string]interface{}{
			"route_key": routeKey,
			"metric":    metric,
		},
	})
	if err != nil {
		logger.Logger.Printf("Failed to serialize and report data: %v", err)
		return
	}
	logger.Logger.Printf("Sending WebSocket message: %s", string(messageJSON))
}

// convertToDeviceData converts one processed reading into the upload format,
// filling the descriptive fields from the sensor configuration whose
// DeviceUUID matches.
//
// Value is set according to the sensor type: temperature/humidity sensors get
// a map with "temp" and "humidity" formatted to one decimal place; water-leak
// and smoke sensors get the status as a decimal string. An error is returned
// when data is nil, no configuration matches, a temperature/humidity reading
// has fewer than two values, or the sensor type is unknown.
func (rm *ReportManager) convertToDeviceData(data *ProcessedSensorData, sensorConfigs []SensorConfig) (DeviceData, error) {
	if data == nil {
		return DeviceData{}, fmt.Errorf("sensor data is nil")
	}

	// Look up the device configuration.
	var sensorConfig SensorConfig
	found := false
	for _, config := range sensorConfigs {
		if config.DeviceUUID == data.DeviceUUID {
			sensorConfig = config
			found = true
			break
		}
	}
	if !found {
		return DeviceData{}, fmt.Errorf("No device configuration found: %s", data.DeviceUUID)
	}

	deviceData := DeviceData{
		DeviceUUID: data.DeviceUUID,
		Type:       strconv.Itoa(int(data.SensorType)),
		Facility:   sensorConfig.Facility,
		Address:    sensorConfig.Address,
		Name:       sensorConfig.Name,
		HostUUID:   sensorConfig.HostUUID,
	}

	// Set the value field according to the sensor type.
	switch data.SensorType {
	case SensorTypeTemperatureHumidity:
		if len(data.Values) < 2 {
			return DeviceData{}, fmt.Errorf("Temperature and humidity data is incomplete.: %v", data.Values)
		}
		// Assigned directly as an object, not as an array.
		deviceData.Value = map[string]string{
			"temp":     fmt.Sprintf("%.1f", data.Values[0]),
			"humidity": fmt.Sprintf("%.1f", data.Values[1]),
		}
	case SensorTypeWaterLeak, SensorTypeSmoke:
		deviceData.Value = strconv.Itoa(data.Status)
	default:
		return DeviceData{}, fmt.Errorf("Unknown sensor type: %d", data.SensorType)
	}

	logger.Logger.Printf("DEBUG: Conversion of equipment data successful - Device: %s, Value: %+v", data.DeviceUUID, deviceData.Value)
	return deviceData, nil
}

// UpdateAndReportDeviceStatus records the outcome of a device read in the
// fault detector and, when the detector asks for it, reports the resulting
// event.
//
// The report is sent from a new goroutine, so this method does not wait for
// delivery; the outcome is only logged.
func (rm *ReportManager) UpdateAndReportDeviceStatus(deviceUUID string, success bool, errMsg string, sensorType SensorType) {
	needReport, event := rm.faultDetector.UpdateDeviceStatus(deviceUUID, success, errMsg, sensorType)
	if !needReport || event == nil {
		return
	}

	// Report the event asynchronously.
	go func() {
		if err := rm.ReportEventData(event); err != nil {
			logger.Logger.Printf("Event reporting failed: %v", err)
			return
		}
		logger.Logger.Printf("Asynchronous reporting successful")
	}()
}

// GetFaultDetector returns the fault detector owned by this manager.
func (rm *ReportManager) GetFaultDetector() *FaultDetector {
	return rm.faultDetector
}