235 lines
6.5 KiB
Go
235 lines
6.5 KiB
Go
package business
|
|
|
|
import (
|
|
"Dynamic_environmental_detection/connect"
|
|
"encoding/json"
|
|
"fmt"
|
|
"strconv"
|
|
"time"
|
|
|
|
"Dynamic_environmental_detection/logger"
|
|
"github.com/goburrow/modbus"
|
|
)
|
|
|
|
// BusinessManager 业务管理器
|
|
type BusinessManager struct {
|
|
modbusClient modbus.Client
|
|
modbusConfig *ModbusConfig
|
|
dataCollector *DataCollector
|
|
dataProcessor *DataProcessor
|
|
reportManager *ReportManager
|
|
sensorConfigs []SensorConfig
|
|
hostUUID string
|
|
isFirstCollect bool
|
|
}
|
|
|
|
// NewBusinessManager 创建业务管理器
|
|
func NewBusinessManager(sensorConfigs []SensorConfig) (*BusinessManager, error) {
|
|
if len(sensorConfigs) == 0 {
|
|
return nil, fmt.Errorf("sensor configuration is empty")
|
|
}
|
|
|
|
// 从第一个配置获取host_uuid
|
|
hostUUID := sensorConfigs[0].HostUUID
|
|
|
|
// 创建Modbus配置
|
|
modbusConfig := DefaultModbusConfig()
|
|
|
|
// 创建数据采集器、处理器和上报管理器
|
|
dataCollector := NewDataCollector(modbusConfig)
|
|
dataProcessor := NewDataProcessor()
|
|
reportManager := NewReportManager(hostUUID, sensorConfigs)
|
|
|
|
logger.Logger.Printf("DEBUG: Business manager created, isFirstCollect initialized to: true")
|
|
|
|
return &BusinessManager{
|
|
modbusConfig: modbusConfig,
|
|
dataCollector: dataCollector,
|
|
dataProcessor: dataProcessor,
|
|
reportManager: reportManager,
|
|
sensorConfigs: sensorConfigs,
|
|
hostUUID: hostUUID,
|
|
isFirstCollect: true,
|
|
}, nil
|
|
}
|
|
|
|
// StartDataCollection 启动数据采集和上报
|
|
func (bm *BusinessManager) StartDataCollection() error {
|
|
logger.Logger.Printf("Starting sensor data collection, total sensors: %d", len(bm.sensorConfigs))
|
|
startTime := time.Now()
|
|
|
|
// 批量读取传感器数据
|
|
sensorDataList, readErrors := bm.dataCollector.BatchReadSensorData(bm.sensorConfigs)
|
|
|
|
// 更新设备状态并上报事件
|
|
bm.updateDeviceStatuses(sensorDataList, readErrors)
|
|
|
|
// 批量处理传感器数据
|
|
processedDataList, processErrors := bm.dataProcessor.BatchProcessSensorData(sensorDataList)
|
|
|
|
if len(processErrors) > 0 {
|
|
logger.Logger.Printf("Partial data processing failed, error count: %d", len(processErrors))
|
|
for _, err := range processErrors {
|
|
logger.Logger.Printf("Processing error: %v", err)
|
|
}
|
|
}
|
|
|
|
// 添加调试信息,检查是否为第一次采集
|
|
logger.Logger.Printf("DEBUG: Is first collection: %v", bm.isFirstCollect)
|
|
|
|
// 如果是第一次采集,上报整体状态
|
|
if bm.isFirstCollect {
|
|
logger.Logger.Printf("DEBUG: Processing first collection status report")
|
|
bm.reportFirstCollectionStatus(sensorDataList, readErrors, processedDataList)
|
|
bm.isFirstCollect = false // 标记为已完成第一次采集
|
|
logger.Logger.Printf("DEBUG: First collection status report completed, isFirstCollect set to: %v", bm.isFirstCollect)
|
|
}
|
|
|
|
// 上报指标数据
|
|
if len(processedDataList) > 0 {
|
|
if err := bm.reportManager.ReportMetricData(processedDataList, bm.sensorConfigs); err != nil {
|
|
logger.Logger.Printf("Failed to report metric data: %v", err)
|
|
}
|
|
}
|
|
|
|
// 统计信息
|
|
successCount := 0
|
|
for _, data := range processedDataList {
|
|
if bm.dataProcessor.IsDataValid(data) {
|
|
successCount++
|
|
}
|
|
}
|
|
|
|
totalTime := time.Since(startTime)
|
|
logger.Logger.Printf("Data collection completed - Success: %d/%d, Read errors: %d, Process errors: %d, Total time: %v",
|
|
successCount, len(bm.sensorConfigs), len(readErrors), len(processErrors), totalTime)
|
|
|
|
return nil
|
|
}
|
|
|
|
// updateDeviceStatuses 更新所有设备状态
|
|
func (bm *BusinessManager) updateDeviceStatuses(sensorDataList []*SensorData, readErrors []error) {
|
|
// 创建错误映射表
|
|
errorMap := make(map[string]string)
|
|
for _, err := range readErrors {
|
|
errorMap["unknown"] = err.Error()
|
|
}
|
|
|
|
// 更新成功读取的设备状态
|
|
for _, sensorData := range sensorDataList {
|
|
bm.reportManager.UpdateAndReportDeviceStatus(
|
|
sensorData.DeviceUUID,
|
|
true,
|
|
"",
|
|
sensorData.SensorType,
|
|
)
|
|
}
|
|
|
|
// 更新读取失败的设备状态
|
|
for _, config := range bm.sensorConfigs {
|
|
found := false
|
|
for _, sensorData := range sensorDataList {
|
|
if sensorData.DeviceUUID == config.DeviceUUID {
|
|
found = true
|
|
break
|
|
}
|
|
}
|
|
|
|
if !found {
|
|
// 设备读取失败
|
|
errMsg := "Device read timeout or communication failure"
|
|
if len(readErrors) > 0 {
|
|
errMsg = readErrors[0].Error()
|
|
}
|
|
|
|
bm.reportManager.UpdateAndReportDeviceStatus(
|
|
config.DeviceUUID,
|
|
false,
|
|
errMsg,
|
|
config.GetSensorType(),
|
|
)
|
|
}
|
|
}
|
|
}
|
|
|
|
// reportFirstCollectionStatus 上报第一次采集的系统状态
|
|
|
|
func (bm *BusinessManager) reportFirstCollectionStatus(_ []*SensorData, _ []error, processedDataList []*ProcessedSensorData) {
|
|
|
|
successCount := len(processedDataList)
|
|
totalCount := len(bm.sensorConfigs)
|
|
|
|
logger.Logger.Printf("=== System First Collection Status Report ===")
|
|
logger.Logger.Printf("Total devices: %d, Successfully collected and processed: %d", totalCount, successCount)
|
|
|
|
// 收集成功设备的 UUID
|
|
successUUIDs := make([]string, 0, successCount)
|
|
for _, data := range processedDataList {
|
|
successUUIDs = append(successUUIDs, data.DeviceUUID)
|
|
}
|
|
|
|
// 统一时间戳
|
|
timestamp := strconv.FormatInt(time.Now().Unix(), 10)
|
|
|
|
// 事件数组
|
|
var events []map[string]interface{}
|
|
|
|
for _, uuid := range successUUIDs {
|
|
sensorName := "Unknown Sensor"
|
|
for _, cfg := range bm.sensorConfigs {
|
|
if cfg.DeviceUUID == uuid {
|
|
sensorName = GetSensorTypeName(cfg.GetSensorType())
|
|
break
|
|
}
|
|
}
|
|
|
|
events = append(events, map[string]interface{}{
|
|
"description": "Sensor online",
|
|
"timestamp": timestamp,
|
|
"host_uuid": bm.hostUUID,
|
|
"deviceuuid": uuid,
|
|
"level": "online",
|
|
"sensor_type": sensorName,
|
|
})
|
|
}
|
|
|
|
if len(events) == 0 {
|
|
logger.Logger.Printf("No successful devices in first collection, skipping report")
|
|
return
|
|
}
|
|
|
|
metric := map[string]interface{}{
|
|
"type": "event",
|
|
"args": events,
|
|
}
|
|
|
|
routeKey := fmt.Sprintf("/dthjjc/device/%s/event", bm.hostUUID)
|
|
|
|
fullMsg := map[string]interface{}{
|
|
"method": "metric_data",
|
|
"params": map[string]interface{}{
|
|
"route_key": routeKey,
|
|
"metric": metric,
|
|
},
|
|
}
|
|
if b, err := json.Marshal(fullMsg); err == nil {
|
|
logger.Logger.Printf("First collection batch online events: %s", string(b))
|
|
}
|
|
|
|
// 发送
|
|
if err := connect.ReportMetricData(routeKey, metric); err != nil {
|
|
logger.Logger.Printf("Failed to report first batch online events: %v", err)
|
|
} else {
|
|
logger.Logger.Printf("First collection successful, batch reported %d devices online", len(events))
|
|
}
|
|
}
|
|
|
|
func (bm *BusinessManager) GetReportManager() *ReportManager {
|
|
return bm.reportManager
|
|
}
|
|
|
|
// Close 关闭资源
|
|
func (bm *BusinessManager) Close() {
|
|
logger.Logger.Printf("Business manager resources released")
|
|
}
|