236 lines
7.1 KiB
Go
236 lines
7.1 KiB
Go
package business
|
||
|
||
import (
|
||
"Dynamic_environmental_detection/connect"
|
||
"Dynamic_environmental_detection/logger"
|
||
"encoding/json"
|
||
"fmt"
|
||
"strconv"
|
||
)
|
||
|
||
// ReportManager 数据上报管理器
|
||
type ReportManager struct {
|
||
hostUUID string
|
||
faultDetector *FaultDetector
|
||
sensorConfigs []SensorConfig
|
||
}
|
||
|
||
// NewReportManager 创建数据上报管理器
|
||
func NewReportManager(hostUUID string, sensorConfigs []SensorConfig) *ReportManager {
|
||
return &ReportManager{
|
||
hostUUID: hostUUID,
|
||
faultDetector: NewFaultDetector(hostUUID),
|
||
sensorConfigs: sensorConfigs,
|
||
}
|
||
}
|
||
|
||
// ReportMetricData 上报指标数据
|
||
func (rm *ReportManager) ReportMetricData(processedData []*ProcessedSensorData, sensorConfigs []SensorConfig) error {
|
||
if len(processedData) == 0 {
|
||
logger.Logger.Printf("There is no valid data to report")
|
||
return nil
|
||
}
|
||
|
||
// 构建设备数据数组
|
||
var deviceDataList []DeviceData
|
||
|
||
for _, data := range processedData {
|
||
deviceData, err := rm.convertToDeviceData(data, sensorConfigs)
|
||
if err != nil {
|
||
logger.Logger.Printf("Failed to convert device data (Device: %s): %v", data.DeviceUUID, err)
|
||
continue
|
||
}
|
||
deviceDataList = append(deviceDataList, deviceData)
|
||
}
|
||
|
||
// 构建数据上报的metric内容
|
||
metricData := map[string]interface{}{
|
||
"type": "data",
|
||
"args": deviceDataList,
|
||
}
|
||
|
||
// 外层route_key(用于WebSocket路由)
|
||
outerRouteKey := fmt.Sprintf("/dthjjc/device/%s/data", rm.hostUUID)
|
||
|
||
// 记录要发送的详细内容
|
||
messageJSON, err := json.Marshal(map[string]interface{}{
|
||
"method": "metric_data",
|
||
"params": map[string]interface{}{
|
||
"route_key": outerRouteKey,
|
||
"metric": metricData,
|
||
},
|
||
})
|
||
if err != nil {
|
||
logger.Logger.Printf("Failed to serialize and report data: %v", err)
|
||
} else {
|
||
logger.Logger.Printf("Sending WebSocket message: %s", string(messageJSON))
|
||
}
|
||
|
||
logger.Logger.Printf("Prepare to submit indicator data and equipment count: %d", len(deviceDataList))
|
||
|
||
// 上报数据
|
||
if err := connect.ReportMetricData(outerRouteKey, metricData); err != nil {
|
||
return fmt.Errorf("Failed to report indicator data: %v", err)
|
||
}
|
||
|
||
// 记录每个设备的上报状态
|
||
for _, data := range processedData {
|
||
switch data.SensorType {
|
||
case SensorTypeTemperatureHumidity:
|
||
if len(data.Values) >= 2 {
|
||
logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Temp=%.1f°C, Humidity=%.1f%%)",
|
||
data.DeviceUUID, data.Values[0], data.Values[1])
|
||
}
|
||
case SensorTypeWaterLeak:
|
||
statusText := "online"
|
||
if data.Status == 1 {
|
||
statusText = "alarm"
|
||
}
|
||
logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Water Immersion Sensor Status: %s)",
|
||
data.DeviceUUID, statusText)
|
||
case SensorTypeSmoke:
|
||
statusText := "online"
|
||
if data.Status == 1 {
|
||
statusText = "alarm"
|
||
}
|
||
logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Smoke Sensor Status: %s)",
|
||
data.DeviceUUID, statusText)
|
||
}
|
||
}
|
||
|
||
logger.Logger.Printf("Asynchronous reporting successful")
|
||
return nil
|
||
}
|
||
|
||
// ReportEventData 上报事件数据
|
||
func (rm *ReportManager) ReportEventData(event *EventData) error {
|
||
// 新协议:我们不再单个发,而是偷偷攒进一个全局批量队列,统一用新格式发
|
||
// 但为了不影响其他调用处,我们保留这个函数,只是内部实现变了
|
||
|
||
// 特殊处理:如果 event.Level == "system_start",直接忽略(取消这个事件)
|
||
if event.Level == "system_start" {
|
||
return nil
|
||
}
|
||
|
||
// 转换为新批量格式(复用 EventData 的字段)
|
||
item := map[string]interface{}{
|
||
"description": event.Description,
|
||
"timestamp": event.Timestamp,
|
||
"host_uuid": event.HostUUID,
|
||
"level": event.Level,
|
||
}
|
||
|
||
if event.DeviceUUID != "" {
|
||
item["device_uuid"] = event.DeviceUUID // 修复字段名
|
||
|
||
// 从全局 sensorConfigs 里找对应的 name(你本来就传进来了!)
|
||
for _, cfg := range rm.sensorConfigs { // ← 这里我们马上加一个字段
|
||
if cfg.DeviceUUID == event.DeviceUUID {
|
||
item["name"] = cfg.Name
|
||
break
|
||
}
|
||
}
|
||
}
|
||
if event.LastSuccessfulTime != "" {
|
||
item["last_successful_time"] = event.LastSuccessfulTime
|
||
}
|
||
if event.SensorType != "" {
|
||
item["sensor_type"] = event.SensorType
|
||
}
|
||
if event.Recommendation != "" {
|
||
item["recommendation"] = event.Recommendation
|
||
}
|
||
|
||
// 关键:复用原来的 metric_data 通道发送,但 args 是数组
|
||
var events []map[string]interface{}
|
||
events = append(events, item)
|
||
|
||
metric := map[string]interface{}{
|
||
"type": "event",
|
||
"args": events, // 关键:这里是数组,哪怕只有1条
|
||
}
|
||
|
||
routeKey := fmt.Sprintf("/dthjjc/device/%s/event", rm.hostUUID)
|
||
|
||
messageJSON, _ := json.Marshal(map[string]interface{}{
|
||
"method": "metric_data",
|
||
"params": map[string]interface{}{
|
||
"route_key": routeKey,
|
||
"metric": metric,
|
||
},
|
||
})
|
||
logger.Logger.Printf("Sending WebSocket message: %s", string(messageJSON))
|
||
|
||
return connect.ReportMetricData(routeKey, metric)
|
||
}
|
||
|
||
// convertToDeviceData 转换处理后的数据为上报格式
|
||
func (rm *ReportManager) convertToDeviceData(data *ProcessedSensorData, sensorConfigs []SensorConfig) (DeviceData, error) {
|
||
// 查找设备配置
|
||
var sensorConfig SensorConfig
|
||
found := false
|
||
for _, config := range sensorConfigs {
|
||
if config.DeviceUUID == data.DeviceUUID {
|
||
sensorConfig = config
|
||
found = true
|
||
break
|
||
}
|
||
}
|
||
|
||
if !found {
|
||
return DeviceData{}, fmt.Errorf("No device configuration found: %s", data.DeviceUUID)
|
||
}
|
||
|
||
deviceData := DeviceData{
|
||
DeviceUUID: data.DeviceUUID,
|
||
Type: strconv.Itoa(int(data.SensorType)),
|
||
Facility: sensorConfig.Facility,
|
||
Address: sensorConfig.Address,
|
||
Name: sensorConfig.Name,
|
||
HostUUID: sensorConfig.HostUUID,
|
||
}
|
||
|
||
// 根据传感器类型设置value字段
|
||
switch data.SensorType {
|
||
case SensorTypeTemperatureHumidity:
|
||
if len(data.Values) >= 2 {
|
||
valueMap := map[string]string{
|
||
"temp": fmt.Sprintf("%.1f", data.Values[0]),
|
||
"humidity": fmt.Sprintf("%.1f", data.Values[1]),
|
||
}
|
||
deviceData.Value = valueMap // 直接赋值对象,不是数组
|
||
} else {
|
||
return DeviceData{}, fmt.Errorf("Temperature and humidity data is incomplete.: %v", data.Values)
|
||
}
|
||
|
||
case SensorTypeWaterLeak, SensorTypeSmoke:
|
||
deviceData.Value = strconv.Itoa(data.Status)
|
||
|
||
default:
|
||
return DeviceData{}, fmt.Errorf("Unknown sensor type: %d", data.SensorType)
|
||
}
|
||
|
||
logger.Logger.Printf("DEBUG: Conversion of equipment data successful - Device: %s, Va1lue: %+v", data.DeviceUUID, deviceData.Value)
|
||
return deviceData, nil
|
||
}
|
||
|
||
// UpdateAndReportDeviceStatus 更新设备状态并上报事件
|
||
func (rm *ReportManager) UpdateAndReportDeviceStatus(deviceUUID string, success bool, errMsg string, sensorType SensorType) {
|
||
needReport, event := rm.faultDetector.UpdateDeviceStatus(deviceUUID, success, errMsg, sensorType)
|
||
if needReport && event != nil {
|
||
// 异步上报事件
|
||
go func() {
|
||
if err := rm.ReportEventData(event); err != nil {
|
||
logger.Logger.Printf("Event reporting failed: %v", err)
|
||
} else {
|
||
logger.Logger.Printf("Asynchronous reporting successful")
|
||
}
|
||
}()
|
||
}
|
||
}
|
||
|
||
// GetFaultDetector 获取故障检测器
|
||
func (rm *ReportManager) GetFaultDetector() *FaultDetector {
|
||
return rm.faultDetector
|
||
}
|