2025-12-10 14:24:36 +08:00

228 lines
6.3 KiB
Go

package business
import (
"Dynamic_environmental_detection/connect"
"Dynamic_environmental_detection/logger"
"encoding/json"
"fmt"
"strconv"
)
// ReportManager 数据上报管理器
type ReportManager struct {
hostUUID string
faultDetector *FaultDetector
sensorConfigs []SensorConfig
}
// NewReportManager 创建数据上报管理器
func NewReportManager(hostUUID string, sensorConfigs []SensorConfig) *ReportManager {
return &ReportManager{
hostUUID: hostUUID,
faultDetector: NewFaultDetector(hostUUID),
sensorConfigs: sensorConfigs,
}
}
// ReportMetricData 上报指标数据
func (rm *ReportManager) ReportMetricData(processedData []*ProcessedSensorData, sensorConfigs []SensorConfig) error {
if len(processedData) == 0 {
logger.Logger.Printf("There is no valid data to report")
return nil
}
// 构建设备数据数组
var deviceDataList []DeviceData
for _, data := range processedData {
deviceData, err := rm.convertToDeviceData(data, sensorConfigs)
if err != nil {
logger.Logger.Printf("Failed to convert device data (Device: %s): %v", data.DeviceUUID, err)
continue
}
deviceDataList = append(deviceDataList, deviceData)
}
metricData := map[string]interface{}{
"type": "data",
"args": deviceDataList,
}
outerRouteKey := fmt.Sprintf("/dthjjc/device/%s/data", rm.hostUUID)
// 记录要发送的详细内容
messageJSON, err := json.Marshal(map[string]interface{}{
"method": "metric_data",
"params": map[string]interface{}{
"route_key": outerRouteKey,
"metric": metricData,
},
})
if err != nil {
logger.Logger.Printf("Failed to serialize and report data: %v", err)
} else {
logger.Logger.Printf("Sending WebSocket message: %s", string(messageJSON))
}
logger.Logger.Printf("Prepare to submit indicator data and equipment count: %d", len(deviceDataList))
// 上报数据
if err := connect.ReportMetricData(outerRouteKey, metricData); err != nil {
return fmt.Errorf("Failed to report indicator data: %v", err)
}
// 记录每个设备的上报状态
for _, data := range processedData {
switch data.SensorType {
case SensorTypeTemperatureHumidity:
if len(data.Values) >= 2 {
logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Temp=%.1f°C, Humidity=%.1f%%)",
data.DeviceUUID, data.Values[0], data.Values[1])
}
case SensorTypeWaterLeak:
statusText := "online"
if data.Status == 1 {
statusText = "alarm"
}
logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Water Immersion Sensor Status: %s)",
data.DeviceUUID, statusText)
case SensorTypeSmoke:
statusText := "online"
if data.Status == 1 {
statusText = "alarm"
}
logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Smoke Sensor Status: %s)",
data.DeviceUUID, statusText)
}
}
logger.Logger.Printf("Asynchronous reporting successful")
return nil
}
// ReportEventData 上报事件数据
func (rm *ReportManager) ReportEventData(event *EventData) error {
if event.Level == "system_start" {
return nil
}
item := map[string]interface{}{
"description": event.Description,
"timestamp": event.Timestamp,
"host_uuid": event.HostUUID,
"level": event.Level,
}
if event.DeviceUUID != "" {
item["device_uuid"] = event.DeviceUUID // 修复字段名
for _, cfg := range rm.sensorConfigs {
if cfg.DeviceUUID == event.DeviceUUID {
item["name"] = cfg.Name
break
}
}
}
if event.LastSuccessfulTime != "" {
item["last_successful_time"] = event.LastSuccessfulTime
}
if event.SensorType != "" {
item["sensor_type"] = event.SensorType
}
if event.Recommendation != "" {
item["recommendation"] = event.Recommendation
}
var events []map[string]interface{}
events = append(events, item)
metric := map[string]interface{}{
"type": "event",
"args": events,
}
routeKey := fmt.Sprintf("/dthjjc/device/%s/event", rm.hostUUID)
messageJSON, _ := json.Marshal(map[string]interface{}{
"method": "metric_data",
"params": map[string]interface{}{
"route_key": routeKey,
"metric": metric,
},
})
logger.Logger.Printf("Sending WebSocket message: %s", string(messageJSON))
return connect.ReportMetricData(routeKey, metric)
}
// convertToDeviceData 转换处理后的数据为上报格式
func (rm *ReportManager) convertToDeviceData(data *ProcessedSensorData, sensorConfigs []SensorConfig) (DeviceData, error) {
// 查找设备配置
var sensorConfig SensorConfig
found := false
for _, config := range sensorConfigs {
if config.DeviceUUID == data.DeviceUUID {
sensorConfig = config
found = true
break
}
}
if !found {
return DeviceData{}, fmt.Errorf("No device configuration found: %s", data.DeviceUUID)
}
deviceData := DeviceData{
DeviceUUID: data.DeviceUUID,
Type: strconv.Itoa(int(data.SensorType)),
Facility: sensorConfig.Facility,
Address: sensorConfig.Address,
Name: sensorConfig.Name,
HostUUID: sensorConfig.HostUUID,
}
// 根据传感器类型设置value字段
switch data.SensorType {
case SensorTypeTemperatureHumidity:
if len(data.Values) >= 2 {
valueMap := map[string]string{
"temp": fmt.Sprintf("%.1f", data.Values[0]),
"humidity": fmt.Sprintf("%.1f", data.Values[1]),
}
deviceData.Value = valueMap
} else {
return DeviceData{}, fmt.Errorf("Temperature and humidity data is incomplete.: %v", data.Values)
}
case SensorTypeWaterLeak, SensorTypeSmoke:
deviceData.Value = strconv.Itoa(data.Status)
default:
return DeviceData{}, fmt.Errorf("Unknown sensor type: %d", data.SensorType)
}
logger.Logger.Printf("DEBUG: Conversion of equipment data successful - Device: %s, Va1lue: %+v", data.DeviceUUID, deviceData.Value)
return deviceData, nil
}
// UpdateAndReportDeviceStatus 更新设备状态并上报事件
func (rm *ReportManager) UpdateAndReportDeviceStatus(deviceUUID string, success bool, errMsg string, sensorType SensorType) {
needReport, event := rm.faultDetector.UpdateDeviceStatus(deviceUUID, success, errMsg, sensorType)
if needReport && event != nil {
// 异步上报事件
go func() {
if err := rm.ReportEventData(event); err != nil {
logger.Logger.Printf("Event reporting failed: %v", err)
} else {
logger.Logger.Printf("Asynchronous reporting successful")
}
}()
}
}
// GetFaultDetector 获取故障检测器
func (rm *ReportManager) GetFaultDetector() *FaultDetector {
return rm.faultDetector
}