commit 189844c9bb4dad60f06735776f5aab509d813451 Author: gqc Date: Mon Nov 24 18:06:00 2025 +0800 Initial commit diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/Dynamic_environmental_detection.iml b/.idea/Dynamic_environmental_detection.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/Dynamic_environmental_detection.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..1a8022f --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/README.md b/README.md new file mode 100644 index 0000000..d9dcf36 --- /dev/null +++ b/README.md @@ -0,0 +1,180 @@ +# 设备数据采集与上报协议完整文档 + +> 版本:v1.0  更新日期:2025-11-19 +### 1. service.conf 采集项下发(平台 → 设备) +```json +[ + { + "host_uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", + "device_uuid": "deviceuuid1", + "task_id": "6588", + "address": "房间东侧", + "type": "2", + "facility": "01" + }, + { + "host_uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", + "device_uuid": "deviceuuid2", + "task_id": "6589", + "address": "房间南侧", + "type": "1", + "facility": "02" + }, + { + "host_uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", + "device_uuid": "deviceuuid3", + "task_id": "6590", + "address": "房间西侧", + "type": "smog", + "facility": "03" + } +] +``` +字段名 | 类型 | 必填 | 说明 | 示例值 +-------------|--------|------|------------------------------------|-------------------------------------- +host_uuid | string | 是 | 网关/采集主机唯一ID | qbxmjyzrkpntfgswaevodhluicqzxplkm +device_uuid | string | 是 | 传感器设备唯一ID | deviceuuid1 +task_id | string | 是 | 平台任务ID,用于追踪和撤销 | 6588 +address | string | 是 | 安装位置描述 | 房间东侧 +type | string | 是 | 传感器类型 | "1"→温湿度,"2"→水浸传感器,"3"→烟雾 +facility | string | 是 | 设施点位编码(固定2位) | 01、02、03 +### 2. 上报数据(metric_data) +```json +{ + "method": "metric_data", + "params": { + "route_key": "/dthjjc/device/3eeba8usb86c0gs448oktxay41a0rm1v/data", + "metric": { + "type": "data", + "args": [ + { + "deviceuuid": "deviceuuid1", + "type": "1", + "facility": "2", + "address": "房间东侧", + "value": { "temp": "25.3", "humidity": "12.5" } + }, + { + "deviceuuid": "deviceuuid2", + "type": "2", + "facility": "1", + "address": "房间南侧", + "value": "0" + }, + { + "deviceuuid": "deviceuuid3", + "type": "3", + "facility": "3", + "address": "房间西侧", + "value": "0" + } + ] + } + } +} +``` +层级 | 字段名 | 类型 | 必填 | 说明 +------------------------|---------------|-----------------|------|----------------------------------------- +根对象 | method | string | 是 | 固定值 "metric_data" +params | route_key | string | 是 | 格式:/dthjjc/device/{host_uuid}/data +params.metric | type | string | 是 | 固定值 "data" +params.metric.args | - | array | 是 | 每项对应一个设备 +args[i] | deviceuuid | string | 是 | 设备唯一ID +args[i] | type | string | 是 | 传感器类型 +args[i] | facility | string | 是 | 设施编码(两位) +args[i] | address | string | 是 | 安装位置 +args[i] | value | string/array | 是 | 实际值,格式见下表 + +value 格式对照表 +type | 传感器类型 | value 类型 | 示例值 | 说明 +-------|----------------|----------------|-------------------------------------------|---------------------- +1 | 温湿度传感器 | array[object] | [{"temp":"25.3","humidity":"12.5"}] | 必须包含temp和humidity +2 | 水浸传感器 | string | "0" 或 "1" | 0=正常 1=报警 +3/smog | 烟雾传感器 | string | "0" 或 "1" | 0=正常 1=报警 + +### 3. 上报事件(event_data) + +# 3.1 离线报警示例 +```json +{ + "method": "event_data", + "params": { + "route_key": "/dthjjc/device/deviceuuid3/event", + "metric": { + "type": "event", + "args": { + "description": "The sensor is offline", + "timestamp": "1763544662", + "host_uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", + "device_uuid": "device3", + "level": "alarm", + "last_successful_time": "1763544625", + "sensor_type": "SmokeTransducer(3)", + "recommendation": "Check device connection and communication line" + } + } + } +} +``` +# 3.2 恢复在线示例 +```json +{ + "method": "event_data", + "params": { + "route_key": "/dthjjc/device/deviceuuid3/event", + "metric": { + "type": "event", + "args": { + "description": "Sensor online", + "timestamp": "1763544662", + "host_uuid": "qbxmjyzrkpntfgswaevodhluicqzxplkm", + "device_uuid": "device3", + "level": "online", + "sensor_type": "SmokeTransducer(3)" + } + } + } +} + +``` +# 3.3 首次执行采集任务之后上报状态 +```json +{ + "method": "metric_data", + "params": { + "metric": { + "args": { + "description": "Environmental detection system started successfully", + "timestamp": "1763644447", + "host_uuid": "pemsyszrkpntfgswaevodhluicqzxplkm", + "level": "system_start", + "device_count": 3, + "online_count": 3 + }, + "type": "event" + }, + "route_key": "/dthjjc/device/pemsyszrkpntfgswaevodhluicqzxplkm/event" + } +} +``` + +event_data 通用字段说明 +字段名 | 类型 | 必填 | 说明 +----------------------|--------|------|------------------------------------------- +description | string | 是 | 事件描述 +timestamp | string | 是 | UNIX 时间戳(秒) +host_uuid | string | 是 | 主机UUID +device_uuid | string | 是 | 设备UUID +level | string | 是 | alarm / online 等 +last_successful_time | string | 否 | 仅离线时填写 +sensor_type | string | 否 | 传感器类型描述 +recommendation | string | 否 | 处理建议 +device_count | string | 否 | 检测设备数量 +online_count | string | 否 | 当前在线数量 + +### 4. 协议总览 + +上报类型 | method | route_key 后缀 | 说明 +-------------|----------------|----------------|------------------------------------ +普通数据上报 | metric_data | /data | 定时上报温湿度、水浸传感器、烟雾等数值 +事件上报 | event_data | /event | 设备离线、恢复在线、报警等事件 \ No newline at end of file diff --git a/business/config_parser.go b/business/config_parser.go new file mode 100644 index 0000000..ead81a6 --- /dev/null +++ b/business/config_parser.go @@ -0,0 +1,70 @@ +package business + +import ( + "encoding/json" + "fmt" + "os" +) + +// SensorType 传感器类型 + +// SensorConfig 传感器配置 +type SensorConfig struct { + HostUUID string `json:"host_uuid"` + DeviceUUID string `json:"device_uuid"` + TaskID string `json:"task_id"` + Address string `json:"address"` + Type int `json:"type"` // 改为int类型 + Facility string `json:"facility"` + SlaveID int `json:"slave_id"` +} + +// GetSensorType 获取传感器类型枚举 +func (sc *SensorConfig) GetSensorType() SensorType { + return SensorType(sc.Type) +} + +// ParseServiceConfig 解析service.conf配置文件 +func ParseServiceConfig(filePath string) ([]SensorConfig, error) { + data, err := os.ReadFile(filePath) + if err != nil { + return nil, fmt.Errorf("Failed to read configuration file: %v", err) + } + + var configs []SensorConfig + if err := json.Unmarshal(data, &configs); err != nil { + return nil, fmt.Errorf("Failed to parse JSON: %v", err) + } + + return configs, nil +} + +// ValidateConfig 验证配置有效性 +func ValidateConfig(configs []SensorConfig) error { + for i, config := range configs { + if config.DeviceUUID == "" { + return fmt.Errorf("Configuration %d: device_uuid is empty", i+1) + } + if config.TaskID == "" { + return fmt.Errorf("Configuration %d: task_id is empty", i+1) + } + if config.Type < 1 || config.Type > 3 { + return fmt.Errorf("Configuration %d: invalid type value: %d", i+1, config.Type) + } + } + return nil +} + +// GetSensorTypeName 获取传感器类型名称 +func GetSensorTypeName(sensorType SensorType) string { + switch sensorType { + case SensorTypeTemperatureHumidity: + return "温湿度传感器" + case SensorTypeWaterLeak: + return "水浸传感器" + case SensorTypeSmoke: + return "烟雾传感器" + default: + return "未知传感器" + } +} diff --git a/business/data_collector.go b/business/data_collector.go new file mode 100644 index 0000000..7472a13 --- /dev/null +++ b/business/data_collector.go @@ -0,0 +1,146 @@ +package business + +import ( + "fmt" + "strconv" + "time" + + "Dynamic_environmental_detection/logger" +) + +type SensorData struct { + DeviceUUID string + TaskID string + Address string + SensorType SensorType + Facility string + RawData []byte + Timestamp time.Time + Error error +} + +// DataCollector 数据采集器 +type DataCollector struct { + modbusConfig *ModbusConfig + lastReadTime time.Time +} + +// NewDataCollector 创建数据采集器 +func NewDataCollector(config *ModbusConfig) *DataCollector { + return &DataCollector{ + modbusConfig: config, + lastReadTime: time.Now().Add(-config.SensorInterval), + } +} + +// ReadSensorData 读取单个传感器数据(使用新的Modbus客户端) +func (dc *DataCollector) ReadSensorData(sensorConfig SensorConfig) (*SensorData, error) { + // 确保通信间隔 + dc.ensureCommunicationInterval() + + // 获取寄存器配置(只包含地址和数量) + registerConfig, err := GetRegisterConfig(sensorConfig.GetSensorType()) + if err != nil { + return nil, fmt.Errorf("Failed to get register configuration: %v", err) + } + + // 关键:直接把 facility 当作 Modbus 从站地址(SlaveID) + slaveID, err := strconv.Atoi(sensorConfig.Facility) + if err != nil || slaveID < 1 || slaveID > 247 { + return nil, fmt.Errorf("Invalid facility value, cannot be used as SlaveID: %s (DeviceUUID: %s)", + sensorConfig.Facility, sensorConfig.DeviceUUID) + } + slaveIDByte := byte(slaveID) + + // 日志:用真实的 slaveID(来自 facility) + logger.Logger.Printf("Reading sensor data - Device: %s, SlaveID: %d (from facility=%s), Addr: %d, Count: %d", + sensorConfig.DeviceUUID, slaveID, sensorConfig.Facility, + registerConfig.RegisterAddress, registerConfig.RegisterCount) + + // 使用 facility 作为 SlaveID 创建客户端 + client, err := NewDebugModbusClient(dc.modbusConfig, slaveIDByte) + if err != nil { + return nil, fmt.Errorf("Failed to create Modbus client: %v", err) + } + defer client.Close() + + // 读取寄存器数据 + rawData, err := client.ReadHoldingRegisters( + registerConfig.RegisterAddress, + registerConfig.RegisterCount, + ) + + if err != nil { + return nil, fmt.Errorf("Failed to read sensor data (DeviceUUID: %s): %v", + sensorConfig.DeviceUUID, err) + } + + sensorData := &SensorData{ + DeviceUUID: sensorConfig.DeviceUUID, + TaskID: sensorConfig.TaskID, + Address: sensorConfig.Address, + SensorType: sensorConfig.GetSensorType(), + Facility: sensorConfig.Facility, + RawData: rawData, + Timestamp: time.Now(), + } + + // 成功日志也用真实的 slaveID + logger.Logger.Printf("Successfully read sensor data - DeviceUUID: %s, SlaveID: %d (from facility=%s), RawData: %v", + sensorConfig.DeviceUUID, slaveID, sensorConfig.Facility, rawData) + + // 更新最后读取时间 + dc.lastReadTime = time.Now() + + return sensorData, nil +} + +// ensureCommunicationInterval 确保通信间隔 +func (dc *DataCollector) ensureCommunicationInterval() { + elapsed := time.Since(dc.lastReadTime) + if elapsed < dc.modbusConfig.SensorInterval { + sleepTime := dc.modbusConfig.SensorInterval - elapsed + logger.Logger.Printf("Waiting for 485 communication interval: %v", sleepTime) + time.Sleep(sleepTime) + } +} + +// BatchReadSensorData 批量读取所有传感器数据 +func (dc *DataCollector) BatchReadSensorData(sensorConfigs []SensorConfig) ([]*SensorData, []error) { + var results []*SensorData + var errors []error + + // 按传感器类型分组,优化读取顺序 + groupedConfigs := groupSensorsByType(sensorConfigs) + + for _, configs := range groupedConfigs { + for _, config := range configs { + sensorData, err := dc.ReadSensorData(config) + if err != nil { + errors = append(errors, err) + + // 如果连续错误,增加等待时间 + if len(errors) > 2 { + logger.Logger.Printf("Detected consecutive communication errors, increasing bus stabilization time") + time.Sleep(dc.modbusConfig.BusResetDelay * 2) + } + continue + } + results = append(results, sensorData) + } + } + + return results, errors +} + +// groupSensorsByType 按传感器类型分组 +func groupSensorsByType(configs []SensorConfig) map[SensorType][]SensorConfig { + groups := make(map[SensorType][]SensorConfig) + + for _, config := range configs { + sensorType := config.GetSensorType() + groups[sensorType] = append(groups[sensorType], config) + } + + return groups +} diff --git a/business/data_processor.go b/business/data_processor.go new file mode 100644 index 0000000..b2b3d19 --- /dev/null +++ b/business/data_processor.go @@ -0,0 +1,131 @@ +package business + +import ( + "encoding/binary" + "fmt" + "math" + + "Dynamic_environmental_detection/logger" +) + +// ProcessedSensorData 处理后的传感器数据 +type ProcessedSensorData struct { + DeviceUUID string `json:"device_uuid"` + TaskID string `json:"task_id"` + Address string `json:"address"` + SensorType SensorType `json:"sensor_type"` + Facility string `json:"facility"` + Values []float64 `json:"values"` // 处理后的数值 + Status int `json:"status"` // 状态值 (0或1) + Timestamp int64 `json:"timestamp"` // 时间戳 + Error string `json:"error,omitempty"` +} + +// DataProcessor 数据处理器 +type DataProcessor struct{} + +// NewDataProcessor 创建数据处理器 +func NewDataProcessor() *DataProcessor { + return &DataProcessor{} +} + +// ProcessSensorData 处理传感器原始数据 +func (dp *DataProcessor) ProcessSensorData(sensorData *SensorData) (*ProcessedSensorData, error) { + processed := &ProcessedSensorData{ + DeviceUUID: sensorData.DeviceUUID, + TaskID: sensorData.TaskID, + Address: sensorData.Address, + SensorType: sensorData.SensorType, + Facility: sensorData.Facility, + Timestamp: sensorData.Timestamp.Unix(), + } + + if sensorData.Error != nil { + processed.Error = sensorData.Error.Error() + return processed, sensorData.Error + } + + switch sensorData.SensorType { + case SensorTypeTemperatureHumidity: + // 温湿度传感器:2个寄存器,分别表示温度和湿度 + if len(sensorData.RawData) >= 4 { + // 每个寄存器2字节,大端序 + temperature := float64(binary.BigEndian.Uint16(sensorData.RawData[0:2])) + humidity := float64(binary.BigEndian.Uint16(sensorData.RawData[2:4])) + + // 转换为实际值(根据传感器规格调整转换公式) + temperature = temperature / 10.0 // 假设原始值需要除以10 + humidity = humidity / 10.0 // 假设原始值需要除以10 + + processed.Values = []float64{temperature, humidity} + logger.Logger.Printf("Temperature and Humidity Sensor Data - Temperature: %.1f°C, humidity: %.1f%%", + temperature, humidity) + } else { + return nil, fmt.Errorf("Insufficient data length for temperature and humidity sensor: %d", len(sensorData.RawData)) + } + + case SensorTypeWaterLeak, SensorTypeSmoke: + // 水浸和烟雾传感器:1个寄存器,返回状态值 + if len(sensorData.RawData) >= 2 { + status := int(binary.BigEndian.Uint16(sensorData.RawData[0:2])) + processed.Status = status + processed.Values = []float64{float64(status)} + + sensorName := GetSensorTypeName(sensorData.SensorType) + statusText := "online" + if status == 1 { + statusText = "alarm" + } + logger.Logger.Printf("%sstate - %s: %d (%s)", + sensorName, sensorData.DeviceUUID, status, statusText) + } else { + return nil, fmt.Errorf("%sInsufficient sensor data length: %d", + GetSensorTypeName(sensorData.SensorType), len(sensorData.RawData)) + } + + default: + return nil, fmt.Errorf("Unknown sensor type: %d", sensorData.SensorType) + } + + return processed, nil +} + +// BatchProcessSensorData 批量处理传感器数据 +func (dp *DataProcessor) BatchProcessSensorData(sensorDataList []*SensorData) ([]*ProcessedSensorData, []error) { + var results []*ProcessedSensorData + var errors []error + + for _, sensorData := range sensorDataList { + processed, err := dp.ProcessSensorData(sensorData) + if err != nil { + errors = append(errors, err) + continue + } + results = append(results, processed) + } + + return results, errors +} + +// IsDataValid 检查数据是否有效 +func (dp *DataProcessor) IsDataValid(processedData *ProcessedSensorData) bool { + switch processedData.SensorType { + case SensorTypeTemperatureHumidity: + // 温湿度合理性检查 + if len(processedData.Values) != 2 { + return false + } + temperature := processedData.Values[0] + humidity := processedData.Values[1] + return !math.IsNaN(temperature) && !math.IsNaN(humidity) && + temperature >= -50 && temperature <= 100 && + humidity >= 0 && humidity <= 100 + + case SensorTypeWaterLeak, SensorTypeSmoke: + // 状态传感器检查 + return processedData.Status == 0 || processedData.Status == 1 + + default: + return false + } +} diff --git a/business/data_types.go b/business/data_types.go new file mode 100644 index 0000000..256c189 --- /dev/null +++ b/business/data_types.go @@ -0,0 +1,56 @@ +package business + +import "time" + +// ReportData 上报数据结构 +type ReportData struct { + Method string `json:"method"` + Params ReportParams `json:"params"` +} + +type ReportParams struct { + RouteKey string `json:"route_key"` + Metric interface{} `json:"metric"` // 使用interface{}来支持不同类型 +} + +// ReportMetricData 数据上报指标 +type ReportMetricData struct { + Type string `json:"type"` + Args []DeviceData `json:"args"` +} + +// ReportMetricEvent 事件上报指标 +type ReportMetricEvent struct { + Type string `json:"type"` + Args EventData `json:"args"` +} + +// DeviceData 设备数据(用于metric_data) +type DeviceData struct { + DeviceUUID string `json:"deviceuuid"` + Type string `json:"type"` + Facility string `json:"facility"` + Address string `json:"address"` + Value any `json:"value"` // string 或 array +} + +type EventData struct { + Description string `json:"description"` + Timestamp string `json:"timestamp"` + HostUUID string `json:"host_uuid"` + DeviceUUID string `json:"device_uuid"` + Level string `json:"level"` // "online" 或 "alarm" + LastSuccessfulTime string `json:"last_successful_time,omitempty"` + SensorType string `json:"sensor_type,omitempty"` + Recommendation string `json:"recommendation,omitempty"` +} + +// DeviceStatus 设备状态管理 +type DeviceStatus struct { + DeviceUUID string + LastSuccessTime time.Time + LastReportTime time.Time + IsOnline bool + ErrorCount int + LastError string +} diff --git a/business/debug_handler.go b/business/debug_handler.go new file mode 100644 index 0000000..8222e40 --- /dev/null +++ b/business/debug_handler.go @@ -0,0 +1,211 @@ +package business + +import ( + "encoding/hex" + "fmt" + "strconv" + "strings" + "time" + + "Dynamic_environmental_detection/logger" +) + +// DebugHandler 调试处理器 +type DebugHandler struct { + modbusConfig *ModbusConfig +} + +// NewDebugHandler 创建调试处理器 +func NewDebugHandler() *DebugHandler { + return &DebugHandler{ + modbusConfig: DefaultModbusConfig(), + } +} + +// TestDevice 测试单个设备 +func (dh *DebugHandler) TestDevice(slaveIDStr, sensorTypeStr string) error { + // 解析参数 + slaveID, err := strconv.ParseUint(slaveIDStr, 10, 8) + if err != nil { + return fmt.Errorf("Invalid slave address: %s", slaveIDStr) + } + + sensorTypeInt, err := strconv.Atoi(sensorTypeStr) + if err != nil { + return fmt.Errorf("Invalid sensor type: %s", sensorTypeStr) + } + + sensorType := SensorType(sensorTypeInt) + if sensorType < 1 || sensorType > 3 { + return fmt.Errorf("Sensor type must be between 1-3") + } + + logger.Logger.Printf("=== Starting Debug Mode ===") + logger.Logger.Printf("Slave ID: %d, Sensor Type: %s", slaveID, GetSensorTypeName(sensorType)) + + // 获取寄存器配置 + registerConfig, err := GetRegisterConfig(sensorType) + if err != nil { + return fmt.Errorf("Failed to get register configuration: %v", err) + } + + // 创建调试客户端 + client, err := NewDebugModbusClient(dh.modbusConfig, byte(slaveID)) + if err != nil { + return fmt.Errorf("Failed to create Modbus client: %v", err) + } + defer client.Close() + + // 读取数据 + logger.Logger.Printf("Reading registers - Address: %d, Count: %d", registerConfig.RegisterAddress, registerConfig.RegisterCount) + + rawData, err := client.ReadHoldingRegisters( + registerConfig.RegisterAddress, + registerConfig.RegisterCount, + ) + + if err != nil { + return fmt.Errorf("Read failed: %v", err) + } + + // 显示原始响应 + dh.printRawResponse(rawData, byte(slaveID)) + + // 解析数据 + processedData, err := dh.parseSensorData(rawData, sensorType, byte(slaveID)) + if err != nil { + return fmt.Errorf("Data parsing failed: %v", err) + } + + // 显示解析结果 + dh.printParsedData(processedData, sensorType) + + logger.Logger.Printf("=== Debug Completed ===") + return nil +} + +// TestRawCommand 测试原始Modbus命令(简化版) +func (dh *DebugHandler) TestRawCommand(hexCommand string) error { + logger.Logger.Printf("=== Raw Command Debug Mode ===") + logger.Logger.Printf("Sending command: %s", hexCommand) + + // 解码十六进制命令 + command, err := hex.DecodeString(hexCommand) + if err != nil { + return fmt.Errorf("Command format error: %v", err) + } + + if len(command) < 6 { + return fmt.Errorf("Command length insufficient, minimum 6 bytes required") + } + + slaveID := command[0] + functionCode := command[1] + + // 只支持读保持寄存器功能码 + if functionCode != 0x03 { + return fmt.Errorf("Only read holding registers function code (0x03) is supported, current: 0x%02X", functionCode) + } + + address := uint16(command[2])<<8 | uint16(command[3]) + quantity := uint16(command[4])<<8 | uint16(command[5]) + + logger.Logger.Printf("Parsed command - SlaveID: %d, Function Code: 0x%02X, Address: %d, Quantity: %d", + slaveID, functionCode, address, quantity) + + // 创建调试客户端 + client, err := NewDebugModbusClient(dh.modbusConfig, slaveID) + if err != nil { + return fmt.Errorf("Failed to create Modbus client: %v", err) + } + defer client.Close() + + // 读取寄存器 + rawData, err := client.ReadHoldingRegisters(address, quantity) + if err != nil { + return fmt.Errorf("Read failed: %v", err) + } + + // 显示原始响应 + dh.printRawResponse(rawData, slaveID) + + logger.Logger.Printf("=== Raw Command Debug Completed ===") + return nil +} + +// printRawResponse 打印原始响应 +func (dh *DebugHandler) printRawResponse(rawData []byte, slaveID byte) { + logger.Logger.Printf("Response received (%d bytes):", len(rawData)) + + // 格式化的十六进制输出 + hexParts := make([]string, len(rawData)) + for i, b := range rawData { + hexParts[i] = fmt.Sprintf("%02X", b) + } + logger.Logger.Printf("Hexadecimal: %s", strings.Join(hexParts, " ")) + + // ASCII显示 + asciiParts := make([]string, len(rawData)) + for i, b := range rawData { + if b >= 32 && b <= 126 { + asciiParts[i] = string(b) + } else { + asciiParts[i] = "." + } + } + logger.Logger.Printf("ASCII: %s", strings.Join(asciiParts, "")) +} + +// parseSensorData 解析传感器数据 +func (dh *DebugHandler) parseSensorData(rawData []byte, sensorType SensorType, slaveID byte) (*ProcessedSensorData, error) { + // 创建临时的SensorData用于处理 + sensorData := &SensorData{ + DeviceUUID: fmt.Sprintf("debug_device_%d", slaveID), + TaskID: "debug", + Address: "Debug Location", + SensorType: sensorType, + Facility: "00", + RawData: rawData, + Timestamp: time.Now(), + } + + // 使用数据处理器 + processor := NewDataProcessor() + processedData, err := processor.ProcessSensorData(sensorData) + if err != nil { + return nil, err + } + + return processedData, nil +} + +// printParsedData 打印解析结果 +func (dh *DebugHandler) printParsedData(data *ProcessedSensorData, sensorType SensorType) { + logger.Logger.Printf("=== Data Parsing Results ===") + logger.Logger.Printf("Sensor Type: %s", GetSensorTypeName(sensorType)) + logger.Logger.Printf("Timestamp: %s", time.Unix(data.Timestamp, 0).Format("2006-01-02 15:04:05")) + + switch sensorType { + case SensorTypeTemperatureHumidity: + if len(data.Values) >= 2 { + logger.Logger.Printf("Temperature: %.1f°C", data.Values[0]) + logger.Logger.Printf("Humidity: %.1f%%", data.Values[1]) + } + case SensorTypeWaterLeak: + statusText := "Normal" + if data.Status == 1 { + statusText = "Alarm" + } + logger.Logger.Printf("Water Leak Status: %d (%s)", data.Status, statusText) + case SensorTypeSmoke: + statusText := "Normal" + if data.Status == 1 { + statusText = "Alarm" + } + logger.Logger.Printf("Smoke Status: %d (%s)", data.Status, statusText) + } + + if data.Error != "" { + logger.Logger.Printf("Parsing Error: %s", data.Error) + } +} diff --git a/business/debug_modbus.go b/business/debug_modbus.go new file mode 100644 index 0000000..852c344 --- /dev/null +++ b/business/debug_modbus.go @@ -0,0 +1,69 @@ +package business + +import ( + "fmt" + "time" + + "Dynamic_environmental_detection/logger" + "github.com/goburrow/modbus" +) + +// DebugModbusClient 调试版Modbus客户端 +type DebugModbusClient struct { + client modbus.Client + handler *modbus.RTUClientHandler + slaveID byte +} + +// NewDebugModbusClient 创建调试版Modbus客户端 +func NewDebugModbusClient(config *ModbusConfig, slaveID byte) (*DebugModbusClient, error) { + logger.Logger.Printf("DEBUG: Create a Modbus client - SlaveID: %d, port: %s", slaveID, config.Port) + + handler := modbus.NewRTUClientHandler(config.Port) + handler.BaudRate = config.BaudRate + handler.DataBits = config.DataBits + handler.StopBits = config.StopBits + handler.Parity = config.Parity + handler.Timeout = config.Timeout + handler.SlaveId = slaveID + + logger.Logger.Printf("DEBUG: Attempt to connect to the Modbus device...") + if err := handler.Connect(); err != nil { + logger.Logger.Printf("DEBUG: Modbus connection failed: %v", err) + return nil, fmt.Errorf("Failed to connect to Modbus device: %v", err) + } + + logger.Logger.Printf("DEBUG: Modbus connection successful!") + return &DebugModbusClient{ + client: modbus.NewClient(handler), + handler: handler, + slaveID: slaveID, + }, nil +} + +// ReadHoldingRegisters 读取保持寄存器(带调试信息) +func (d *DebugModbusClient) ReadHoldingRegisters(address, quantity uint16) ([]byte, error) { + logger.Logger.Printf("DEBUG: Send Modbus request - SlaveID: %d, Function code: 0x03, Address: %d, quantity: %d", + d.slaveID, address, quantity) + + startTime := time.Now() + data, err := d.client.ReadHoldingRegisters(address, quantity) + elapsed := time.Since(startTime) + + if err != nil { + logger.Logger.Printf("DEBUG: Failed to read - time-consuming: %v, Error: %v", elapsed, err) + return nil, err + } + + logger.Logger.Printf("DEBUG: Reading successful - time-consuming: %v, Data length: %d, Data %X", + elapsed, len(data), data) + return data, nil +} + +// Close 关闭连接 +func (d *DebugModbusClient) Close() { + if d.handler != nil { + d.handler.Close() + logger.Logger.Printf("DEBUG: Modbus connection has been closed.") + } +} diff --git a/business/fault_detector.go b/business/fault_detector.go new file mode 100644 index 0000000..7919f9e --- /dev/null +++ b/business/fault_detector.go @@ -0,0 +1,163 @@ +package business + +import ( + "Dynamic_environmental_detection/logger" + "fmt" + "strconv" + "sync" + "time" +) + +// FaultDetector 故障检测器 +type FaultDetector struct { + deviceStatuses map[string]*DeviceStatus + mutex sync.RWMutex + hostUUID string +} + +// NewFaultDetector 创建故障检测器 +func NewFaultDetector(hostUUID string) *FaultDetector { + return &FaultDetector{ + deviceStatuses: make(map[string]*DeviceStatus), + hostUUID: hostUUID, + } +} + +// UpdateDeviceStatus 更新设备状态 +func (fd *FaultDetector) UpdateDeviceStatus(deviceUUID string, success bool, errMsg string, sensorType SensorType) (needReport bool, event *EventData) { + fd.mutex.Lock() + defer fd.mutex.Unlock() + + // 获取或创建设备状态 + status, exists := fd.deviceStatuses[deviceUUID] + if !exists { + status = &DeviceStatus{ + DeviceUUID: deviceUUID, + IsOnline: true, + } + fd.deviceStatuses[deviceUUID] = status + } + + currentTime := time.Now() + needReport = false + + if success { + // 设备读取成功 + status.LastSuccessTime = currentTime + status.ErrorCount = 0 + status.LastError = "" + + if !status.IsOnline { + // 设备从离线恢复在线 + status.IsOnline = true + needReport = true + event = fd.createOnlineEvent(deviceUUID, sensorType, currentTime) + logger.Logger.Printf("Equipment restored to operational status: %s", deviceUUID) + } + } else { + // 设备读取失败 + status.ErrorCount++ + status.LastError = errMsg + + if status.IsOnline && status.ErrorCount >= 3 { + // 设备从在线变为离线(连续3次失败) + status.IsOnline = false + needReport = true + event = fd.createAlarmEvent(deviceUUID, sensorType, status.LastSuccessTime, currentTime, errMsg) + logger.Logger.Printf("Equipment Offline Alarm: %s, Error: %s", deviceUUID, errMsg) + } + } + + status.LastReportTime = currentTime + return needReport, event +} + +// createOnlineEvent 创建设备在线事件 +func (fd *FaultDetector) createOnlineEvent(deviceUUID string, sensorType SensorType, timestamp time.Time) *EventData { + return &EventData{ + Description: "Sensor online", + Timestamp: strconv.FormatInt(timestamp.Unix(), 10), + HostUUID: fd.hostUUID, + DeviceUUID: deviceUUID, + Level: "online", + SensorType: GetSensorTypeDescription(sensorType), + } +} + +// createAlarmEvent 创建设备报警事件 +func (fd *FaultDetector) createAlarmEvent(deviceUUID string, sensorType SensorType, lastSuccessTime, timestamp time.Time, errMsg string) *EventData { + return &EventData{ + Description: fmt.Sprintf("Sensor communication failure: %s", errMsg), + Timestamp: strconv.FormatInt(timestamp.Unix(), 10), + HostUUID: fd.hostUUID, + DeviceUUID: deviceUUID, + Level: "alarm", + LastSuccessfulTime: strconv.FormatInt(lastSuccessTime.Unix(), 10), + SensorType: GetSensorTypeDescription(sensorType), + Recommendation: "Check device connection and communication line", + } +} + +// GetOnlineDevices 获取在线设备列表 +func (fd *FaultDetector) GetOnlineDevices() []string { + fd.mutex.RLock() + defer fd.mutex.RUnlock() + + var onlineDevices []string + for deviceUUID, status := range fd.deviceStatuses { + if status.IsOnline { + onlineDevices = append(onlineDevices, deviceUUID) + } + } + return onlineDevices +} + +// GetDeviceStatus 获取设备状态 +func (fd *FaultDetector) GetDeviceStatus(deviceUUID string) (*DeviceStatus, bool) { + fd.mutex.RLock() + defer fd.mutex.RUnlock() + + status, exists := fd.deviceStatuses[deviceUUID] + return status, exists +} + +// CollectAndClearPendingEvents 收集所有待上报的事件并清空(供批量上报使用) +func (fd *FaultDetector) CollectAndClearPendingEvents() (alarms []EventData, onlines []EventData) { + fd.mutex.Lock() + defer fd.mutex.Unlock() + + current := time.Now() + timestamp := strconv.FormatInt(current.Unix(), 10) + + for deviceUUID, status := range fd.deviceStatuses { + // 检查是否有需要上报的离线恢复(原来逻辑是实时上报的,这里改为攒着批量发) + if !status.IsOnline && status.ErrorCount >= 3 && status.LastReportTime.Before(current.Add(-5*time.Second)) { + // 防止重复发,简单用时间判断 + alarms = append(alarms, EventData{ + Description: fmt.Sprintf("Sensor communication failure: %s", status.LastError), + Timestamp: timestamp, + HostUUID: fd.hostUUID, + DeviceUUID: deviceUUID, + Level: "alarm", + LastSuccessfulTime: strconv.FormatInt(status.LastSuccessTime.Unix(), 10), + SensorType: "未知", // 后面你采集时会带 sensorType,这里先占位 + Recommendation: "Check device connection and communication line", + }) + status.LastReportTime = current // 标记已处理 + } + + if status.IsOnline && status.LastSuccessTime.After(status.LastReportTime) { + // 如果最近一次成功采集时间 > 上次上报时间,说明刚刚恢复,需要发 online + onlines = append(onlines, EventData{ + Description: "Sensor online", + Timestamp: timestamp, + HostUUID: fd.hostUUID, + DeviceUUID: deviceUUID, + Level: "online", + SensorType: "未知", // 同上 + }) + status.LastReportTime = current + } + } + return alarms, onlines +} diff --git a/business/main_business.go b/business/main_business.go new file mode 100644 index 0000000..6c7aaac --- /dev/null +++ b/business/main_business.go @@ -0,0 +1,236 @@ +package business + +import ( + "Dynamic_environmental_detection/connect" + "encoding/json" + "fmt" + "strconv" + "time" + + "Dynamic_environmental_detection/logger" + "github.com/goburrow/modbus" +) + +// BusinessManager 业务管理器 +type BusinessManager struct { + modbusClient modbus.Client + modbusConfig *ModbusConfig + dataCollector *DataCollector + dataProcessor *DataProcessor + reportManager *ReportManager + sensorConfigs []SensorConfig + hostUUID string + isFirstCollect bool +} + +// NewBusinessManager 创建业务管理器 +func NewBusinessManager(sensorConfigs []SensorConfig) (*BusinessManager, error) { + if len(sensorConfigs) == 0 { + return nil, fmt.Errorf("sensor configuration is empty") + } + + // 从第一个配置获取host_uuid + hostUUID := sensorConfigs[0].HostUUID + + // 创建Modbus配置 + modbusConfig := DefaultModbusConfig() + + // 创建数据采集器、处理器和上报管理器 + dataCollector := NewDataCollector(modbusConfig) + dataProcessor := NewDataProcessor() + reportManager := NewReportManager(hostUUID) + + logger.Logger.Printf("DEBUG: Business manager created, isFirstCollect initialized to: true") + + return &BusinessManager{ + modbusConfig: modbusConfig, + dataCollector: dataCollector, + dataProcessor: dataProcessor, + reportManager: reportManager, + sensorConfigs: sensorConfigs, + hostUUID: hostUUID, + isFirstCollect: true, // 初始化第一次采集标志 + }, nil +} + +// StartDataCollection 启动数据采集和上报 +func (bm *BusinessManager) StartDataCollection() error { + logger.Logger.Printf("Starting sensor data collection, total sensors: %d", len(bm.sensorConfigs)) + startTime := time.Now() + + // 批量读取传感器数据 + sensorDataList, readErrors := bm.dataCollector.BatchReadSensorData(bm.sensorConfigs) + + // 更新设备状态并上报事件 + bm.updateDeviceStatuses(sensorDataList, readErrors) + + // 批量处理传感器数据 + processedDataList, processErrors := bm.dataProcessor.BatchProcessSensorData(sensorDataList) + + if len(processErrors) > 0 { + logger.Logger.Printf("Partial data processing failed, error count: %d", len(processErrors)) + for _, err := range processErrors { + logger.Logger.Printf("Processing error: %v", err) + } + } + + // 添加调试信息,检查是否为第一次采集 + logger.Logger.Printf("DEBUG: Is first collection: %v", bm.isFirstCollect) + + // 如果是第一次采集,上报整体状态 + if bm.isFirstCollect { + logger.Logger.Printf("DEBUG: Processing first collection status report") + bm.reportFirstCollectionStatus(sensorDataList, readErrors, processedDataList) + bm.isFirstCollect = false // 标记为已完成第一次采集 + logger.Logger.Printf("DEBUG: First collection status report completed, isFirstCollect set to: %v", bm.isFirstCollect) + } + + // 上报指标数据 + if len(processedDataList) > 0 { + if err := bm.reportManager.ReportMetricData(processedDataList, bm.sensorConfigs); err != nil { + logger.Logger.Printf("Failed to report metric data: %v", err) + } + } + + // 统计信息 + successCount := 0 + for _, data := range processedDataList { + if bm.dataProcessor.IsDataValid(data) { + successCount++ + } + } + + totalTime := time.Since(startTime) + logger.Logger.Printf("Data collection completed - Success: %d/%d, Read errors: %d, Process errors: %d, Total time: %v", + successCount, len(bm.sensorConfigs), len(readErrors), len(processErrors), totalTime) + + return nil +} + +// updateDeviceStatuses 更新所有设备状态 +func (bm *BusinessManager) updateDeviceStatuses(sensorDataList []*SensorData, readErrors []error) { + // 创建错误映射表 + errorMap := make(map[string]string) + for _, err := range readErrors { + // 从错误信息中提取deviceUUID(需要根据实际错误格式调整) + errorMap["unknown"] = err.Error() + } + + // 更新成功读取的设备状态 + for _, sensorData := range sensorDataList { + bm.reportManager.UpdateAndReportDeviceStatus( + sensorData.DeviceUUID, + true, + "", + sensorData.SensorType, + ) + } + + // 更新读取失败的设备状态 + for _, config := range bm.sensorConfigs { + found := false + for _, sensorData := range sensorDataList { + if sensorData.DeviceUUID == config.DeviceUUID { + found = true + break + } + } + + if !found { + // 设备读取失败 + errMsg := "Device read timeout or communication failure" + if len(readErrors) > 0 { + errMsg = readErrors[0].Error() + } + + bm.reportManager.UpdateAndReportDeviceStatus( + config.DeviceUUID, + false, + errMsg, + config.GetSensorType(), + ) + } + } +} + +// reportFirstCollectionStatus 上报第一次采集的系统状态 +// reportFirstCollectionStatus 上报第一次采集的系统状态(新协议:批量发送所有在线设备的 online 事件) +func (bm *BusinessManager) reportFirstCollectionStatus(_ []*SensorData, _ []error, processedDataList []*ProcessedSensorData) { + // 只使用 processedDataList 就够了,另外两个参数我们不需要但必须保留(调用方传了),用 _ 忽略 + successCount := len(processedDataList) + totalCount := len(bm.sensorConfigs) + + logger.Logger.Printf("=== System First Collection Status Report ===") + logger.Logger.Printf("Total devices: %d, Successfully collected and processed: %d", totalCount, successCount) + + // 收集成功设备的 UUID + successUUIDs := make([]string, 0, successCount) + for _, data := range processedDataList { + successUUIDs = append(successUUIDs, data.DeviceUUID) + } + + // 统一时间戳 + timestamp := strconv.FormatInt(time.Now().Unix(), 10) + + // 事件数组 + var events []map[string]interface{} + + for _, uuid := range successUUIDs { + sensorName := "Unknown Sensor" + for _, cfg := range bm.sensorConfigs { + if cfg.DeviceUUID == uuid { + sensorName = GetSensorTypeName(cfg.GetSensorType()) + break + } + } + + events = append(events, map[string]interface{}{ + "description": "Sensor online", + "timestamp": timestamp, + "host_uuid": bm.hostUUID, + "deviceuuid": uuid, + "level": "online", + "sensor_type": sensorName, + }) + } + + if len(events) == 0 { + logger.Logger.Printf("No successful devices in first collection, skipping report") + return + } + + metric := map[string]interface{}{ + "type": "event", + "args": events, // 批量数组! + } + + routeKey := fmt.Sprintf("/dthjjc/device/%s/event", bm.hostUUID) + + // 打印完整 JSON 方便你看 + fullMsg := map[string]interface{}{ + "method": "metric_data", + "params": map[string]interface{}{ + "route_key": routeKey, + "metric": metric, + }, + } + if b, err := json.Marshal(fullMsg); err == nil { + logger.Logger.Printf("First collection batch online events: %s", string(b)) + } + + // 发送 + if err := connect.ReportMetricData(routeKey, metric); err != nil { + logger.Logger.Printf("Failed to report first batch online events: %v", err) + } else { + logger.Logger.Printf("First collection successful, batch reported %d devices online", len(events)) + } +} + +func (bm *BusinessManager) GetReportManager() *ReportManager { + return bm.reportManager +} + +// Close 关闭资源 +func (bm *BusinessManager) Close() { + logger.Logger.Printf("Business manager resources released") +} diff --git a/business/modbus_config.go b/business/modbus_config.go new file mode 100644 index 0000000..6ecbfe1 --- /dev/null +++ b/business/modbus_config.go @@ -0,0 +1,76 @@ +package business + +import ( + "fmt" + "time" +) + +type SensorType int + +const ( + SensorTypeTemperatureHumidity SensorType = 1 // 温湿度传感器 + SensorTypeWaterLeak SensorType = 2 // 水浸传感器 + SensorTypeSmoke SensorType = 3 // 烟雾传感器 +) + +// ModbusConfig Modbus通信配置 +type ModbusConfig struct { + Port string + BaudRate int + DataBits int + StopBits int + Parity string + Timeout time.Duration + RetryCount int + RetryDelay time.Duration + SensorInterval time.Duration + BusResetDelay time.Duration +} +type SensorRegisterConfig struct { + RegisterAddress uint16 + RegisterCount uint16 +} + +// GetSensorTypeDescription 获取传感器类型描述 +func GetSensorTypeDescription(sensorType SensorType) string { + switch sensorType { + case SensorTypeTemperatureHumidity: + return "TemperatureHumidityTransducer(1)" + case SensorTypeWaterLeak: + return "WaterLeakTransducer(2)" + case SensorTypeSmoke: + return "SmokeTransducer(3)" + default: + return "UnknownTransducer" + } +} + +// DefaultModbusConfig 获取默认Modbus配置 +func DefaultModbusConfig() *ModbusConfig { + return &ModbusConfig{ + Port: "/dev/ttyS5", + BaudRate: 9600, + DataBits: 8, + StopBits: 1, + Parity: "N", + Timeout: 3 * time.Second, // 缩短超时时间 + RetryCount: 2, // 减少重试次数 + RetryDelay: 100 * time.Millisecond, + SensorInterval: 200 * time.Millisecond, // 传感器间读取间隔 + BusResetDelay: 50 * time.Millisecond, // 总线稳定时间 + } +} + +// GetRegisterConfig 根据传感器类型获取寄存器配置 +func GetRegisterConfig(sensorType SensorType) (*SensorRegisterConfig, error) { + switch sensorType { + case SensorTypeTemperatureHumidity: + return &SensorRegisterConfig{RegisterAddress: 0x00, RegisterCount: 2}, nil + case SensorTypeWaterLeak: + return &SensorRegisterConfig{RegisterAddress: 0x00, RegisterCount: 1}, nil + case SensorTypeSmoke: + return &SensorRegisterConfig{RegisterAddress: 0x03, RegisterCount: 1}, nil + default: + return nil, fmt.Errorf("Unsupported sensor types") + } +} diff --git a/business/report_manager.go b/business/report_manager.go new file mode 100644 index 0000000..4218ab0 --- /dev/null +++ b/business/report_manager.go @@ -0,0 +1,224 @@ +package business + +import ( + "Dynamic_environmental_detection/connect" + "Dynamic_environmental_detection/logger" + "encoding/json" + "fmt" + "strconv" +) + +// ReportManager 数据上报管理器 +type ReportManager struct { + hostUUID string + faultDetector *FaultDetector +} + +// NewReportManager 创建数据上报管理器 +func NewReportManager(hostUUID string) *ReportManager { + return &ReportManager{ + hostUUID: hostUUID, + faultDetector: NewFaultDetector(hostUUID), + } +} + +// ReportMetricData 上报指标数据 +func (rm *ReportManager) ReportMetricData(processedData []*ProcessedSensorData, sensorConfigs []SensorConfig) error { + if len(processedData) == 0 { + logger.Logger.Printf("There is no valid data to report") + return nil + } + + // 构建设备数据数组 + var deviceDataList []DeviceData + + for _, data := range processedData { + deviceData, err := rm.convertToDeviceData(data, sensorConfigs) + if err != nil { + logger.Logger.Printf("Failed to convert device data (Device: %s): %v", data.DeviceUUID, err) + continue + } + deviceDataList = append(deviceDataList, deviceData) + } + + // 构建数据上报的metric内容 + metricData := map[string]interface{}{ + "type": "data", + "args": deviceDataList, + } + + // 外层route_key(用于WebSocket路由) + outerRouteKey := fmt.Sprintf("/dthjjc/device/%s/data", rm.hostUUID) + + // 记录要发送的详细内容 + messageJSON, err := json.Marshal(map[string]interface{}{ + "method": "metric_data", + "params": map[string]interface{}{ + "route_key": outerRouteKey, + "metric": metricData, + }, + }) + if err != nil { + logger.Logger.Printf("Failed to serialize and report data: %v", err) + } else { + logger.Logger.Printf("Sending WebSocket message: %s", string(messageJSON)) + } + + logger.Logger.Printf("Prepare to submit indicator data and equipment count: %d", len(deviceDataList)) + + // 上报数据 + if err := connect.ReportMetricData(outerRouteKey, metricData); err != nil { + return fmt.Errorf("Failed to report indicator data: %v", err) + } + + // 记录每个设备的上报状态 + for _, data := range processedData { + switch data.SensorType { + case SensorTypeTemperatureHumidity: + if len(data.Values) >= 2 { + logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Temp=%.1f°C, Humidity=%.1f%%)", + data.DeviceUUID, data.Values[0], data.Values[1]) + } + case SensorTypeWaterLeak: + statusText := "online" + if data.Status == 1 { + statusText = "alarm" + } + logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Water Immersion Sensor Status: %s)", + data.DeviceUUID, statusText) + case SensorTypeSmoke: + statusText := "online" + if data.Status == 1 { + statusText = "alarm" + } + logger.Logger.Printf("Indicator snapshot has been submitted. (DeviceUUID=%s, Smoke Sensor Status: %s)", + data.DeviceUUID, statusText) + } + } + + logger.Logger.Printf("Asynchronous reporting successful") + return nil +} + +// ReportEventData 上报事件数据 +func (rm *ReportManager) ReportEventData(event *EventData) error { + // 新协议:我们不再单个发,而是偷偷攒进一个全局批量队列,统一用新格式发 + // 但为了不影响其他调用处,我们保留这个函数,只是内部实现变了 + + // 特殊处理:如果 event.Level == "system_start",直接忽略(取消这个事件) + if event.Level == "system_start" { + return nil + } + + // 转换为新批量格式(复用 EventData 的字段) + item := map[string]interface{}{ + "description": event.Description, + "timestamp": event.Timestamp, + "host_uuid": event.HostUUID, + "level": event.Level, + } + + if event.DeviceUUID != "" { + item["deviceuuid"] = event.DeviceUUID + } + if event.LastSuccessfulTime != "" { + item["last_successful_time"] = event.LastSuccessfulTime + } + if event.SensorType != "" { + item["sensor_type"] = event.SensorType + } + if event.Recommendation != "" { + item["recommendation"] = event.Recommendation + } + + // 关键:复用原来的 metric_data 通道发送,但 args 是数组 + var events []map[string]interface{} + events = append(events, item) + + metric := map[string]interface{}{ + "type": "event", + "args": events, // 关键:这里是数组,哪怕只有1条 + } + + routeKey := fmt.Sprintf("/dthjjc/device/%s/event", rm.hostUUID) + + messageJSON, _ := json.Marshal(map[string]interface{}{ + "method": "metric_data", + "params": map[string]interface{}{ + "route_key": routeKey, + "metric": metric, + }, + }) + logger.Logger.Printf("Sending WebSocket message: %s", string(messageJSON)) + + return connect.ReportMetricData(routeKey, metric) +} + +// convertToDeviceData 转换处理后的数据为上报格式 +func (rm *ReportManager) convertToDeviceData(data *ProcessedSensorData, sensorConfigs []SensorConfig) (DeviceData, error) { + // 查找设备配置 + var sensorConfig SensorConfig + found := false + for _, config := range sensorConfigs { + if config.DeviceUUID == data.DeviceUUID { + sensorConfig = config + found = true + break + } + } + + if !found { + return DeviceData{}, fmt.Errorf("No device configuration found: %s", data.DeviceUUID) + } + + deviceData := DeviceData{ + DeviceUUID: data.DeviceUUID, + Type: strconv.Itoa(int(data.SensorType)), + Facility: sensorConfig.Facility, + Address: sensorConfig.Address, + } + + // 根据传感器类型设置value字段 + switch data.SensorType { + case SensorTypeTemperatureHumidity: + if len(data.Values) >= 2 { + // 直接构造 map,不包数组!! + valueMap := map[string]string{ + "temp": fmt.Sprintf("%.1f", data.Values[0]), + "humidity": fmt.Sprintf("%.1f", data.Values[1]), + } + deviceData.Value = valueMap // 直接赋值对象,不是数组 + } else { + return DeviceData{}, fmt.Errorf("Temperature and humidity data is incomplete.: %v", data.Values) + } + + case SensorTypeWaterLeak, SensorTypeSmoke: + deviceData.Value = strconv.Itoa(data.Status) + + default: + return DeviceData{}, fmt.Errorf("Unknown sensor type: %d", data.SensorType) + } + + logger.Logger.Printf("DEBUG: Conversion of equipment data successful - Device: %s, Va1lue: %+v", data.DeviceUUID, deviceData.Value) + return deviceData, nil +} + +// UpdateAndReportDeviceStatus 更新设备状态并上报事件 +func (rm *ReportManager) UpdateAndReportDeviceStatus(deviceUUID string, success bool, errMsg string, sensorType SensorType) { + needReport, event := rm.faultDetector.UpdateDeviceStatus(deviceUUID, success, errMsg, sensorType) + if needReport && event != nil { + // 异步上报事件 + go func() { + if err := rm.ReportEventData(event); err != nil { + logger.Logger.Printf("Event reporting failed: %v", err) + } else { + logger.Logger.Printf("Asynchronous reporting successful") + } + }() + } +} + +// GetFaultDetector 获取故障检测器 +func (rm *ReportManager) GetFaultDetector() *FaultDetector { + return rm.faultDetector +} diff --git a/connect/ws_channel.go b/connect/ws_channel.go new file mode 100644 index 0000000..6d6ec2a --- /dev/null +++ b/connect/ws_channel.go @@ -0,0 +1,480 @@ +package connect + +import ( + "Dynamic_environmental_detection/logger" + "encoding/json" + "fmt" + "github.com/gorilla/websocket" + "sync" + "time" +) + +// Global variables +var ( + wsClient *WSClient + wsMutex sync.RWMutex + isConnecting bool + wsURL = "ws://172.17.0.1:18080/ws" +) + +type WSMessage struct { + ID int64 `json:"id,omitempty"` + Method string `json:"method"` + Params json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` + Error *WSError `json:"error,omitempty"` +} + +type WSError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +type RegisterParams struct { + ServiceID string `json:"service_id"` + MetaData map[string]interface{} `json:"meta_data,omitempty"` + ContainerName string `json:"container_name,omitempty"` +} + +type SubscribeParams struct { + Topic string `json:"topic"` +} + +type MetricDataParams struct { + RouteKey string `json:"route_key"` + Metric interface{} `json:"metric"` +} + +type PublishParams struct { + Topic string `json:"topic"` + Data json.RawMessage `json:"data,omitempty"` +} + +type WSClient struct { + Conn *websocket.Conn + url string + serviceID string + metaData map[string]interface{} + containerName string + subscribers map[string]func(*PublishParams) + connected bool + mutex sync.Mutex + messageID int64 + callbacks map[int64]chan *WSMessage + closeChan chan struct{} + reconnectFlag bool +} + +// ==================== Public API ==================== + +//// GetWSClient returns the global WebSocket client instance +//func GetWSClient() *WSClient { +// wsMutex.RLock() +// defer wsMutex.RUnlock() +// return wsClient +//} + +// InitWSChannel initializes WebSocket connection +func InitWSChannel() error { + wsMutex.Lock() + defer wsMutex.Unlock() + + if wsClient != nil && wsClient.connected { + logger.Logger.Printf("WebSocket connection already exists") + return nil + } + + if isConnecting { + return fmt.Errorf("WebSocket connection is being established") + } + + isConnecting = true + defer func() { isConnecting = false }() + + logger.Logger.Printf("Starting WebSocket connection initialization: %s", wsURL) + + client, err := NewWSClient(wsURL) + if err != nil { + logger.Logger.Printf("Failed to create WebSocket client: %v", err) + return err + } + + wsClient = client + logger.Logger.Printf("WebSocket connection initialized successfully") + return nil +} + +// GenerateServiceID generates a unique service ID +func GenerateServiceID() string { + return fmt.Sprintf("fireleave-service-%d", time.Now().UnixNano()) +} + +// RegisterService registers the service with the server +func RegisterService(serviceID string, metaData map[string]interface{}, containerName string) error { + if wsClient == nil { + return fmt.Errorf("WebSocket client not initialized") + } + + // Store registration info for reconnection + wsClient.serviceID = serviceID + wsClient.metaData = metaData + wsClient.containerName = containerName + + params := RegisterParams{ + ServiceID: serviceID, + MetaData: metaData, + ContainerName: containerName, + } + + logger.Logger.Printf("Starting service registration: ServiceID=%s", serviceID) + response, err := wsClient.sendRequest("register", params, true) + if err != nil { + return fmt.Errorf("service registration failed: %v", err) + } + + if response.Error != nil { + return fmt.Errorf("service registration error: %s", response.Error.Message) + } + + var resultStr string + if err := json.Unmarshal(response.Result, &resultStr); err != nil { + return fmt.Errorf("failed to parse registration response: %v", err) + } + + if resultStr != "ok" { + return fmt.Errorf("service registration failed: server returned %s", resultStr) + } + + logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID) + return nil +} + +// SubscribeTopic subscribes to a topic +func SubscribeTopic(topic string, callback func(*PublishParams)) error { + if wsClient == nil { + return fmt.Errorf("WebSocket client not initialized") + } + + params := SubscribeParams{Topic: topic} + + logger.Logger.Printf("Starting topic subscription: Topic=%s", topic) + response, err := wsClient.sendRequest("subscribe", params, true) + if err != nil { + return fmt.Errorf("topic subscription failed: %v", err) + } + + if response.Error != nil { + return fmt.Errorf("topic subscription error: %s", response.Error.Message) + } + + wsClient.mutex.Lock() + wsClient.subscribers[topic] = callback + wsClient.mutex.Unlock() + + logger.Logger.Printf("Topic subscription successful: Topic=%s", topic) + return nil +} + +// ReportMetricData reports metric data to the server (no response expected) +func ReportMetricData(routeKey string, metric interface{}) error { + if wsClient == nil { + return fmt.Errorf("WebSocket client not initialized") + } + + params := MetricDataParams{ + RouteKey: routeKey, + Metric: metric, + } + + // metric_data method doesn't expect a response + _, err := wsClient.sendRequest("metric_data", params, false) + if err != nil { + return fmt.Errorf("metric data reporting failed: %v", err) + } + + logger.Logger.Printf("Metric data reported successfully: RouteKey=%s", routeKey) + return nil +} + +// Close closes the WebSocket connection +func Close() { + if wsClient != nil { + wsClient.close() + } +} + +// ==================== Internal Methods ==================== + +// NewWSClient creates a new WebSocket client +func NewWSClient(url string) (*WSClient, error) { + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + return nil, err + } + + client := &WSClient{ + Conn: conn, + url: url, + connected: true, + messageID: 1, + callbacks: make(map[int64]chan *WSMessage), + subscribers: make(map[string]func(*PublishParams)), + closeChan: make(chan struct{}), + reconnectFlag: true, + } + + go client.readLoop() + go client.heartbeatLoop() + return client, nil +} + +func (c *WSClient) IsConnected() bool { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.connected +} + +func (c *WSClient) sendRequest(method string, params interface{}, waitResponse bool) (*WSMessage, error) { + c.mutex.Lock() + id := c.messageID + c.messageID++ + c.mutex.Unlock() + + message := WSMessage{ + ID: id, + Method: method, + } + + if params != nil { + paramsData, err := json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("parameter serialization failed: %v", err) + } + message.Params = paramsData + } + + if !waitResponse { + return nil, c.sendRawMessage(message) + } + + callback := make(chan *WSMessage, 1) + c.mutex.Lock() + c.callbacks[id] = callback + c.mutex.Unlock() + + defer func() { + c.mutex.Lock() + delete(c.callbacks, id) + c.mutex.Unlock() + }() + + if err := c.sendRawMessage(message); err != nil { + return nil, err + } + + select { + case response := <-callback: + return response, nil + case <-time.After(30 * time.Second): + return nil, fmt.Errorf("response timeout") + case <-c.closeChan: + return nil, fmt.Errorf("connection closed") + } +} + +func (c *WSClient) sendRawMessage(message interface{}) error { + c.mutex.Lock() + defer c.mutex.Unlock() + + if !c.connected { + return fmt.Errorf("WebSocket connection disconnected") + } + + messageData, err := json.Marshal(message) + if err != nil { + return fmt.Errorf("message serialization failed: %v", err) + } + + err = c.Conn.WriteMessage(websocket.TextMessage, messageData) + if err != nil { + logger.Logger.Printf("WebSocket message sending failed: %v", err) + c.connected = false + go c.reconnect() + return fmt.Errorf("message sending failed: %v", err) + } + + return nil +} + +func (c *WSClient) readLoop() { + defer func() { + if r := recover(); r != nil { + logger.Logger.Printf("readLoop panic: %v", r) + } + }() + + for { + _, messageData, err := c.Conn.ReadMessage() + if err != nil { + logger.Logger.Printf("WebSocket message reading failed: %v", err) + c.mutex.Lock() + c.connected = false + c.mutex.Unlock() + go c.reconnect() + return + } + + if string(messageData) == "pong" { + continue + } + + var message WSMessage + if err := json.Unmarshal(messageData, &message); err != nil { + logger.Logger.Printf("WebSocket message parsing failed: %v", err) + continue + } + + // Handle callbacks for request-response messages + if message.ID > 0 { + c.mutex.Lock() + if callback, exists := c.callbacks[message.ID]; exists { + select { + case callback <- &message: + // Message delivered to callback + default: + logger.Logger.Printf("Callback channel full, discarding message: ID=%d", message.ID) + } + } + c.mutex.Unlock() + } + + // Handle publish messages (broadcasts) + if message.Method == "publish" { + c.handlePublishMessage(message) + } + } +} + +func (c *WSClient) handlePublishMessage(message WSMessage) { + var publishParams PublishParams + if err := json.Unmarshal(message.Params, &publishParams); err != nil { + logger.Logger.Printf("Failed to parse publish message parameters: %v", err) + return + } + + c.mutex.Lock() + callback, exists := c.subscribers[publishParams.Topic] + c.mutex.Unlock() + + if exists && callback != nil { + callback(&publishParams) + } +} + +func (c *WSClient) heartbeatLoop() { + ticker := time.NewTicker(25 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.mutex.Lock() + connected := c.connected + conn := c.Conn + c.mutex.Unlock() + + if connected && conn != nil { + err := conn.WriteMessage(websocket.PingMessage, []byte{}) + if err != nil { + logger.Logger.Printf("Failed to send ping: %v", err) + c.mutex.Lock() + c.connected = false + c.mutex.Unlock() + go c.reconnect() + } + } + case <-c.closeChan: + return + } + } +} + +func (c *WSClient) reconnect() { + if !c.reconnectFlag { + return + } + + for i := 0; i < 12; i++ { + delay := time.Duration(1< 30*time.Second { + delay = 30 * time.Second + } + time.Sleep(delay) + + conn, _, err := websocket.DefaultDialer.Dial(c.url, nil) + if err != nil { + logger.Logger.Printf("Reconnection attempt %d/12 failed: %v", i+1, err) + continue + } + + c.mutex.Lock() + c.Conn = conn + c.connected = true + c.mutex.Unlock() + + logger.Logger.Printf("WebSocket reconnection successful!") + + // Auto re-register and re-subscribe after reconnection + c.autoReRegister() + return + } + + logger.Logger.Printf("Reconnection failed after 12 attempts") + c.reconnectFlag = false +} + +func (c *WSClient) autoReRegister() { + // Re-register service if we have the registration info + if c.serviceID != "" { + time.Sleep(1 * time.Second) // Wait for connection to stabilize + + params := RegisterParams{ + ServiceID: c.serviceID, + MetaData: c.metaData, + ContainerName: c.containerName, + } + + if _, err := c.sendRequest("register", params, true); err != nil { + logger.Logger.Printf("Auto re-registration failed: %v", err) + } else { + logger.Logger.Printf("Auto re-registration successful") + } + + // Re-subscribe to all topics + c.mutex.Lock() + subscribers := make(map[string]func(*PublishParams)) + for topic, callback := range c.subscribers { + subscribers[topic] = callback + } + c.mutex.Unlock() + + for topic, callback := range subscribers { + if err := SubscribeTopic(topic, callback); err != nil { + logger.Logger.Printf("Failed to re-subscribe topic %s: %v", topic, err) + } + } + } +} + +func (c *WSClient) close() { + c.mutex.Lock() + defer c.mutex.Unlock() + + c.reconnectFlag = false + c.connected = false + close(c.closeChan) + + if c.Conn != nil { + c.Conn.Close() + } +} diff --git a/dthjjc b/dthjjc new file mode 100644 index 0000000..9231c4e Binary files /dev/null and b/dthjjc differ diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..9cbff33 --- /dev/null +++ b/go.mod @@ -0,0 +1,9 @@ +module Dynamic_environmental_detection + +go 1.19 + +require ( + github.com/goburrow/modbus v0.1.0 // indirect + github.com/goburrow/serial v0.1.0 // indirect + github.com/gorilla/websocket v1.5.3 // indirect +) diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..a3cd6f4 --- /dev/null +++ b/go.sum @@ -0,0 +1,6 @@ +github.com/goburrow/modbus v0.1.0 h1:DejRZY73nEM6+bt5JSP6IsFolJ9dVcqxsYbpLbeW/ro= +github.com/goburrow/modbus v0.1.0/go.mod h1:Kx552D5rLIS8E7TyUwQ/UdHEqvX5T8tyiGBTlzMcZBg= +github.com/goburrow/serial v0.1.0 h1:v2T1SQa/dlUqQiYIT8+Cu7YolfqAi3K96UmhwYyuSrA= +github.com/goburrow/serial v0.1.0/go.mod h1:sAiqG0nRVswsm1C97xsttiYCzSLBmUZ/VSlVLZJ8haA= +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..12d071f --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,139 @@ +package logger + +import ( + "log" + "os" + "path/filepath" + "sync" + "time" +) + +// Logger 全局日志实例 +var Logger *log.Logger +var logFile *os.File +var logFileName string +var mu sync.Mutex + +// InitLogger 初始化日志,创建 /data/logs 目录并按天命名文件 +func InitLogger() error { + mu.Lock() + defer mu.Unlock() + + // 创建 /data/logs 目录 + logDir := "/data/logs" + if err := os.MkdirAll(logDir, 0755); err != nil { + return err + } + + // 按天生成日志文件名:20260102.log + logFileName = filepath.Join(logDir, time.Now().Format("20060102")+".log") + file, err := os.OpenFile(logFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + logFile = file + Logger = log.New(file, "", log.LstdFlags) + + // 启动日志文件切换和清理的goroutine + go startLogMaintenance() + + return nil +} + +// startLogMaintenance 启动日志维护(文件切换和清理) +func startLogMaintenance() { + // 每天凌晨检查是否需要切换日志文件 + ticker := time.NewTicker(1 * time.Hour) // 每小时检查一次 + defer ticker.Stop() + + for range ticker.C { + checkLogFileSwitch() + + // 每天凌晨2点执行日志清理 + if time.Now().Hour() == 2 { + CleanupOldLogs() + } + } +} + +// checkLogFileSwitch 检查并切换日志文件 +func checkLogFileSwitch() { + mu.Lock() + defer mu.Unlock() + + if logFile == nil { + return + } + + newFileName := filepath.Join("/data/logs", time.Now().Format("20060102")+".log") + if newFileName != logFileName { + newFile, err := os.OpenFile(newFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Printf("切换日志文件失败: %v", err) + return + } + + logFile.Close() + logFile = newFile + logFileName = newFileName + Logger = log.New(newFile, "", log.LstdFlags) + Logger.Printf("日志文件已切换到: %s", newFileName) + } +} + +// CloseLogger 关闭日志文件 +func CloseLogger() { + mu.Lock() + defer mu.Unlock() + if logFile != nil { + logFile.Close() + logFile = nil + } +} + +// Close 是 CloseLogger 的别名,提供兼容性 +func Close() { + CloseLogger() +} + +// CleanupOldLogs 删除超过30天的日志文件 +func CleanupOldLogs() { + logDir := "/data/logs" + files, err := os.ReadDir(logDir) + if err != nil { + if Logger != nil { + Logger.Printf("读取日志目录 %s 错误: %v", logDir, err) + } + return + } + + const maxAge = 30 * 24 * time.Hour // 30天 + cutoffTime := time.Now().Add(-maxAge) + + for _, file := range files { + if file.IsDir() { + continue + } + + info, err := file.Info() + if err != nil { + if Logger != nil { + Logger.Printf("获取文件信息 %s 错误: %v", file.Name(), err) + } + continue + } + + if info.ModTime().Before(cutoffTime) { + filePath := filepath.Join(logDir, file.Name()) + if err := os.Remove(filePath); err != nil { + if Logger != nil { + Logger.Printf("删除旧日志文件 %s 错误: %v", filePath, err) + } + } else { + if Logger != nil { + Logger.Printf("删除旧日志文件 %s 成功", filePath) + } + } + } + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..cc49847 --- /dev/null +++ b/main.go @@ -0,0 +1,173 @@ +package main + +import ( + "Dynamic_environmental_detection/business" + "Dynamic_environmental_detection/connect" + "Dynamic_environmental_detection/logger" + "encoding/json" + "flag" + "log" + "os" + "os/signal" + "syscall" + "time" +) + +func main() { + // 解析命令行参数 + debugMode := flag.Bool("debug", false, "Enable debug mode") + slaveID := flag.String("slave", "2", "Slave address (debug mode only)") + sensorType := flag.String("type", "1", "Sensor type 1-Temperature/Humidity, 2-Water Leak, 3-Smoke (debug mode only)") + rawCommand := flag.String("cmd", "", "Raw Modbus command (hexadecimal, debug mode only)") + flag.Parse() + + // 初始化日志 + if err := logger.InitLogger(); err != nil { + log.Fatal("Failed to initialize logger:", err) + } + defer logger.Close() + + // 调试模式 + if *debugMode { + runDebugMode(*slaveID, *sensorType, *rawCommand) + return + } + + // 正常模式 + runNormalMode() +} + +// runDebugMode 运行调试模式 +func runDebugMode(slaveID, sensorType, rawCommand string) { + logger.Logger.Printf("=== Environmental Detection System Debug Mode ===") + + debugHandler := business.NewDebugHandler() + + if rawCommand != "" { + // 原始命令模式 + if err := debugHandler.TestRawCommand(rawCommand); err != nil { + logger.Logger.Printf("Debug failed: %v", err) + os.Exit(1) + } + } else { + // 设备测试模式 + if err := debugHandler.TestDevice(slaveID, sensorType); err != nil { + logger.Logger.Printf("Debug failed: %v", err) + os.Exit(1) + } + } +} + +// setupGracefulShutdown 设置优雅关闭 +func setupGracefulShutdown() { + sigChan := make(chan os.Signal, 1) + signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM) + + go func() { + sig := <-sigChan + logger.Logger.Printf("Received signal: %v, starting shutdown", sig) + + // 关闭 WebSocket 连接 + connect.Close() + logger.Logger.Printf("WebSocket connection closed") + + logger.Logger.Printf("=== System exited ===") + os.Exit(0) + }() +} + +// runNormalMode 运行正常模式 +func runNormalMode() { + logger.Logger.Printf("=== Environmental Detection System Starting ===") + + // 原有的正常模式代码... + // 2. 初始化 WebSocket 连接 + logger.Logger.Printf("Initializing WebSocket connection...") + if err := connect.InitWSChannel(); err != nil { + log.Fatal("WebSocket connection initialization failed:", err) + } + + // 3. 解析配置文件 + logger.Logger.Printf("Parsing sensor configuration file...") + configs, err := business.ParseServiceConfig("/usr/local/etc/service.conf") + if err != nil { + log.Fatal("Configuration parsing failed:", err) + } + + // 4. 验证配置 + if err := business.ValidateConfig(configs); err != nil { + log.Fatal("Configuration validation failed:", err) + } + + // 5. 注册服务 + logger.Logger.Printf("Registering service to server...") + serviceID := connect.GenerateServiceID() + metaData := map[string]interface{}{ + "version": "1.0", + "type": "environment_detection", + "sensor_count": len(configs), + "description": "Environmental Detection Service", + } + + if err := connect.RegisterService(serviceID, metaData, "environment-detection-container"); err != nil { + log.Fatal("Service registration failed:", err) + } + + // 6. 创建业务管理器 + logger.Logger.Printf("Creating business manager...") + businessManager, err := business.NewBusinessManager(configs) + if err != nil { + log.Fatal("Failed to create business manager:", err) + } + defer businessManager.Close() + + // 7. 订阅主题处理服务器消息 + logger.Logger.Printf("Subscribing to device updates topic...") + if err := connect.SubscribeTopic("device_updates", handleDeviceUpdates); err != nil { + log.Fatal("Topic subscription failed:", err) + } + + // 8. 启动数据采集循环 + logger.Logger.Printf("Starting data collection loop...") + go startDataCollectionLoop(businessManager) + + // 9. 设置优雅关闭 + setupGracefulShutdown() + + logger.Logger.Printf("=== System startup completed, beginning operation ===") + + // 永久运行 + select {} +} + +// handleDeviceUpdates 处理设备更新消息 +func handleDeviceUpdates(params *connect.PublishParams) { + logger.Logger.Printf("Received device update message, topic: %s", params.Topic) + + var updateData map[string]interface{} + if err := json.Unmarshal(params.Data, &updateData); err != nil { + logger.Logger.Printf("Failed to parse device update message: %v", err) + return + } + + logger.Logger.Printf("Device update content: %+v", updateData) +} + +// startDataCollectionLoop 启动数据采集循环 +func startDataCollectionLoop(businessManager *business.BusinessManager) { + + collectionInterval := 20 * time.Second + ticker := time.NewTicker(collectionInterval) + defer ticker.Stop() + logger.Logger.Printf("Data collection loop started, interval: %v", collectionInterval) + + for { + select { + case <-ticker.C: + if err := businessManager.StartDataCollection(); err != nil { + logger.Logger.Printf("Data collection failed: %v", err) + } + } + } + +}