2025-12-10 14:24:36 +08:00

235 lines
6.5 KiB
Go

package business
import (
"Dynamic_environmental_detection/connect"
"encoding/json"
"fmt"
"strconv"
"time"
"Dynamic_environmental_detection/logger"
"github.com/goburrow/modbus"
)
// BusinessManager ties together the pieces used for one sensor polling
// cycle: the data collector, the data processor and the report manager,
// plus the sensor configuration they all operate on.
type BusinessManager struct {
// NOTE(review): modbusClient is neither assigned nor read in the part of
// the file visible here — confirm whether it is still needed.
modbusClient modbus.Client
// modbusConfig is the Modbus configuration the data collector was built with.
modbusConfig *ModbusConfig
// dataCollector performs the batch read of raw sensor data.
dataCollector *DataCollector
// dataProcessor turns raw readings into processed data and validates them.
dataProcessor *DataProcessor
// reportManager reports device status changes and metric data.
reportManager *ReportManager
// sensorConfigs is the list of sensors polled on every collection.
sensorConfigs []SensorConfig
// hostUUID is taken from the first sensor configuration and used in the
// event route key and event payloads.
hostUUID string
// isFirstCollect is true until StartDataCollection has sent the
// first-collection status report, then stays false.
isFirstCollect bool
}
// NewBusinessManager builds a BusinessManager for the given sensors.
//
// The host UUID is read from the first configuration entry. The Modbus
// configuration comes from DefaultModbusConfig and is handed to the data
// collector. The manager starts with isFirstCollect set to true.
//
// It returns an error when sensorConfigs is empty.
func NewBusinessManager(sensorConfigs []SensorConfig) (*BusinessManager, error) {
	if len(sensorConfigs) == 0 {
		return nil, fmt.Errorf("sensor configuration is empty")
	}

	// Start from the values that need no construction, then attach the
	// collaborators one by one in dependency order.
	bm := &BusinessManager{
		sensorConfigs:  sensorConfigs,
		hostUUID:       sensorConfigs[0].HostUUID,
		isFirstCollect: true,
	}

	cfg := DefaultModbusConfig()
	bm.modbusConfig = cfg
	bm.dataCollector = NewDataCollector(cfg)
	bm.dataProcessor = NewDataProcessor()
	bm.reportManager = NewReportManager(bm.hostUUID, sensorConfigs)

	logger.Logger.Printf("DEBUG: Business manager created, isFirstCollect initialized to: true")
	return bm, nil
}
// StartDataCollection runs one full collection cycle: batch-read all
// configured sensors, update and report device statuses, process the
// readings, send the first-collection status report (only on the first call),
// report the processed metrics, and log a summary.
//
// Processing and reporting failures are logged rather than returned; the
// function always returns nil.
func (bm *BusinessManager) StartDataCollection() error {
	logger.Logger.Printf("Starting sensor data collection, total sensors: %d", len(bm.sensorConfigs))
	began := time.Now()

	// Read every sensor, then push the resulting online/offline statuses.
	readings, readErrs := bm.dataCollector.BatchReadSensorData(bm.sensorConfigs)
	bm.updateDeviceStatuses(readings, readErrs)

	// Convert the raw readings; individual failures are only logged.
	processed, procErrs := bm.dataProcessor.BatchProcessSensorData(readings)
	if failed := len(procErrs); failed != 0 {
		logger.Logger.Printf("Partial data processing failed, error count: %d", failed)
		for _, procErr := range procErrs {
			logger.Logger.Printf("Processing error: %v", procErr)
		}
	}

	// The overall status report is sent exactly once, on the first cycle.
	logger.Logger.Printf("DEBUG: Is first collection: %v", bm.isFirstCollect)
	if bm.isFirstCollect {
		logger.Logger.Printf("DEBUG: Processing first collection status report")
		bm.reportFirstCollectionStatus(readings, readErrs, processed)
		bm.isFirstCollect = false
		logger.Logger.Printf("DEBUG: First collection status report completed, isFirstCollect set to: %v", bm.isFirstCollect)
	}

	// Metrics are only reported when something was processed.
	if len(processed) != 0 {
		reportErr := bm.reportManager.ReportMetricData(processed, bm.sensorConfigs)
		if reportErr != nil {
			logger.Logger.Printf("Failed to report metric data: %v", reportErr)
		}
	}

	// Summary: count the processed entries the processor considers valid.
	validCount := 0
	for i := range processed {
		if bm.dataProcessor.IsDataValid(processed[i]) {
			validCount++
		}
	}

	elapsed := time.Since(began)
	logger.Logger.Printf("Data collection completed - Success: %d/%d, Read errors: %d, Process errors: %d, Total time: %v",
		validCount, len(bm.sensorConfigs), len(readErrs), len(procErrs), elapsed)
	return nil
}
// updateDeviceStatuses reports an online or offline status for every
// configured sensor, based on the result of the latest batch read.
//
// Every non-nil entry of sensorDataList is reported as online with an empty
// error message. Every entry of bm.sensorConfigs whose DeviceUUID does not
// appear in sensorDataList is then reported as offline, in configuration
// order. readErrors carries no device identity here, so all offline devices
// share one message: the text of readErrors[0] when it exists and is non-nil,
// otherwise a generic timeout/communication-failure message.
func (bm *BusinessManager) updateDeviceStatuses(sensorDataList []*SensorData, readErrors []error) {
	// Report the devices that answered and remember their UUIDs, so the
	// offline pass below is a set lookup instead of a rescan per config.
	online := make(map[string]struct{}, len(sensorDataList))
	for _, sensorData := range sensorDataList {
		if sensorData == nil {
			// A nil reading carries no device identity; the matching
			// config (if any) is reported offline below.
			continue
		}
		online[sensorData.DeviceUUID] = struct{}{}
		bm.reportManager.UpdateAndReportDeviceStatus(
			sensorData.DeviceUUID,
			true,
			"",
			sensorData.SensorType,
		)
	}

	// The failure message does not depend on the device, so build it once.
	errMsg := "Device read timeout or communication failure"
	if len(readErrors) > 0 && readErrors[0] != nil {
		errMsg = readErrors[0].Error()
	}

	// Any configured device that produced no reading is reported offline.
	for _, config := range bm.sensorConfigs {
		if _, ok := online[config.DeviceUUID]; ok {
			continue
		}
		bm.reportManager.UpdateAndReportDeviceStatus(
			config.DeviceUUID,
			false,
			errMsg,
			config.GetSensorType(),
		)
	}
}
// reportFirstCollectionStatus sends one batched "Sensor online" event message
// covering every device present in processedDataList.
//
// The first two parameters are accepted for signature compatibility and are
// ignored. Each event carries a shared Unix-seconds timestamp (as a decimal
// string), the host UUID, the device UUID and the sensor type name of the
// first configuration with the same DeviceUUID ("Unknown Sensor" when no
// configuration matches). When there is no event to send, the function logs
// that and returns without reporting. Failures are logged, not returned.
func (bm *BusinessManager) reportFirstCollectionStatus(_ []*SensorData, _ []error, processedDataList []*ProcessedSensorData) {
	successCount := len(processedDataList)
	totalCount := len(bm.sensorConfigs)
	logger.Logger.Printf("=== System First Collection Status Report ===")
	logger.Logger.Printf("Total devices: %d, Successfully collected and processed: %d", totalCount, successCount)

	// Index the configurations by device UUID once (first occurrence wins)
	// instead of rescanning the whole list for every reported device.
	configIndex := make(map[string]int, totalCount)
	for i := range bm.sensorConfigs {
		uuid := bm.sensorConfigs[i].DeviceUUID
		if _, seen := configIndex[uuid]; !seen {
			configIndex[uuid] = i
		}
	}

	// All events of this batch share a single timestamp.
	timestamp := strconv.FormatInt(time.Now().Unix(), 10)

	events := make([]map[string]interface{}, 0, successCount)
	for _, data := range processedDataList {
		if data == nil {
			// Nothing to report for an entry without device data.
			continue
		}
		sensorName := "Unknown Sensor"
		if i, ok := configIndex[data.DeviceUUID]; ok {
			cfg := bm.sensorConfigs[i]
			sensorName = GetSensorTypeName(cfg.GetSensorType())
		}
		events = append(events, map[string]interface{}{
			"description": "Sensor online",
			"timestamp":   timestamp,
			"host_uuid":   bm.hostUUID,
			"deviceuuid":  data.DeviceUUID,
			"level":       "online",
			"sensor_type": sensorName,
		})
	}

	if len(events) == 0 {
		logger.Logger.Printf("No successful devices in first collection, skipping report")
		return
	}

	metric := map[string]interface{}{
		"type": "event",
		"args": events,
	}
	routeKey := fmt.Sprintf("/dthjjc/device/%s/event", bm.hostUUID)

	// fullMsg exists only to log the message envelope; the actual send below
	// passes routeKey and metric separately.
	fullMsg := map[string]interface{}{
		"method": "metric_data",
		"params": map[string]interface{}{
			"route_key": routeKey,
			"metric":    metric,
		},
	}
	if b, err := json.Marshal(fullMsg); err != nil {
		// Surface the failure instead of silently dropping the log line.
		logger.Logger.Printf("Failed to marshal first collection online events for logging: %v", err)
	} else {
		logger.Logger.Printf("First collection batch online events: %s", string(b))
	}

	if err := connect.ReportMetricData(routeKey, metric); err != nil {
		logger.Logger.Printf("Failed to report first batch online events: %v", err)
		return
	}
	logger.Logger.Printf("First collection successful, batch reported %d devices online", len(events))
}
// GetReportManager returns the ReportManager held by this BusinessManager.
// The pointer is returned as-is, so the caller shares the same instance.
func (bm *BusinessManager) GetReportManager() *ReportManager {
return bm.reportManager
}
// Close is the shutdown hook of the BusinessManager. In the code shown here
// it only writes a log line; it does not close or release any field itself.
func (bm *BusinessManager) Close() {
logger.Logger.Printf("Business manager resources released")
}