commit 7a9d103943ee2beb347aac36eaf2c5560ba64f76 Author: gqc Date: Tue Nov 18 17:30:28 2025 +0800 初始提交 diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/FireLeave_tool.iml b/.idea/FireLeave_tool.iml new file mode 100644 index 0000000..5e764c4 --- /dev/null +++ b/.idea/FireLeave_tool.iml @@ -0,0 +1,9 @@ + + + + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..7805a87 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/vcs.xml b/.idea/vcs.xml new file mode 100644 index 0000000..94a25f7 --- /dev/null +++ b/.idea/vcs.xml @@ -0,0 +1,6 @@ + + + + + + \ No newline at end of file diff --git a/bin/fireLeave-tool-app-linux-arm64 b/bin/fireLeave-tool-app-linux-arm64 new file mode 100644 index 0000000..f5ff4aa Binary files /dev/null and b/bin/fireLeave-tool-app-linux-arm64 differ diff --git a/connect/device_storage.go b/connect/device_storage.go new file mode 100644 index 0000000..2cd4235 --- /dev/null +++ b/connect/device_storage.go @@ -0,0 +1,217 @@ +package connect + +import ( + "FireLeave_tool/logger" + "encoding/json" + "fmt" + "os" + "path/filepath" + "strings" +) + +// Modified device data storage path +var ( + devicesBaseDir = "/data/devices" + deviceDataList []DeviceData +) + +// InitializeDevicesFromServiceConfig Initialize device data from service configuration +func InitializeDevicesFromServiceConfig() ([]DeviceData, error) { + deviceDataMutex.Lock() + defer deviceDataMutex.Unlock() + + logger.Logger.Printf("Initializing device data from service configuration...") + + // Ensure device directory exists + if err := os.MkdirAll(devicesBaseDir, 0755); err != nil { + logger.Logger.Printf("Failed to create device directory: %v", err) + return nil, err + } + logger.Logger.Printf("Device directory confirmed: %s", devicesBaseDir) + + // Load from service configuration + configs, err := LoadServiceConfig() + if err != nil { + logger.Logger.Printf("Failed to load from service configuration: %v", err) + return nil, err + } + + logger.Logger.Printf("Loaded %d devices from service configuration", len(configs)) + + // Clear existing device data + deviceDataList = []DeviceData{} + + // Process each device and save to individual files + for i, config := range configs { + processedData := ProcessDeviceDataForInference(config) + deviceDataList = append(deviceDataList, processedData) + + // Save to individual device file + if err := saveDeviceToFile(processedData); err != nil { + logger.Logger.Printf("Failed to save the device file [%s]: %v", processedData.DeviceUUID, err) + continue + } + logger.Logger.Printf("Processed device [%d]: UUID=%s", i, processedData.DeviceUUID) + } + + logger.Logger.Printf("Device data initialization completed, %d devices", len(deviceDataList)) + return deviceDataList, nil +} + +// saveDeviceToFile Save individual device data to separate file +func saveDeviceToFile(deviceData DeviceData) error { + filePath := filepath.Join(devicesBaseDir, fmt.Sprintf("device_%s.json", deviceData.DeviceUUID)) + + // Clean device data to ensure no invalid characters + cleanedData := cleanDeviceData(deviceData) + + data, err := json.MarshalIndent(cleanedData, "", " ") + if err != nil { + logger.Logger.Printf("Device data serialization failed: %v, Device data: %+v", err, cleanedData) + return fmt.Errorf("Device data serialization failed: %v", err) + } + + // Write directly to file + if err := os.WriteFile(filePath, data, 0644); err != nil { + return fmt.Errorf("Failed to write device file: %v", err) + } + + logger.Logger.Printf("Device data saved successfully: %s", filePath) + return nil +} + +// cleanDeviceData Clean device data, remove possible invalid characters +func cleanDeviceData(data DeviceData) DeviceData { + cleaned := data + + // Clean string fields + cleaned.DeviceUUID = cleanString(data.DeviceUUID) + cleaned.TaskID = cleanString(data.TaskID) + cleaned.CameraRTSP = cleanString(data.CameraRTSP) + cleaned.CameraIP = cleanString(data.CameraIP) + cleaned.UploadURL = cleanString(data.UploadURL) + cleaned.DetectAreaStr = cleanString(data.DetectAreaStr) + + // Ensure numeric fields are valid + if cleaned.Confidence < 0 || cleaned.Confidence > 100 { + cleaned.Confidence = 70 // Default value + } + if cleaned.CameraChannel < 0 { + cleaned.CameraChannel = 0 + } + if cleaned.PersonCount < 0 { + cleaned.PersonCount = 0 + } + if cleaned.Temperature < 0 { + cleaned.Temperature = 0 + } + if cleaned.TemperatureThreshold <= 0 { + cleaned.TemperatureThreshold = 45.0 // Default value + } + + return cleaned +} + +func cleanString(s string) string { + // Remove control characters and invalid Unicode characters + return strings.Map(func(r rune) rune { + if r == 0xFFFD { // Unicode replacement character + return -1 + } + if r < 32 && r != 9 && r != 10 && r != 13 { // Keep tab, newline, carriage return + return -1 + } + return r + }, s) +} + +// GetDeviceDataList Get device data list +func GetDeviceDataList() []DeviceData { + deviceDataMutex.Lock() + defer deviceDataMutex.Unlock() + + // If no data in memory, try to load from device files + if len(deviceDataList) == 0 { + logger.Logger.Printf("No device data in memory, loading from device files...") + if err := loadDevicesFromFiles(); err != nil { + logger.Logger.Printf("Failed to load from device files: %v", err) + return []DeviceData{} + } + } + + // Return copy + result := make([]DeviceData, len(deviceDataList)) + copy(result, deviceDataList) + return result +} + +// loadDevicesFromFiles Load data from device files +func loadDevicesFromFiles() error { + // Clear existing data + deviceDataList = []DeviceData{} + + // Read all files in device directory + files, err := filepath.Glob(filepath.Join(devicesBaseDir, "device_*.json")) + if err != nil { + return fmt.Errorf("Failed to find device files: %v", err) + } + + logger.Logger.Printf("Found %d device files", len(files)) + + for _, file := range files { + data, err := os.ReadFile(file) + if err != nil { + logger.Logger.Printf("Failed to read device file %s: %v", file, err) + continue + } + + var deviceData DeviceData + if err := json.Unmarshal(data, &deviceData); err != nil { + logger.Logger.Printf("Failed to parse device file %s: %v", file, err) + continue + } + + deviceDataList = append(deviceDataList, deviceData) + logger.Logger.Printf("Loaded device from file: UUID=%s", deviceData.DeviceUUID) + } + + logger.Logger.Printf("Loaded %d devices from device files", len(deviceDataList)) + return nil +} + +// UpdateDeviceData Update device data +func UpdateDeviceData(data DeviceData) error { + deviceDataMutex.Lock() + defer deviceDataMutex.Unlock() + + //logger.Logger.Printf("Updating device data [%s]", data.DeviceUUID) + + // Clean input data + cleanedData := cleanDeviceData(data) + + // Find if device with same DeviceUUID already exists + found := false + for i, existing := range deviceDataList { + if existing.DeviceUUID == cleanedData.DeviceUUID { + deviceDataList[i] = cleanedData + found = true + //logger.Logger.Printf("Updated existing device: DeviceUUID=%s", cleanedData.DeviceUUID) + break + } + } + + // If not exists, add to list + if !found { + logger.Logger.Printf("Adding new device to list: DeviceUUID=%s", cleanedData.DeviceUUID) + deviceDataList = append(deviceDataList, cleanedData) + } + + // Save to individual file + if err := saveDeviceToFile(cleanedData); err != nil { + logger.Logger.Printf("Failed to save device file: %v", err) + return err + } + + logger.Logger.Printf("Device data update completed: DeviceUUID=%s", cleanedData.DeviceUUID) + return nil +} diff --git a/connect/port_manager.go b/connect/port_manager.go new file mode 100644 index 0000000..658d3d2 --- /dev/null +++ b/connect/port_manager.go @@ -0,0 +1,96 @@ +package connect + +import ( + "errors" + "fmt" + "net" + "sync" + + "FireLeave_tool/logger" +) + +// PortManager 管理端口分配 +type PortManager struct { + mu sync.Mutex + allocated map[string]int // DeviceUUID 到推理端口的映射 + portPool []int // 推理端口池(HTTP) + available map[int]bool // 可用端口标记 +} + +func NewPortManager() *PortManager { + // 推理端口范围:58881-58890(HTTP),支持最多10个设备 + ports := []int{58881, 58882, 58883, 58884, 58885, 58886, 58887, 58888, 58889, 58890} + + // 初始化可用端口映射 + available := make(map[int]bool) + for _, port := range ports { + available[port] = true + } + + return &PortManager{ + allocated: make(map[string]int), + portPool: ports, + available: available, + } +} + +func (pm *PortManager) AllocatePort(deviceUUID string) (int, error) { + pm.mu.Lock() + defer pm.mu.Unlock() + + // 检查设备是否已分配端口 + if port, exists := pm.allocated[deviceUUID]; exists { + logger.Logger.Printf("Device %s already allocated port: %d", deviceUUID, port) + return port, nil + } + + // 查找可用端口 + for _, port := range pm.portPool { + if !pm.available[port] { + continue // 端口已被分配 + } + + tempPort := port + 1000 // 温度端口(TCP) + + // 检查推理端口(HTTP)是否可用 + inferenceLn, err := net.Listen("tcp", fmt.Sprintf(":%d", port)) + if err != nil { + logger.Logger.Printf("Port %d (inference) is not available: %v", port, err) + continue + } + inferenceLn.Close() + + // 检查温度端口(TCP)是否可用 + tempLn, err := net.Listen("tcp", fmt.Sprintf(":%d", tempPort)) + if err != nil { + logger.Logger.Printf("Port %d (temperature) is not available: %v", tempPort, err) + continue + } + tempLn.Close() + + // 端口可用,记录分配 + pm.allocated[deviceUUID] = port + pm.available[port] = false + + logger.Logger.Printf("Port allocated for device %s: inference=%d, temperature=%d", + deviceUUID, port, tempPort) + return port, nil + } + + return 0, errors.New("no available ports for inference and temperature") +} + +func (pm *PortManager) ReleasePort(deviceUUID string) { + pm.mu.Lock() + defer pm.mu.Unlock() + + // 删除设备端口映射 + if port, exists := pm.allocated[deviceUUID]; exists { + delete(pm.allocated, deviceUUID) + pm.available[port] = true // 标记端口为可用 + logger.Logger.Printf("Port released for device %s: %d", deviceUUID, port) + } +} + +// GlobalPortManager 全局端口管理器 +var GlobalPortManager = NewPortManager() diff --git a/connect/ws_channel.go b/connect/ws_channel.go new file mode 100644 index 0000000..d4dd034 --- /dev/null +++ b/connect/ws_channel.go @@ -0,0 +1,720 @@ +package connect + +import ( + "context" + "encoding/json" + "fmt" + "os" + "strings" + "sync" + "time" + + "FireLeave_tool/logger" + "github.com/gorilla/websocket" +) + +// Global variables +var ( + wsClient *WSClient + wsMutex sync.RWMutex + isConnecting bool + wsURL = "ws://172.17.0.1:18080/ws" // Updated to specified address + deviceDataMutex sync.Mutex + ConfigUpdateChan chan DeviceData + currentServiceID string // New: current service's server_id +) + +func GetWSClient() *WSClient { + wsMutex.RLock() + defer wsMutex.RUnlock() + return wsClient +} + +func (c *WSClient) IsConnected() bool { + c.mutex.Lock() + defer c.mutex.Unlock() + return c.connected +} + +const ( + serviceConfigFile = "/usr/local/etc/service.conf" +) + +// DeviceData struct +type DeviceData struct { + DeviceUUID string `json:"device_uuid"` + TaskID string `json:"task_id"` + CameraRTSP string `json:"camera_rtsp"` + CameraIP string `json:"camera_ip"` + Confidence int `json:"confidence"` + DetectArea []Area `json:"detect_area"` + DetectAreaStr string `json:"detect_area_str,omitempty"` + PersonCount int `json:"person_count"` + DetectionTime json.Number `json:"detection_time"` // Using json.Number + AlarmTime json.Number `json:"alarm_time"` // Using json.Number + TemperatureThreshold float64 `json:"temperature_threshold"` + Temperature float64 `json:"temperature"` + UploadURL string `json:"upload_url"` + WSConn *websocket.Conn + CameraChannel int `json:"camera_channel"` + LastAlarmTime int64 `json:"-"` // Last alarm time + AlarmPauseUntil int64 `json:"-"` // Pause alarm untilw + HostUUID string `json:"host_uuid"` + Address string `json:"address"` +} + +type InferenceParams struct { + DeviceUUID string `json:"device_uuid"` + CameraRTSP string `json:"camera_rtsp"` + CameraChannel int `json:"camera_channel"` + Confidence int `json:"confidence"` + DetectArea string `json:"detect_area"` // String format detection area + Port int `json:"port"` // HTTP server port +} + +type Area struct { + ID int `json:"id"` + Pts []Point `json:"pts"` +} + +type Point struct { + X float64 `json:"x"` + Y float64 `json:"y"` +} + +// WSMessage struct +type WSMessage struct { + ID int64 `json:"id,omitempty"` + Method string `json:"method"` + Params json.RawMessage `json:"params,omitempty"` + Result json.RawMessage `json:"result,omitempty"` // Modified: string -> json.RawMessage + Error *WSError `json:"error,omitempty"` +} + +type WSError struct { + Code int `json:"code"` + Message string `json:"message"` +} + +type RegisterParams struct { + ContainerName string `json:"container_name,omitempty"` + ServiceID string `json:"service_id,omitempty"` + MetaData map[string]interface{} `json:"meta_data,omitempty"` // New: metadata field +} + +type SubscribeParams struct { + Topic string `json:"topic"` +} + +type PublishParams struct { + EventType string `json:"event_type,omitempty"` + Body json.RawMessage `json:"body,omitempty"` + Topic string `json:"topic,omitempty"` + Data json.RawMessage `json:"data,omitempty"` +} + +type WSClient struct { + Conn *websocket.Conn + url string + serviceID string + serverID string + connected bool + mutex sync.Mutex + messageID int64 + callbacks map[int64]chan *WSMessage + subscribers map[string]func([]byte) + closeChan chan struct{} + reconnectFlag bool +} + +// NewWSClient creates WebSocket client +func NewWSClient(url string) (*WSClient, error) { + conn, _, err := websocket.DefaultDialer.Dial(url, nil) + if err != nil { + return nil, err + } + + client := &WSClient{ + Conn: conn, + url: url, + connected: true, + messageID: 1, + callbacks: make(map[int64]chan *WSMessage), + subscribers: make(map[string]func([]byte)), + closeChan: make(chan struct{}), + reconnectFlag: true, + } + + go client.readLoop() + go client.heartbeatLoop() + return client, nil +} + +func (c *WSClient) connect() error { + conn, _, err := websocket.DefaultDialer.Dial(c.url, nil) + if err != nil { + return err + } + c.Conn = conn + c.connected = true + go c.readLoop() + go c.heartbeatLoop() + return nil +} + +// InitWSChannel initializes WebSocket +func InitWSChannel() error { + wsMutex.Lock() + defer wsMutex.Unlock() + + if wsClient != nil && wsClient.connected { + logger.Logger.Printf("WebSocket connection already exists") + return nil + } + + if isConnecting { + logger.Logger.Printf("WebSocket connection is being established") + return fmt.Errorf("WebSocket connection is being established") + } + + isConnecting = true + defer func() { isConnecting = false }() + + logger.Logger.Printf("Starting WebSocket connection initialization: %s", wsURL) + + // Connect WebSocket asynchronously to avoid blocking + go func() { + client, err := NewWSClient(wsURL) + if err != nil { + logger.Logger.Printf("Failed to create WebSocket client: %v", err) + return + } + + if ConfigUpdateChan == nil { + ConfigUpdateChan = make(chan DeviceData, 100) + } + + // Note: No longer calling LoadDeviceData() here to avoid duplicate loading + wsClient = client + logger.Logger.Printf("WebSocket connection initialized successfully (asynchronous)") + }() + + return nil +} + +// New: function to generate server_id +func generateServiceID() string { + return fmt.Sprintf("fireleave-service-%d", time.Now().UnixNano()) +} + +// New: set current server_id +//func SetServerID(serverID string) { +// currentServerID = serverID +//} + +func GetServiceID() string { + if currentServiceID == "" { + currentServiceID = generateServiceID() + } + return currentServiceID +} + +// RegisterService service registration - modified: now requires providing own server_id +func RegisterService(containerName string) (string, error) { + if wsClient == nil { + return "", fmt.Errorf("WebSocket client not initialized") + } + + serviceID := GetServiceID() + + params := RegisterParams{ + ContainerName: containerName, + ServiceID: serviceID, + MetaData: map[string]interface{}{ // Add metadata + "version": "1.0", + "type": "fire_leave", + }, + } + logger.Logger.Printf("Starting service registration: ContainerName=%s, ServiceID=%s", containerName, serviceID) + response, err := wsClient.sendRequest("register", params, true) + if err != nil { + logger.Logger.Printf("Service registration failed: %v", err) + return "", err + } + if response.Error != nil { + logger.Logger.Printf("Service registration error: Code=%d, Message=%s", response.Error.Code, response.Error.Message) + return "", fmt.Errorf("Service registration error: %s", response.Error.Message) + } + + // Parse response + var resultStr string + if err := json.Unmarshal(response.Result, &resultStr); err != nil { + logger.Logger.Printf("Failed to parse service registration response: %v, Raw response: %s", err, string(response.Result)) + return "", fmt.Errorf("Failed to parse service registration response: %v", err) + } + + if resultStr != "ok" { + logger.Logger.Printf("Service registration failed: Expected result 'ok', got '%s'", resultStr) + return "", fmt.Errorf("Service registration failed: Server returned abnormal response") + } + + wsClient.serviceID = serviceID + logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID) + return serviceID, nil +} + +// SubscribeTopic topic subscription +func SubscribeTopic(topic string, callback func([]byte)) error { + if wsClient == nil { + return fmt.Errorf("WebSocket client not initialized") + } + params := SubscribeParams{ + Topic: topic, + } + logger.Logger.Printf("Starting topic subscription: Topic=%s", topic) + response, err := wsClient.sendRequest("subscribe", params, true) + if err != nil { + logger.Logger.Printf("Topic subscription failed: %v", err) + return err + } + if response.Error != nil { + logger.Logger.Printf("Topic subscription error: Code=%d, Message=%s", response.Error.Code, response.Error.Message) + return fmt.Errorf("Topic subscription error: %s", response.Error.Message) + } + wsClient.mutex.Lock() + wsClient.subscribers[topic] = callback + wsClient.mutex.Unlock() + logger.Logger.Printf("Topic subscription successful: Topic=%s", topic) + return nil +} + +func (c *WSClient) SendRawMessage(message interface{}) error { + c.mutex.Lock() + defer c.mutex.Unlock() + if !c.connected { + return fmt.Errorf("WebSocket connection disconnected") + } + messageData, err := json.Marshal(message) + if err != nil { + return fmt.Errorf("Message serialization failed: %v", err) + } + logger.Logger.Printf("Sending WebSocket message: %s", string(messageData)) + err = c.Conn.WriteMessage(websocket.TextMessage, messageData) + if err != nil { + logger.Logger.Printf("WebSocket message sending failed: %v", err) + c.connected = false + go c.reconnect() + return fmt.Errorf("Message sending failed: %v", err) + } + return nil +} + +// sendRequest sends request +func (c *WSClient) sendRequest(method string, params interface{}, waitResponse bool) (*WSMessage, error) { + c.mutex.Lock() + id := c.messageID + c.messageID++ + c.mutex.Unlock() + + message := WSMessage{ + ID: id, + Method: method, + } + if params != nil { + paramsData, err := json.Marshal(params) + if err != nil { + return nil, fmt.Errorf("Parameter serialization failed: %v", err) + } + message.Params = paramsData + } + + if waitResponse { + callback := make(chan *WSMessage, 1) + c.mutex.Lock() + c.callbacks[id] = callback + c.mutex.Unlock() + + defer func() { + c.mutex.Lock() + delete(c.callbacks, id) + c.mutex.Unlock() + logger.Logger.Printf("Cleaning up callback channel: ID=%d", id) + }() + + if err := c.SendRawMessage(message); err != nil { + return nil, err + } + + logger.Logger.Printf("Waiting for response: ID=%d", id) + select { + case response := <-callback: + logger.Logger.Printf("Received response: ID=%d", id) + return response, nil + case <-time.After(30 * time.Second): + logger.Logger.Printf("Response timeout: ID=%d", id) + return nil, fmt.Errorf("Response timeout") + case <-c.closeChan: + logger.Logger.Printf("Connection closed, stopping response wait: ID=%d", id) + return nil, fmt.Errorf("Connection closed") + } + } + return nil, c.SendRawMessage(message) +} + +// readLoop message reading loop +func (c *WSClient) readLoop() { + logger.Logger.Printf("Starting WebSocket message reading loop") + defer func() { + if r := recover(); r != nil { + logger.Logger.Printf("readLoop panic: %v", r) + } + // Important: do not close(closeChan)! + logger.Logger.Printf("WebSocket read loop exited, connection marked as disconnected") + }() + + for { + _, messageData, err := c.Conn.ReadMessage() + if err != nil { + logger.Logger.Printf("WebSocket message reading failed: %v", err) + c.mutex.Lock() + c.connected = false + c.mutex.Unlock() + go c.reconnect() // Trigger reconnection + return + } + + if string(messageData) == "pong" { + logger.Logger.Printf("Received pong response") + continue + } + + logger.Logger.Printf("Received WebSocket message: %s", string(messageData)) + + var message WSMessage + if err := json.Unmarshal(messageData, &message); err != nil { + logger.Logger.Printf("WebSocket message parsing failed: %v", err) + continue + } + + if message.ID > 0 { + c.mutex.Lock() + if callback, exists := c.callbacks[message.ID]; exists { + select { + case callback <- &message: + logger.Logger.Printf("Successfully sent message to callback channel: ID=%d", message.ID) + default: + logger.Logger.Printf("Callback channel full, discarding message: ID=%d", message.ID) + } + } + c.mutex.Unlock() + } + + if message.Method == "publish" { + c.handlePublishMessage(message) + } + } +} + +// handlePublishMessage handles publish messages +func (c *WSClient) handlePublishMessage(message WSMessage) { + logger.Logger.Printf("Processing publish message") + var publishParams PublishParams + if err := json.Unmarshal(message.Params, &publishParams); err != nil { + logger.Logger.Printf("Failed to parse publish message parameters: %v", err) + return + } + + if publishParams.Topic != "" { + c.mutex.Lock() + callback, exists := c.subscribers[publishParams.Topic] + c.mutex.Unlock() + if exists && callback != nil { + callback(message.Params) + } + } + + if publishParams.Data != nil { + var deviceData DeviceData + if err := json.Unmarshal(publishParams.Data, &deviceData); err != nil { + logger.Logger.Printf("Failed to parse DeviceData: %v", err) + return + } + + // Check if DeviceUUID is valid + if deviceData.DeviceUUID == "" { + logger.Logger.Printf("Invalid DeviceData: DeviceUUID is empty, ignoring") + return + } + + logger.Logger.Printf("Received valid DeviceData: DeviceUUID=%s", deviceData.DeviceUUID) + + // 1. Save original configuration to service config file (using SaveSingleServiceConfig) + if err := SaveSingleServiceConfig(deviceData); err != nil { + logger.Logger.Printf("Failed to save service configuration: %v", err) + } + + // 2. Process data and save to device data file + processedData := ProcessDeviceDataForInference(deviceData) + if err := UpdateDeviceData(processedData); err != nil { + logger.Logger.Printf("Failed to update DeviceData: %v", err) + return + } + + select { + case ConfigUpdateChan <- processedData: + logger.Logger.Printf("New DeviceData sent to config update channel: DeviceUUID=%s", processedData.DeviceUUID) + default: + logger.Logger.Printf("Config update channel full, discarding message") + } + + } +} + +// heartbeatLoop heartbeat loop - fixed version +func (c *WSClient) heartbeatLoop() { + logger.Logger.Printf("Starting WebSocket heartbeat loop") + ticker := time.NewTicker(25 * time.Second) + defer ticker.Stop() + + for { + select { + case <-ticker.C: + c.mutex.Lock() + connected := c.connected + conn := c.Conn + c.mutex.Unlock() + + if connected && conn != nil { + err := conn.WriteMessage(websocket.PingMessage, []byte{}) + if err != nil { + logger.Logger.Printf("Failed to send ping: %v", err) + c.mutex.Lock() + c.connected = false + c.mutex.Unlock() + go c.reconnect() + } else { + logger.Logger.Printf("Ping sent successfully") + } + } + case <-c.closeChan: + logger.Logger.Printf("Heartbeat loop received close signal, exiting") + ticker.Stop() // Explicitly stop ticker + return + } + } +} +func (c *WSClient) reRegisterAndResubscribe() { + // 1. 等待连接稳定 + time.Sleep(1 * time.Second) + + // 2. 重新注册 + containerName := "fireleave-container" + serviceID, err := RegisterService(containerName) + if err != nil { + logger.Logger.Printf("Failed to re-register service after reconnect: %v", err) + return + } + logger.Logger.Printf("Re-registration successful: ServiceID=%s", serviceID) + + // 3. 重新订阅 + for topic, callback := range c.subscribers { + if err := SubscribeTopic(topic, callback); err != nil { + logger.Logger.Printf("Failed to re-subscribe topic %s: %v", topic, err) + } else { + logger.Logger.Printf("Re-subscribed to topic: %s", topic) + } + } +} + +// connect/wschannel.go → reconnect complete fix +func (c *WSClient) reconnect() { + if !c.reconnectFlag { + return + } + + for i := 0; i < 12; i++ { + delay := time.Duration(1< 30*time.Second { + delay = 30 * time.Second + } + time.Sleep(delay) + + conn, _, err := websocket.DefaultDialer.Dial(c.url, nil) + if err != nil { + logger.Logger.Printf("Reconnection attempt %d/12 failed: %v", i+1, err) + continue + } + + c.mutex.Lock() + c.Conn = conn + c.connected = true + c.mutex.Unlock() + + logger.Logger.Printf("WebSocket reconnection successful!") + + // 关键:重连后必须先 register! + go c.reRegisterAndResubscribe() + + // 重启读写循环 + go c.readLoop() + go c.heartbeatLoop() + return + } + + logger.Logger.Printf("Reconnection failed after 12 attempts") + c.reconnectFlag = false +} +func (d *DeviceData) ToInferenceParams(port int) InferenceParams { + return InferenceParams{ + DeviceUUID: d.DeviceUUID, + CameraRTSP: d.CameraRTSP, + CameraChannel: d.CameraChannel, + Confidence: d.Confidence, + DetectArea: ProcessDetectAreaForInference(d.DetectArea), + Port: port, + } +} + +// SaveServiceConfig saves multiple device configurations to service config file +func SaveServiceConfig(deviceDataList []DeviceData) error { + configData, err := json.MarshalIndent(deviceDataList, "", " ") + if err != nil { + logger.Logger.Printf("Failed to serialize service configuration: %v", err) + return err + } + + if err := os.WriteFile(serviceConfigFile, configData, 0644); err != nil { + logger.Logger.Printf("Failed to write service config file: %v", err) + return err + } + + logger.Logger.Printf("Service configuration saved to: %s, %d devices", serviceConfigFile, len(deviceDataList)) + return nil +} + +// SaveSingleServiceConfig saves single device configuration (compatible with existing calls) +func SaveSingleServiceConfig(deviceData DeviceData) error { + // First read existing configuration + existingConfigs, err := LoadServiceConfig() + if err != nil { + // If reading fails, create new configuration list + existingConfigs = []DeviceData{deviceData} + } else { + // Find if configuration with same DeviceUUID already exists + found := false + for i, existing := range existingConfigs { + if existing.DeviceUUID == deviceData.DeviceUUID { + existingConfigs[i] = deviceData + found = true + break + } + } + // If not exists, add to list + if !found { + existingConfigs = append(existingConfigs, deviceData) + } + } + + return SaveServiceConfig(existingConfigs) +} + +// LoadServiceConfig loads multiple device data from service config file +func LoadServiceConfig() ([]DeviceData, error) { + if _, err := os.Stat(serviceConfigFile); os.IsNotExist(err) { + logger.Logger.Printf("Service config file does not exist: %s", serviceConfigFile) + return nil, fmt.Errorf("service config file does not exist") + } + + data, err := os.ReadFile(serviceConfigFile) + if err != nil { + logger.Logger.Printf("Failed to read service config file: %v", err) + return nil, err + } + + // Check if file content is empty + if len(data) == 0 { + logger.Logger.Printf("Service config file is empty") + return nil, fmt.Errorf("service config file is empty") + } + + // Parse as DeviceData array + var deviceDataList []DeviceData + if err := json.Unmarshal(data, &deviceDataList); err != nil { + logger.Logger.Printf("Failed to parse service config file: %v", err) + return nil, err + } + + // Filter out configurations with empty DeviceUUID + var validDeviceDataList []DeviceData + for _, deviceData := range deviceDataList { + if deviceData.DeviceUUID != "" { + validDeviceDataList = append(validDeviceDataList, deviceData) + } + } + const FixedUploadURL = "http://192.168.80.168/api/v1/upload/video" + for i := range validDeviceDataList { + if validDeviceDataList[i].UploadURL == "" { + validDeviceDataList[i].UploadURL = FixedUploadURL + logger.Logger.Printf("Auto-completed UploadURL: %s (DeviceUUID=%s)", + FixedUploadURL, validDeviceDataList[i].DeviceUUID) + } + } + if len(validDeviceDataList) == 0 { + logger.Logger.Printf("No valid device configurations found in service config file") + return nil, fmt.Errorf("no valid device configurations") + } + + logger.Logger.Printf("Loaded %d device configurations from service config file", len(validDeviceDataList)) + return validDeviceDataList, nil + +} + +// ProcessDetectAreaForInference processes detection area data, converts to format required by inference program +func ProcessDetectAreaForInference(areas []Area) string { + if len(areas) == 0 { + return "" + } + + // Only take the first detection area + area := areas[0] + var points []string + + for _, point := range area.Pts { + // Directly round, remove decimals + x := int(point.X) + y := int(point.Y) + points = append(points, fmt.Sprintf("%d,%d", x, y)) + } + + return strings.Join(points, ",") +} + +func ProcessDeviceDataForInference(data DeviceData) DeviceData { + // Create a copy, only process fields that need conversion + processed := data + processed.DetectAreaStr = ProcessDetectAreaForInference(data.DetectArea) + return processed +} + +func (c *WSClient) SendAsync(message interface{}) { + go func() { + ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second) + defer cancel() + + done := make(chan error, 1) + go func() { + done <- c.SendRawMessage(message) + }() + + select { + case err := <-done: + if err != nil { + logger.Logger.Printf("Asynchronous reporting failed: %v", err) + } else { + logger.Logger.Printf("Asynchronous reporting successful") + } + case <-ctx.Done(): + logger.Logger.Printf("Asynchronous reporting timeout (8 seconds no response), continuing business process") + } + }() +} diff --git a/fireleave_tool b/fireleave_tool new file mode 100644 index 0000000..19bd3b5 Binary files /dev/null and b/fireleave_tool differ diff --git a/go.mod b/go.mod new file mode 100644 index 0000000..fb4c44a --- /dev/null +++ b/go.mod @@ -0,0 +1,5 @@ +module FireLeave_tool + +go 1.19 + +require github.com/gorilla/websocket v1.5.3 diff --git a/go.sum b/go.sum new file mode 100644 index 0000000..25a9fc4 --- /dev/null +++ b/go.sum @@ -0,0 +1,2 @@ +github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg= +github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE= diff --git a/httpreader/httpreader.go b/httpreader/httpreader.go new file mode 100644 index 0000000..00329dc --- /dev/null +++ b/httpreader/httpreader.go @@ -0,0 +1,181 @@ +package httpreader + +import ( + "bytes" + "context" + "encoding/json" + "fmt" + "io" + "net/http" + "sync" + "time" + + "FireLeave_tool/logger" +) + +// InferenceResult 推理结果 +type InferenceResult struct { + Serial string `json:"serial"` + At int64 `json:"at"` + Type int `json:"type"` + Params []struct { + ClassIdx int `json:"class_idx"` + Name string `json:"name"` + Number int `json:"number"` + } `json:"params"` +} + +// InferenceServer 推理数据接收服务器 +type InferenceServer struct { + port int + deviceUUID string + server *http.Server + dataChan chan InferenceResult + mu sync.RWMutex + running bool +} + +// NewInferenceServer 创建推理数据接收服务器 +func NewInferenceServer(deviceUUID string, port int) *InferenceServer { + return &InferenceServer{ + port: port, + deviceUUID: deviceUUID, + dataChan: make(chan InferenceResult, 100), // 缓冲通道,避免阻塞 + running: false, + } +} + +// Start 启动HTTP服务器 +func (is *InferenceServer) Start() error { + is.mu.Lock() + defer is.mu.Unlock() + + if is.running { + return fmt.Errorf("服务器已在运行") + } + + mux := http.NewServeMux() + mux.HandleFunc("/video/post", is.handleVideoPost) + mux.HandleFunc("/health", is.handleHealthCheck) + + is.server = &http.Server{ + Addr: fmt.Sprintf(":%d", is.port), + Handler: mux, + ReadTimeout: 10 * time.Second, + WriteTimeout: 10 * time.Second, + } + + go func() { + logger.Logger.Printf("Start the inference data receiving server (DeviceUUID=%s, Port=%d)", is.deviceUUID, is.port) + if err := is.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { + logger.Logger.Printf("The inference data receiving server failed to start (DeviceUUID=%s): %v", is.deviceUUID, err) + } + }() + + // 等待服务器启动 + time.Sleep(100 * time.Millisecond) + is.running = true + + logger.Logger.Printf("The inference data receiving server has been successfully started (DeviceUUID=%s, Port=%d)", is.deviceUUID, is.port) + return nil +} + +// handleVideoPost 处理视频数据POST请求 +func (is *InferenceServer) handleVideoPost(w http.ResponseWriter, r *http.Request) { + startTime := time.Now() + + if r.Method != "POST" { + http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) + return + } + + // 读取原始 body + bodyBytes, err := io.ReadAll(r.Body) + if err != nil { + logger.Logger.Printf("Read request body failed: %v", err) + http.Error(w, "Bad request", http.StatusBadRequest) + return + } + + // 重新设置 body 用于解析 + r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) + + var result InferenceResult + if err := json.NewDecoder(r.Body).Decode(&result); err != nil { + logger.Logger.Printf("Parse JSON failed: %v", err) + http.Error(w, `{"error":"Invalid JSON"}`, http.StatusBadRequest) + return + } + + // 验证必要字段 + if result.Serial == "" { + logger.Logger.Printf("Data validation failed: Serial is empty") + http.Error(w, `{"error":"Missing serial field"}`, http.StatusBadRequest) + return + } + + // 只在人员数量变化或重要事件时记录详细日志 + if len(result.Params) > 0 { + hasPerson := false + for _, param := range result.Params { + if param.ClassIdx == 1 && param.Number > 0 { + hasPerson = true + break + } + } + if hasPerson { + logger.Logger.Printf("Inference data: Serial=%s, Person detected, ParamsCount=%d", + result.Serial, len(result.Params)) + } + } + + // 发送到处理通道 + select { + case is.dataChan <- result: + w.Header().Set("Content-Type", "application/json") + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{"status":"ok", "message":"data received"}`)) + // 使用 startTime 记录处理时间 + logger.Logger.Printf("Inference data processed in %v", time.Since(startTime)) + default: + logger.Logger.Printf("Data channel full, discard data") + http.Error(w, `{"error":"Channel full"}`, http.StatusServiceUnavailable) + } +} + +func (is *InferenceServer) handleHealthCheck(w http.ResponseWriter, r *http.Request) { + w.Header().Set("Content-Type", "application/json") + + is.mu.RLock() + running := is.running + is.mu.RUnlock() + + if running { + w.WriteHeader(http.StatusOK) + w.Write([]byte(`{ + "status": "healthy", + "device_uuid": "` + is.deviceUUID + `", + "port": ` + fmt.Sprintf("%d", is.port) + `, + "timestamp": "` + time.Now().Format(time.RFC3339) + `" + }`)) + } else { + w.WriteHeader(http.StatusServiceUnavailable) + w.Write([]byte(`{"status": "unavailable"}`)) + } +} + +func (is *InferenceServer) GetDataChan() <-chan InferenceResult { + return is.dataChan +} + +// StopWithContext 带上下文的停止方法 +func (s *InferenceServer) StopWithContext(ctx context.Context) { + if s.server != nil { + // 使用带超时的关闭 + shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) + defer cancel() + + s.server.Shutdown(shutdownCtx) + logger.Logger.Printf("The HTTP server has been shut down (DeviceUUID=%s)", s.deviceUUID) + } +} diff --git a/httpreader/utils.go b/httpreader/utils.go new file mode 100644 index 0000000..8de6ae0 --- /dev/null +++ b/httpreader/utils.go @@ -0,0 +1,121 @@ +package httpreader + +import ( + "fmt" + "net" + "net/http" + "time" + + "FireLeave_tool/logger" +) + +// CheckHTTPService 检查HTTP服务是否可用 +func CheckHTTPService(port int, timeout time.Duration) bool { + client := &http.Client{ + Timeout: timeout, + } + + url := fmt.Sprintf("http://127.0.0.1:%d/health", port) + resp, err := client.Get(url) + if err != nil { + logger.Logger.Printf("HTTP服务健康检查失败 (Port=%d): %v", port, err) + return false + } + defer resp.Body.Close() + + return resp.StatusCode == http.StatusOK +} + +// CheckPortAvailable 检查端口是否可用 +func CheckPortAvailable(port int) bool { + address := fmt.Sprintf(":%d", port) + listener, err := net.Listen("tcp", address) + if err != nil { + return false + } + listener.Close() + return true +} + +// WaitForHTTPService 等待HTTP服务启动 +func WaitForHTTPService(port int, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + logger.Logger.Printf("等待HTTP服务启动 (Port=%d, Timeout=%v)", port, timeout) + + attempt := 0 + for time.Now().Before(deadline) { + attempt++ + + if CheckHTTPService(port, 2*time.Second) { + logger.Logger.Printf("HTTP服务启动成功 (Port=%d, 尝试次数=%d)", port, attempt) + return true + } + + if attempt == 1 || attempt%5 == 0 { + logger.Logger.Printf("等待HTTP服务... (Port=%d, 尝试次数=%d)", port, attempt) + } + + time.Sleep(1 * time.Second) + } + + logger.Logger.Printf("HTTP服务启动超时 (Port=%d, 总尝试次数=%d)", port, attempt) + return false +} + +// ValidateInferenceResult 验证推理结果数据 +func ValidateInferenceResult(result *InferenceResult) error { + if result.Serial == "" { + return fmt.Errorf("serial字段不能为空") + } + if result.At <= 0 { + return fmt.Errorf("at时间戳无效") + } + + // 验证参数数据 + for i, param := range result.Params { + if param.Number < 0 { + return fmt.Errorf("参数[%d]的number不能为负数", i) + } + if param.ClassIdx < 0 { + return fmt.Errorf("参数[%d]的class_idx不能为负数", i) + } + } + + return nil +} + +// GetPersonCountFromResult 从推理结果中获取人员数量 +func GetPersonCountFromResult(result *InferenceResult) int { + if result == nil || len(result.Params) == 0 { + return 0 + } + + // 查找人员类别的参数(假设 class_idx=1 表示人员) + for _, param := range result.Params { + if param.ClassIdx == 1 { // 人员类别 + return param.Number + } + } + + return 0 +} + +// CreateTestInferenceResult 创建测试用的推理结果(用于调试) +func CreateTestInferenceResult(deviceUUID string, personCount int) InferenceResult { + return InferenceResult{ + Serial: deviceUUID, + At: time.Now().Unix(), + Type: 1, + Params: []struct { + ClassIdx int `json:"class_idx"` + Name string `json:"name"` + Number int `json:"number"` + }{ + { + ClassIdx: 1, + Name: "person", + Number: personCount, + }, + }, + } +} diff --git a/logger/logger.go b/logger/logger.go new file mode 100644 index 0000000..5cb8500 --- /dev/null +++ b/logger/logger.go @@ -0,0 +1,111 @@ +package logger + +import ( + "log" + "os" + "path/filepath" + "sync" + "time" +) + +// Logger 全局日志实例 +var Logger *log.Logger +var logFile *os.File +var logFileName string +var mu sync.Mutex + +// InitLogger 初始化日志,创建 /data/logs 目录并按天命名文件 +func InitLogger() error { + mu.Lock() + defer mu.Unlock() + + // 创建 /data/logs 目录 + logDir := "/data/logs" + if err := os.MkdirAll(logDir, 0755); err != nil { + return err + } + + // 按天生成日志文件名:20260102.log + logFileName = filepath.Join(logDir, time.Now().Format("20060102")+".log") + file, err := os.OpenFile(logFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + return err + } + logFile = file + Logger = log.New(file, "", log.LstdFlags) + + // 每天凌晨检查是否需要切换日志文件 + go func() { + for { + // 每天检查一次(凌晨 00:05 执行) + now := time.Now() + next := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 5, 0, 0, now.Location()) + time.Sleep(time.Until(next)) + + mu.Lock() + newFileName := filepath.Join(logDir, time.Now().Format("20060102")+".log") + if newFileName != logFileName { + newFile, err := os.OpenFile(newFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644) + if err != nil { + log.Printf("Failed to switch the log file: %v", err) + mu.Unlock() + continue + } + logFile.Close() + logFile = newFile + logFileName = newFileName + Logger = log.New(newFile, "", log.LstdFlags) + Logger.Println("The log file has been switched:", newFileName) + } + mu.Unlock() + } + }() + return nil +} + +// CloseLogger 关闭日志文件 +func CloseLogger() { + mu.Lock() + defer mu.Unlock() + if logFile != nil { + logFile.Close() + logFile = nil + } +} + +// CleanupOldLogs 删除超过30天的日志文件 +func CleanupOldLogs() { + logDir := "/data/logs" + files, err := os.ReadDir(logDir) + if err != nil { + if Logger != nil { + Logger.Printf("Read the log directory %s Error: %v", logDir, err) + } + return + } + const maxAge = 30 * 24 * time.Hour // 30天 + for _, file := range files { + if file.IsDir() { + continue + } + info, err := file.Info() + if err != nil { + if Logger != nil { + Logger.Printf("Obtain file information %s Error: %v", file.Name(), err) + } + continue + } + if time.Since(info.ModTime()) > maxAge { + filePath := filepath.Join(logDir, file.Name()) + if err := os.Remove(filePath); err != nil { + if Logger != nil { + Logger.Printf("Delete the old log files %s Error: %v", filePath, err) + } + } else { + if Logger != nil { + Logger.Printf("Delete the old log files %s succeed", filePath) + } + } + } + } +} diff --git a/main.go b/main.go new file mode 100644 index 0000000..db1220c --- /dev/null +++ b/main.go @@ -0,0 +1,843 @@ +package main + +import ( + "FireLeave_tool/connect" + "FireLeave_tool/httpreader" + "FireLeave_tool/logger" + "FireLeave_tool/tcpreader" + "FireLeave_tool/video_server" + "context" + "encoding/json" + "flag" + "fmt" + "log" + "math" + "net" + "net/http" + "os" + "os/exec" + "os/signal" + "path/filepath" + "sync" + "syscall" + "time" +) + +type deviceAlarmState struct { + InCondition bool + StartTime int64 + AlarmPauseUntil int64 +} + +// 全局 map:每个设备独立状态 +var deviceStates sync.Map // map[string]*deviceAlarmState +var mergingAlarms sync.Map + +// DeviceManager 设备管理器 +type DeviceManager struct { + mu sync.Mutex + devices map[string]*connect.DeviceData +} + +// NewDeviceManager 初始化设备管理器 +func NewDeviceManager() *DeviceManager { + return &DeviceManager{ + devices: make(map[string]*connect.DeviceData), + } +} +func getDeviceState(deviceUUID string) *deviceAlarmState { + if s, ok := deviceStates.Load(deviceUUID); ok { + return s.(*deviceAlarmState) + } + s := &deviceAlarmState{} + deviceStates.Store(deviceUUID, s) + return s +} + +// AddDevice 添加设备 +func (dm *DeviceManager) AddDevice(deviceData connect.DeviceData) { + dm.mu.Lock() + defer dm.mu.Unlock() + dm.devices[deviceData.DeviceUUID] = &deviceData + logger.Logger.Printf("add equipments: DeviceUUID=%s", deviceData.DeviceUUID) +} + +// StopDevice 停止设备 +func (dm *DeviceManager) StopDevice(deviceUUID string) { + dm.mu.Lock() + defer dm.mu.Unlock() + if _, exists := dm.devices[deviceUUID]; exists { + delete(dm.devices, deviceUUID) + logger.Logger.Printf("stop equipments: DeviceUUID=%s", deviceUUID) + } +} + +// GetDevice 获取设备 +func (dm *DeviceManager) GetDevice(deviceUUID string) (*connect.DeviceData, bool) { + dm.mu.Lock() + defer dm.mu.Unlock() + device, exists := dm.devices[deviceUUID] + return device, exists +} + +// GlobalDeviceManager 全局设备管理器 +var GlobalDeviceManager = NewDeviceManager() + +// 工具函数 +func waitForHTTPService(port int, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + address := fmt.Sprintf("127.0.0.1:%d", port) + + logger.Logger.Printf("Start waiting for the HTTP service to start (Port=%d, Timeout=%v)", port, timeout) + + attempt := 0 + for time.Now().Before(deadline) { + attempt++ + + // 1. 先检查TCP连接 + conn, err := net.DialTimeout("tcp", address, 2*time.Second) + if err != nil { + if attempt == 1 || attempt%5 == 0 { + logger.Logger.Printf("Wait for the TCP connection... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err) + } + time.Sleep(1 * time.Second) + continue + } + conn.Close() + + // 2. 再检查HTTP服务是否真正就绪 + client := &http.Client{Timeout: 3 * time.Second} + resp, err := client.Get(fmt.Sprintf("http://%s/health", address)) + if err == nil { + resp.Body.Close() + if resp.StatusCode == http.StatusOK { + logger.Logger.Printf("The HTTP service has started successfully (Port=%d, Number of attempts=%d)", port, attempt) + return true + } + } + + if attempt == 1 || attempt%5 == 0 { + logger.Logger.Printf("Wait for the HTTP service to be ready... (Port=%d, Total number of attempts=%d)", port, attempt) + } + time.Sleep(1 * time.Second) + } + + logger.Logger.Printf("HTTP service startup timeout (Port=%d, Total number of attempts=%d)", port, attempt) + return false +} + +// waitForTCPService 增强版本 - 支持更长的等待时间和更好的错误处理 +func waitForTCPService(port int, timeout time.Duration) bool { + deadline := time.Now().Add(timeout) + address := fmt.Sprintf("127.0.0.1:%d", port) + + logger.Logger.Printf("Start waiting for the TCP service to start (Port=%d, Timeout=%v)", port, timeout) + + attempt := 0 + lastError := "" + + for time.Now().Before(deadline) { + attempt++ + conn, err := net.DialTimeout("tcp", address, 2*time.Second) + if err == nil { + conn.Close() + logger.Logger.Printf("The TCP service has been successfully started (Port=%d, Number of attempts=%d)", port, attempt) + return true + } + + currentError := err.Error() + if currentError != lastError { + logger.Logger.Printf("Wait for TCP service... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err) + lastError = currentError + } else if attempt%5 == 0 { + // 每5次尝试记录一次相同的错误 + logger.Logger.Printf("Wait for TCP service... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err) + } + + time.Sleep(2 * time.Second) // 增加等待间隔 + } + + logger.Logger.Printf("TCP service startup timeout (Port=%d, Total number of attempts=%d, Final error: %s)", port, attempt, lastError) + return false +} + +func logProcessInfo(pid int, deviceUUID string) error { + cmd := exec.Command("ps", "-o", "pid,ppid,user,stat,start_time,cmd", "-p", fmt.Sprintf("%d", pid)) + output, err := cmd.Output() + if err != nil { + return fmt.Errorf("Failed to obtain process information: %v", err) + } + + logger.Logger.Printf("Detailed information about the reasoning program process (DeviceUUID=%s, PID=%d):\n%s", deviceUUID, pid, string(output)) + return nil +} + +func getIntFromNumber(num json.Number, def int) int { + if i, err := num.Int64(); err == nil { + return int(i) + } + return def +} + +// processDevice 处理单个设备 +func processDevice(ctx context.Context, deviceData connect.DeviceData) { + + logger.Logger.Printf("Start processing the equipment: DeviceUUID=%s", deviceData.DeviceUUID) + debugDeviceData(deviceData) + + // 首先将设备添加到设备管理器 + GlobalDeviceManager.AddDevice(deviceData) + defer GlobalDeviceManager.StopDevice(deviceData.DeviceUUID) + + // 创建带取消的子上下文 + deviceCtx, cancel := context.WithCancel(ctx) + defer cancel() + + // 分配端口 + port, err := connect.GlobalPortManager.AllocatePort(deviceData.DeviceUUID) + if err != nil { + logger.Logger.Printf(" DeviceUUID=%s Port allocation failed: %v", deviceData.DeviceUUID, err) + return + } + defer connect.GlobalPortManager.ReleasePort(deviceData.DeviceUUID) + logger.Logger.Printf("Port allocation successful: DeviceUUID=%s, Port=%d", deviceData.DeviceUUID, port) + + inferenceServer := httpreader.NewInferenceServer(deviceData.DeviceUUID, port) + if err := inferenceServer.Start(); err != nil { + logger.Logger.Printf("Failed to start the inference data receiving server (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + return + } + + // 使用带上下文的停止 + defer inferenceServer.StopWithContext(deviceCtx) + logger.Logger.Printf("Wait for the inference data receiving server to start... (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port) + + // 等待HTTP服务器启动 + if !waitForHTTPService(port, 10*time.Second) { + logger.Logger.Printf("The inference data receiving server has timed out during startup (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port) + return + } + + logger.Logger.Printf("The inference data receiving server has been successfully started (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port) + + // 处理检测区域为推理程序需要的格式 + detectAreaStr := connect.ProcessDetectAreaForInference(deviceData.DetectArea) + logger.Logger.Printf("The processed detection area: %s", detectAreaStr) + + // 启动推理程序 + inferenceCmd := exec.Command( + "./yolov5", + "-s", deviceData.CameraRTSP, + "-m", "model", + "-c", deviceData.DeviceUUID, + "-p", fmt.Sprintf("%d", port), + "-w", detectAreaStr, + "-r", fmt.Sprintf("%d", deviceData.Confidence), + ) + inferenceCmd.Stdout = os.Stdout + inferenceCmd.Stderr = os.Stderr + //logger.Logger.Printf("推理程序输出仅显示在控制台") + + logger.Logger.Printf("Start the inference program command: %v", inferenceCmd.Args) + + if err := inferenceCmd.Start(); err != nil { + logger.Logger.Printf("The reasoning program failed to start (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + return + } + + // 检查推理程序是否启动成功 + time.Sleep(2 * time.Second) + if inferenceCmd.Process == nil { + logger.Logger.Printf("The reasoning program process has not been created (DeviceUUID=%s)", deviceData.DeviceUUID) + return + } + + if err := inferenceCmd.Process.Signal(syscall.Signal(0)); err != nil { + logger.Logger.Printf("The reasoning program has exited (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + return + } + + logger.Logger.Printf("The reasoning program process has been started,PID: %d (DeviceUUID=%s)", inferenceCmd.Process.Pid, deviceData.DeviceUUID) + + if err := logProcessInfo(inferenceCmd.Process.Pid, deviceData.DeviceUUID); err != nil { + logger.Logger.Printf("Failed to record process information: %v", err) + } + + defer func() { + logger.Logger.Printf("Start to stop the reasoning program (DeviceUUID=%s)", deviceData.DeviceUUID) + if inferenceCmd.Process != nil { + if err := inferenceCmd.Process.Kill(); err != nil { + logger.Logger.Printf("The termination of the reasoning program failed (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + } else { + logger.Logger.Printf("The reasoning program has stopped (DeviceUUID=%s)", deviceData.DeviceUUID) + } + } + }() + + // 启动温度程序 + // 在 processDevice 函数中修改温度程序启动部分 + // 启动温度程序 + tempPort := port + 1000 + tempCmdArgs := []string{ + "-s", deviceData.CameraRTSP, + "-t", fmt.Sprintf("%d", tempPort), + } + + if deviceData.CameraChannel > 0 { + tempCmdArgs = append(tempCmdArgs, "-c", fmt.Sprintf("%d", deviceData.CameraChannel)) + } + + tempCmd := exec.Command("./release/run_thermometry.sh", tempCmdArgs...) + tempCmd.Stdout = os.Stdout + tempCmd.Stderr = os.Stderr + logger.Logger.Printf("Start the temperature program command: %v", tempCmd.Args) + + if err := tempCmd.Start(); err != nil { + logger.Logger.Printf("The temperature program failed to start (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + return + } + + // 增强进程检查 + time.Sleep(3 * time.Second) // 减少等待时间 + + if tempCmd.Process == nil || tempCmd.Process.Pid == 0 { + logger.Logger.Printf("The temperature program was not started correctly, and the process PID is 0 (DeviceUUID=%s)", deviceData.DeviceUUID) + return + } + + // 检查进程是否真的在运行 + if err := tempCmd.Process.Signal(syscall.Signal(0)); err != nil { + logger.Logger.Printf("The temperature program process check failed (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + return + } + + logger.Logger.Printf("The temperature program has been successfully started,PID: %d (DeviceUUID=%s)", tempCmd.Process.Pid, deviceData.DeviceUUID) + + // 增加TCP服务启动前的额外检查 + logger.Logger.Printf("Wait for the temperature program to be initialized... (DeviceUUID=%s)", deviceData.DeviceUUID) + time.Sleep(2 * time.Second) + + // 等待 TCP 服务器就绪 + logger.Logger.Printf("Wait for the temperature program TCP server to start... (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort) + if !waitForTCPService(tempPort, 20*time.Second) { // 增加超时时间 + logger.Logger.Printf("The TCP server of the temperature program has timed out during startup (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort) + + // 检查温度程序是否还在运行 + if tempCmd.Process != nil { + if err := tempCmd.Process.Signal(syscall.Signal(0)); err != nil { + logger.Logger.Printf("The temperature program has abnormally exited (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + } else { + logger.Logger.Printf("The temperature program is still running but the TCP service has not started. There might be a configuration issue(DeviceUUID=%s)", deviceData.DeviceUUID) + } + } + return + } + + // 创建 TCP 客户端 + tempClient, err := tcpreader.NewTCPClient(deviceData.DeviceUUID, tempPort) + if err != nil { + logger.Logger.Printf("Failed to create a TCP client (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + return + } + defer tempClient.Close() + + logger.Logger.Printf("The TCP client has been successfully created (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort) + + // 确保上传目录存在 + if err := os.MkdirAll("/data/upload", 0755); err != nil { + logger.Logger.Printf("Failed to create the upload directory: %v", err) + return + } + logger.Logger.Printf("The directory upload has been successfully created: /data/upload") + + // 主循环:从通道接收推理数据 + logger.Logger.Printf("Start monitoring the equipment (DeviceUUID=%s)", deviceData.DeviceUUID) + + //var lastPersonCount = -1 + //var lastTemperature float64 = -1 + monitorTicker := time.NewTicker(2 * time.Second) + defer monitorTicker.Stop() + + for { + select { + case <-deviceCtx.Done(): + logger.Logger.Printf("Equipment processing has been cancelled: DeviceUUID=%s", deviceData.DeviceUUID) + // 强制停止所有子进程 + if inferenceCmd.Process != nil { + inferenceCmd.Process.Kill() + } + if tempCmd.Process != nil { + tempCmd.Process.Kill() + } + tempClient.Close() + return + + case inferenceResult, ok := <-inferenceServer.GetDataChan(): + if !ok { + logger.Logger.Printf("The inference data channel is closed: DeviceUUID=%s", deviceData.DeviceUUID) + return + } + currentPersonCount := 0 + for _, param := range inferenceResult.Params { + if param.ClassIdx == 1 { + currentPersonCount = param.Number + break + } + } + + if currentPersonCount != deviceData.PersonCount { + deviceData.PersonCount = currentPersonCount + connect.UpdateDeviceData(deviceData) + logger.Logger.Printf("Personnel quantity update: %d (DeviceUUID=%s)", currentPersonCount, deviceData.DeviceUUID) + } + + case <-monitorTicker.C: + now := time.Now().Unix() + + // === 获取当前设备独立状态 === + state := getDeviceState(deviceData.DeviceUUID) + + // === 读取温度(防抖)=== + tempResult, err := tempClient.ReadTemperatureResult() + if err == nil { + if math.Abs(tempResult.Tmp-deviceData.Temperature) >= 0.5 { + oldTemp := deviceData.Temperature + deviceData.Temperature = tempResult.Tmp + connect.UpdateDeviceData(deviceData) + GlobalDeviceManager.AddDevice(deviceData) + logger.Logger.Printf("Temperature update: %.1f°C → %.1f°C (DeviceUUID=%s)", + oldTemp, tempResult.Tmp, deviceData.DeviceUUID) + } + } + + // 获取参数 + detectionSec := getIntFromNumber(deviceData.DetectionTime, 20) + alarmCooldown := getIntFromNumber(deviceData.AlarmTime, 300) + + // === 防抖:无人 + 温度稳定超阈值 === + noPerson := deviceData.PersonCount == 0 + + // 温度防抖 + var tempStableHigh bool + threshold := deviceData.TemperatureThreshold + if deviceData.Temperature > threshold+0.5 { + tempStableHigh = true + } else if deviceData.Temperature < threshold-0.5 { + tempStableHigh = false + } else { + tempStableHigh = deviceData.Temperature > threshold + } + + conditionMet := noPerson && tempStableHigh + + // 1. 冷却期检查 + if now < state.AlarmPauseUntil { + if state.InCondition { + logger.Logger.Printf("Alarm in cooldown: %ds remaining (DeviceUUID=%s)", + state.AlarmPauseUntil-now, deviceData.DeviceUUID) + state.InCondition = false + } + continue + } + + // 2. 条件满足 + if conditionMet { + if !state.InCondition { + state.StartTime = now + state.InCondition = true + logger.Logger.Printf("Alarm condition met, start timing (%ds required): PersonCount=%d, Temp=%.1f°C > %.1f°C (DeviceUUID=%s)", + detectionSec, deviceData.PersonCount, deviceData.Temperature, threshold, deviceData.DeviceUUID) + } else if now-state.StartTime >= int64(detectionSec) { + logger.Logger.Printf("ALARM TRIGGERED (after %ds): PersonCount=%d, Temp=%.1f°C > %.1f°C (DeviceUUID=%s)", + detectionSec, deviceData.PersonCount, deviceData.Temperature, threshold, deviceData.DeviceUUID) + + go triggerAlarmWithDelay(deviceData, state.StartTime, alarmCooldown) + + state.AlarmPauseUntil = now + int64(alarmCooldown) + state.InCondition = false + + deviceDataCopy := deviceData + deviceDataCopy.AlarmPauseUntil = state.AlarmPauseUntil + connect.UpdateDeviceData(deviceDataCopy) + } + } else { + if state.InCondition { + logger.Logger.Printf("Alarm condition interrupted: lasted %ds < %ds (DeviceUUID=%s)", + now-state.StartTime, detectionSec, deviceData.DeviceUUID) + state.InCondition = false + } + } + + } + } +} + +func waitForShutdown(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup) { + stop := make(chan os.Signal, 1) + signal.Notify(stop, os.Interrupt, syscall.SIGTERM) + + select { + case <-stop: + logger.Logger.Printf("Upon receiving the stop signal, start cleaning...") + + // First cancel all contexts + cancel() + + // Wait for all goroutines to complete, but with timeout + done := make(chan struct{}) + go func() { + wg.Wait() + close(done) + }() + + select { + case <-done: + logger.Logger.Printf("All goroutines exited normally") + case <-time.After(10 * time.Second): + logger.Logger.Printf("When the waiting time exceeds the limit, force exit") + // Log status before force exit + activeDevices := GlobalDeviceManager.GetAllDevices() + logger.Logger.Printf("It still exists when forced to exit %d An active device", len(activeDevices)) + for uuid := range activeDevices { + logger.Logger.Printf("Forcibly stop the equipment: UUID=%s", uuid) + } + } + + case <-ctx.Done(): + logger.Logger.Printf("The context has been cancelled.") + } + + logger.Logger.Printf("The program exits successfully.") + os.Exit(0) +} + +// initWebSocketAsync Asynchronously initializes WebSocket +func initWebSocketAsync(ctx context.Context, wg *sync.WaitGroup) { + logger.Logger.Printf("Start the initialization of asynchronous WebSocket...") + + // 1. Initialize WebSocket channel + if err := connect.InitWSChannel(); err != nil { + logger.Logger.Printf("Failed to initialize WebSocket channel: %v", err) + return + } + + // 2. Wait for connection to be ready (with context cancellation) + logger.Logger.Printf("Waiting for WebSocket connection to be established...") + containerName := "fireleave-container" + + for i := 0; i < 30; i++ { + select { + case <-ctx.Done(): + logger.Logger.Printf("Context cancelled, aborting WebSocket registration") + return + case <-time.After(1 * time.Second): + client := connect.GetWSClient() + if client != nil && client.IsConnected() { + // Connection successful → proceed to register + goto register + } + } + } + + logger.Logger.Printf("WebSocket connection timeout after 30 seconds") + return + +register: + // 3. Register service + serviceID, err := connect.RegisterService(containerName) + if err != nil { + logger.Logger.Printf("Service registration failed: %v", err) + } else { + logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID) + } +} + +// GetAllDevices Get all devices (new method) +func (dm *DeviceManager) GetAllDevices() map[string]*connect.DeviceData { + dm.mu.Lock() + defer dm.mu.Unlock() + + // Return copy to avoid concurrent modification + result := make(map[string]*connect.DeviceData) + for k, v := range dm.devices { + result[k] = v + } + return result +} +func jsonString(v any) string { + b, err := json.Marshal(v) + if err != nil { + logger.Logger.Printf("JSON编码失败: %v", err) + return "" + } + return string(b) +} + +// main.go → triggerAlarmWithDelay +func triggerAlarmWithDelay(deviceData connect.DeviceData, alarmTime int64, cooldown int) { + key := fmt.Sprintf("%s_%d", deviceData.DeviceUUID, alarmTime) + if _, loaded := mergingAlarms.LoadOrStore(key, true); loaded { + return + } + defer mergingAlarms.Delete(key) + + // === 1. 合并视频 === + mergedFile, err := video_server.MergeVideo(deviceData.DeviceUUID, alarmTime) + if err != nil { + logger.Logger.Printf("视频合并失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + return + } + + // === 2. 上传 Nginx === + var hostUUID string + configs, err := connect.LoadServiceConfig() + if err == nil { + for _, cfg := range configs { + if cfg.DeviceUUID == deviceData.DeviceUUID { + hostUUID = cfg.HostUUID + break + } + } + } + + eventParamsForNginx := map[string]any{ + "alarm_time": alarmTime, + "description": "无人工作区且温度超阈值报警", + "device_UUID": deviceData.DeviceUUID, + "host_uuid": hostUUID, + "person_count": deviceData.PersonCount, + "temperature": deviceData.Temperature, + } + + videoURL, err := video_server.UploadToNginx(mergedFile, eventParamsForNginx, deviceData.UploadURL) + videoURLStr := "" + if err != nil { + logger.Logger.Printf("Nginx上传失败: %v", err) + } else { + videoURLStr = videoURL + logger.Logger.Printf("Nginx上传成功: %s", videoURL) + } + + client := connect.GetWSClient() + if client == nil || !client.IsConnected() { + logger.Logger.Printf("WS未就绪,延迟上报 (DeviceUUID=%s)", deviceData.DeviceUUID) + os.Remove(mergedFile) + return + } + + // === 3. 上报事件 (event) === + eventPayload := map[string]any{ + "type": "event", + "args": map[string]any{ + "level": "alarm", + "description": "无人工作区且温度超阈值报警", + "device_uuid": deviceData.DeviceUUID, + "host_uuid": hostUUID, + "video_url": videoURLStr, + "alarm_time": alarmTime, + "address": deviceData.Address, + }, + } + + eventMsg := map[string]any{ + "method": "metric_data", + "params": map[string]any{ + "route_key": fmt.Sprintf("/dhlr/device/%s/event", deviceData.DeviceUUID), + "metric": jsonString(eventPayload), + }, + } + + client.SendAsync(eventMsg) + logger.Logger.Printf("报警事件已上报 (DeviceUUID=%s)", deviceData.DeviceUUID) + + // === 4. 上报数据快照 (data) === + dataPayload := map[string]any{ + "type": "data", + "args": map[string]any{ + "person_count": deviceData.PersonCount, + "temperature": deviceData.Temperature, + "camera_rtsp": deviceData.CameraRTSP, + "task_id": deviceData.TaskID, + "camera_ip": deviceData.CameraIP, + "confidence": deviceData.Confidence, + "alarm_time": alarmTime, + "address": deviceData.Address, + }, + } + + dataMsg := map[string]any{ + "method": "metric_data", + "params": map[string]any{ + "route_key": fmt.Sprintf("/dhlr/device/%s/data", deviceData.DeviceUUID), + "metric": jsonString(dataPayload), + }, + } + + client.SendAsync(dataMsg) + logger.Logger.Printf("指标快照已上报 (DeviceUUID=%s, Temp=%.1f°C, 人:%d)", + deviceData.DeviceUUID, deviceData.Temperature, deviceData.PersonCount) + + // === 5. 清理本地文件 === + os.Remove(mergedFile) + video_server.CleanupOldVideos(deviceData.DeviceUUID) +} +func waitAndRegister() error { + containerName := os.Getenv("CONTAINER_NAME") + if containerName == "" { + containerName = "fireleave-service-default" + } + + // 等待 WS 连接成功 + for i := 0; i < 30; i++ { + client := connect.GetWSClient() + if client != nil && client.IsConnected() { + // 连接成功 → 立即注册 + serviceID, err := connect.RegisterService(containerName) + if err != nil { + return fmt.Errorf("服务注册失败: %v", err) + } + logger.Logger.Printf("服务注册成功!ServiceID=%s", serviceID) + return nil + } + time.Sleep(1 * time.Second) + } + return fmt.Errorf("WebSocket 连接超时,无法注册服务") +} +func main() { + // Define command line parameters (remove ws parameter) + var rtspURL string + var deviceUUID string + flag.StringVar(&rtspURL, "rtsp", "", "RTSP stream URL for testing") + flag.StringVar(&deviceUUID, "uuid", "", "Device UUID for testing") + flag.Parse() + + // Initialize logger + if err := logger.InitLogger(); err != nil { + log.Fatalf("Logger initialization failed: %v", err) + } + defer logger.CloseLogger() + + // Log cleanup + go func() { + for { + logger.CleanupOldLogs() + time.Sleep(24 * time.Hour) + } + }() + + // Directly enter normal mode (remove wsMode judgment) + logger.Logger.Printf("=== Starting FireLeave Tool ===") + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + var wg sync.WaitGroup + + // 1. Initialize device data directly from service config + logger.Logger.Printf("Step 1: Initialize device data from service config") + deviceDataList, err := connect.InitializeDevicesFromServiceConfig() + if err != nil { + logger.Logger.Printf("Device data initialization failed: %v", err) + logger.Logger.Printf("Will try to use existing device files...") + // If initialization fails, try to load from existing files + deviceDataList = connect.GetDeviceDataList() + } + + logger.Logger.Printf("Step 2: Starting %d devices", len(deviceDataList)) + + for i, deviceData := range deviceDataList { + logger.Logger.Printf("Starting device[%d]: UUID=%s", i, deviceData.DeviceUUID) + wg.Add(1) + go func(deviceData connect.DeviceData, index int) { + defer wg.Done() + defer GlobalDeviceManager.StopDevice(deviceData.DeviceUUID) + logger.Logger.Printf("Device processing goroutine started: UUID=%s, Index=%d", deviceData.DeviceUUID, index) + processDevice(ctx, deviceData) + }(deviceData, i) + } + + logger.Logger.Printf("Step 3: All devices started") + + // 3. Asynchronously initialize WebSocket + logger.Logger.Printf("Step 4: Asynchronously initialize WebSocket connection") + go initWebSocketAsync(ctx, &wg) + + logger.Logger.Printf("=== Main program startup completed ===") + logger.Logger.Printf("System status: Running devices=%d", len(deviceDataList)) + + // Add device status monitoring + go monitorDeviceStatus(ctx, deviceDataList) + + // Wait for termination signal + waitForShutdown(ctx, cancel, &wg) +} +func printGlobalDeviceStatus() { + now := time.Now().Unix() + activeDevices := GlobalDeviceManager.GetAllDevices() + + if len(activeDevices) == 0 { + logger.Logger.Printf("=== Global Device Status monitoring === No running devices for the time being") + return + } + + logger.Logger.Printf("=== Global Device Status monitoring === Currently running %d", len(activeDevices)) + + i := 1 + for uuid, data := range activeDevices { + // 从 deviceStates 获取冷却时间 + stateInf, _ := deviceStates.Load(uuid) + state := stateInf.(*deviceAlarmState) + cooldownLeft := int64(0) + if now < state.AlarmPauseUntil { + cooldownLeft = state.AlarmPauseUntil - now + } + + logger.Logger.Printf(" [%d] DeviceUUID=%s | person:%d | temp:%.1f°C | threshold value:%.1f°C | cooling time:%ds", + i, + uuid, + data.PersonCount, + data.Temperature, + data.TemperatureThreshold, + cooldownLeft, + ) + i++ + } +} +func monitorDeviceStatus(ctx context.Context, initialDevices []connect.DeviceData) { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + + logger.Logger.Printf("Start global device status monitoring (print once every 30 seconds)") + + for { + select { + case <-ticker.C: + printGlobalDeviceStatus() + if files, err := filepath.Glob("/data/devices/device_*.json"); err == nil { + logger.Logger.Printf("Device file status: A total of %d device files", len(files)) + for i, file := range files { + if i < 3 { + logger.Logger.Printf(" device file[%d]: %s", i, filepath.Base(file)) + } + } + } + + logger.Logger.Printf("Operating equipment %d ", len(GlobalDeviceManager.GetAllDevices())) + + case <-ctx.Done(): + logger.Logger.Printf("Global device status monitoring has stopped") + return + } + } +} + +// debugDeviceData Debug device data +func debugDeviceData(deviceData connect.DeviceData) { + logger.Logger.Printf("Debug device data: UUID=%s", deviceData.DeviceUUID) + logger.Logger.Printf(" TaskID: %q", deviceData.TaskID) + logger.Logger.Printf(" CameraRTSP: %q", deviceData.CameraRTSP) + logger.Logger.Printf(" CameraIP: %q", deviceData.CameraIP) + logger.Logger.Printf(" UploadURL: %q", deviceData.UploadURL) + logger.Logger.Printf(" DetectAreaStr: %q", deviceData.DetectAreaStr) + logger.Logger.Printf(" PersonCount: %d", deviceData.PersonCount) + logger.Logger.Printf(" Temperature: %f", deviceData.Temperature) +} diff --git a/main_aarch64 b/main_aarch64 new file mode 100644 index 0000000..d645dc3 Binary files /dev/null and b/main_aarch64 differ diff --git a/tcpreader/tcpreader.go b/tcpreader/tcpreader.go new file mode 100644 index 0000000..183e783 --- /dev/null +++ b/tcpreader/tcpreader.go @@ -0,0 +1,76 @@ +package tcpreader + +import ( + "bufio" + "encoding/json" + "fmt" + "net" + "sync" + "time" + + "FireLeave_tool/logger" +) + +// TemperatureResult 温度读取程序返回的数据 +type TemperatureResult struct { + Tmp float64 `json:"tmp"` +} + +// TCPClient TCP 客户端 +type TCPClient struct { + addr string // 地址,例如 "127.0.0.1:59881" + deviceUUID string // 关联的 DeviceUUID + conn net.Conn + mu sync.Mutex +} + +// NewTCPClient 创建新的 TCP 客户端 +func NewTCPClient(deviceUUID string, tempPort int) (*TCPClient, error) { + addr := fmt.Sprintf("127.0.0.1:%d", tempPort) + + conn, err := net.DialTimeout("tcp", addr, 5*time.Second) + if err != nil { + logger.Logger.Printf("Connect TCP server %s failed (DeviceUUID=%s): %v", addr, deviceUUID, err) + return nil, fmt.Errorf("connect TCP server %s failed: %v", addr, err) + } + + client := &TCPClient{ + addr: addr, + deviceUUID: deviceUUID, + conn: conn, + } + logger.Logger.Printf("TCP client created successfully: DeviceUUID=%s, Address=%s", deviceUUID, addr) + return client, nil +} + +// ReadTemperatureResult 读取温度程序数据 +func (tc *TCPClient) ReadTemperatureResult() (TemperatureResult, error) { + tc.mu.Lock() + defer tc.mu.Unlock() + + tc.conn.SetReadDeadline(time.Now().Add(10 * time.Second)) + reader := bufio.NewReader(tc.conn) + message, err := reader.ReadString('\n') + if err != nil { + logger.Logger.Printf("Read temperature data failed (DeviceUUID=%s, Address=%s): %v", tc.deviceUUID, tc.addr, err) + return TemperatureResult{}, fmt.Errorf("read temperature data failed: %v", err) + } + + var result TemperatureResult + if err := json.Unmarshal([]byte(message), &result); err != nil { + logger.Logger.Printf("Parse temperature data failed (DeviceUUID=%s, Address=%s): %v", tc.deviceUUID, tc.addr, err) + return TemperatureResult{}, fmt.Errorf("parse temperature data failed: %v", err) + } + + return result, nil +} + +// Close 关闭 TCP 连接 +func (tc *TCPClient) Close() { + tc.mu.Lock() + defer tc.mu.Unlock() + if tc.conn != nil { + tc.conn.Close() + logger.Logger.Printf("Close TCP connection: DeviceUUID=%s, Address=%s", tc.deviceUUID, tc.addr) + } +} diff --git a/video_server/video_merge.go b/video_server/video_merge.go new file mode 100644 index 0000000..5051f90 --- /dev/null +++ b/video_server/video_merge.go @@ -0,0 +1,294 @@ +package video_server + +import ( + "fmt" + "io" + "os" + "os/exec" + "path/filepath" + "sort" + "strconv" + "strings" + "time" + + "FireLeave_tool/logger" +) + +// MergeVideo 合并视频文件 - 完全适配 mp4_merge --out 模式 +func MergeVideo(deviceUUID string, alarmTime int64) (string, error) { + videoDir := fmt.Sprintf("/usr/data/camera/%s", deviceUUID) + if _, err := os.Stat(videoDir); os.IsNotExist(err) { + return "", fmt.Errorf("video directory not exist: %s", videoDir) + } + + // === 1. 时间对齐到 10 秒 === + alarmTimeAligned := alignTo10Seconds(alarmTime) + alarmTimeStr := time.Unix(alarmTimeAligned, 0).Format("20060102150405") + + beforeTime := alarmTimeAligned - 10 + afterTime := alarmTimeAligned + 10 + + beforeStr := time.Unix(beforeTime, 0).Format("20060102150405") + afterStr := time.Unix(afterTime, 0).Format("20060102150405") + + paths := map[string]string{ + beforeStr: filepath.Join(videoDir, beforeStr+".mp4"), + alarmTimeStr: filepath.Join(videoDir, alarmTimeStr+".mp4"), + afterStr: filepath.Join(videoDir, afterStr+".mp4"), + } + + videoFiles := []string{} + for name, path := range paths { + if _, err := os.Stat(path); err == nil { + videoFiles = append(videoFiles, path) + logger.Logger.Printf("Found video: %s", name+".mp4") + } else { + logger.Logger.Printf("Missing video: %s", name+".mp4") + } + } + + // === 2. 补充替代文件 === + if len(videoFiles) < 3 { + allFiles, _ := os.ReadDir(videoDir) + var candidates []struct { + path string + ts int64 + } + for _, f := range allFiles { + if !f.IsDir() && strings.HasSuffix(f.Name(), ".mp4") && !strings.Contains(f.Name(), "_joined") { + if ts, err := parseSimpleTimestamp(f.Name()); err == nil { + candidates = append(candidates, struct { + path string + ts int64 + }{filepath.Join(videoDir, f.Name()), ts}) + } + } + } + targets := []int64{beforeTime, alarmTimeAligned, afterTime} + for _, target := range targets { + if containsTimestamp(videoFiles, target) { + continue + } + closest := findClosest(candidates, target) + if closest != "" && !contains(videoFiles, closest) { + videoFiles = append(videoFiles, closest) + logger.Logger.Printf("Using fallback: %s", filepath.Base(closest)) + } + } + } + + if len(videoFiles) < 2 { + return "", fmt.Errorf("insufficient video files: %d", len(videoFiles)) + } + + // === 3. 排序 === + sort.Slice(videoFiles, func(i, j int) bool { + return getTimestampFromPath(videoFiles[i]) < getTimestampFromPath(videoFiles[j]) + }) + + logger.Logger.Printf("Merging %d videos:", len(videoFiles)) + for i, f := range videoFiles { + logger.Logger.Printf(" [%d] %s", i, filepath.Base(f)) + } + + // === 4. 显式指定输出文件(100% 可控)=== + outputFile := fmt.Sprintf("%s.mp4_joined.mp4", alarmTimeStr) + joinedPath := filepath.Join(videoDir, outputFile) + + // 清理旧文件(防止冲突) + if err := os.Remove(joinedPath); err != nil && !os.IsNotExist(err) { + logger.Logger.Printf("Failed to remove old output file %s: %v", joinedPath, err) + } + + // === 5. 调用 mp4_merge,显式 --out === + args := append(videoFiles, "--out", joinedPath) + cmd := exec.Command("mp4_merge", args...) + cmd.Stdout = logger.Logger.Writer() + cmd.Stderr = logger.Logger.Writer() + + if err := cmd.Run(); err != nil { + return "", fmt.Errorf("mp4_merge failed: %v", err) + } + + // === 6. 验证输出文件存在 === + if _, err := os.Stat(joinedPath); err != nil { + return "", fmt.Errorf("output file not created: %s, error: %v", joinedPath, err) + } + + info, err := os.Stat(joinedPath) + if err != nil { + return "", fmt.Errorf("stat output file failed: %v", err) + } + logger.Logger.Printf("Merged file created: %s (size: %d bytes)", outputFile, info.Size()) + + // === 7. 移动到 /data/upload/ === + t := time.Unix(alarmTimeAligned, 0) + year, month, day := t.Date() + timestampPart := fmt.Sprintf("%d-%d-%d-%d", year, int(month), day, alarmTimeAligned) + finalName := fmt.Sprintf("%s-%s.MP4", deviceUUID, timestampPart) + finalPath := filepath.Join("/data/upload", finalName) + + if err := os.MkdirAll("/data/upload", 0755); err != nil { + return "", fmt.Errorf("create /data/upload failed: %v", err) + } + + // 跨分区复制 + 删除 + if err := copyFile(joinedPath, finalPath); err != nil { + return "", fmt.Errorf("copy failed: %v", err) + } + if err := os.Remove(joinedPath); err != nil { + logger.Logger.Printf("Failed to delete temp file %s: %v", joinedPath, err) + } + + logger.Logger.Printf("Video merged and moved successfully: %s", finalPath) + return finalPath, nil +} + +// containsTimestamp 检查是否已有该时间戳 +func containsTimestamp(files []string, ts int64) bool { + for _, f := range files { + if getTimestampFromPath(f) == ts { + return true + } + } + return false +} + +// findClosest 查找最接近的时间戳文件 +func findClosest(candidates []struct { + path string + ts int64 +}, target int64) string { + var best string + var minDiff int64 = 1<<63 - 1 + for _, c := range candidates { + diff := abs(c.ts - target) + if diff < minDiff { + minDiff = diff + best = c.path + } + } + if minDiff <= 30 { + return best + } + return "" +} + +func abs(x int64) int64 { + if x < 0 { + return -x + } + return x +} + +// copyFile 支持跨分区复制 +func copyFile(src, dst string) error { + in, err := os.Open(src) + if err != nil { + return err + } + defer in.Close() + + out, err := os.Create(dst) + if err != nil { + return err + } + defer out.Close() + + if _, err = io.Copy(out, in); err != nil { + return err + } + return out.Sync() +} + +// alignTo10Seconds 将时间对齐到最近的10秒间隔 +func alignTo10Seconds(timestamp int64) int64 { + t := time.Unix(timestamp, 0) + // 获取秒数并对齐到10秒 + seconds := t.Second() + alignedSeconds := (seconds / 10) * 10 + // 创建新的时间 + alignedTime := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), alignedSeconds, 0, t.Location()) + return alignedTime.Unix() +} + +// parseSimpleTimestamp 解析简单的时间戳文件名 (如: 20251029181740.mp4) +func parseSimpleTimestamp(filename string) (int64, error) { + // 移除文件扩展名 + nameWithoutExt := strings.TrimSuffix(filename, filepath.Ext(filename)) + + // 文件名应该就是14位时间戳 + if len(nameWithoutExt) != 14 { + //return 0, fmt.Errorf("无效的时间戳格式: %s (期望14位数字)", nameWithoutExt) + } + + // 解析时间戳 + year, _ := strconv.Atoi(nameWithoutExt[0:4]) + month, _ := strconv.Atoi(nameWithoutExt[4:6]) + day, _ := strconv.Atoi(nameWithoutExt[6:8]) + hour, _ := strconv.Atoi(nameWithoutExt[8:10]) + minute, _ := strconv.Atoi(nameWithoutExt[10:12]) + second, _ := strconv.Atoi(nameWithoutExt[12:14]) + + t := time.Date(year, time.Month(month), day, hour, minute, second, 0, time.Local) + return t.Unix(), nil +} + +// getTimestampFromPath 从文件路径中提取时间戳 +func getTimestampFromPath(filePath string) int64 { + filename := filepath.Base(filePath) + timestamp, err := parseSimpleTimestamp(filename) + if err != nil { + return 0 + } + return timestamp +} + +// contains 检查切片是否包含某个元素 +func contains(slice []string, item string) bool { + for _, s := range slice { + if s == item { + return true + } + } + return false +} + +// CleanupOldVideos 清理旧的合并视频文件,保留最近的 10 个 +func CleanupOldVideos(deviceUUID string) { + const maxFiles = 10 + videoDir := "/data/upload" + prefix := fmt.Sprintf("%s_", deviceUUID) + + files, err := os.ReadDir(videoDir) + if err != nil { + logger.Logger.Printf("read the signals %s Error: %v", videoDir, err) + return + } + + var videoFiles []os.DirEntry + for _, file := range files { + if strings.HasPrefix(file.Name(), prefix) && strings.HasSuffix(file.Name(), "_merged.mp4") { + videoFiles = append(videoFiles, file) + } + } + + if len(videoFiles) <= maxFiles { + return + } + + sort.Slice(videoFiles, func(i, j int) bool { + infoI, _ := videoFiles[i].Info() + infoJ, _ := videoFiles[j].Info() + return infoI.ModTime().Before(infoJ.ModTime()) + }) + + for i := 0; i < len(videoFiles)-maxFiles; i++ { + filePath := filepath.Join(videoDir, videoFiles[i].Name()) + if err := os.Remove(filePath); err != nil { + logger.Logger.Printf("Clean up old videos %s Error: %v", filePath, err) + } else { + logger.Logger.Printf("Clean up old videos: %s", filePath) + } + } +} diff --git a/video_server/video_upload.go b/video_server/video_upload.go new file mode 100644 index 0000000..df760c2 --- /dev/null +++ b/video_server/video_upload.go @@ -0,0 +1,100 @@ +package video_server + +import ( + "bytes" + "encoding/json" + "fmt" + "io" + "mime/multipart" + "net/http" + "os" + "path/filepath" + "time" + + "FireLeave_tool/logger" +) + +// UploadToNginx 上传视频到 Nginx 服务器,返回 URL +func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string) (string, error) { + // === 1. 打开文件 === + file, err := os.Open(filePath) + if err != nil { + return "", fmt.Errorf("open file failed: %w", err) + } + defer file.Close() + + // === 2. 创建表单 === + body := new(bytes.Buffer) + writer := multipart.NewWriter(body) + + // --- 上传文件 --- + part, err := writer.CreateFormFile("file", filepath.Base(filePath)) + if err != nil { + return "", fmt.Errorf("create form file failed: %w", err) + } + if _, err = io.Copy(part, file); err != nil { + return "", fmt.Errorf("copy file failed: %w", err) + } + + // --- 上传 JSON --- + eventJSON, err := json.Marshal(eventParams) + if err != nil { + return "", fmt.Errorf("marshal event failed: %w", err) + } + logger.Logger.Printf("[Upload Event parameters]eventParams: %v", eventParams) + logger.Logger.Printf("[Upload Event parameters]eventJSON: %s", string(eventJSON)) + + if err := writer.WriteField("event", string(eventJSON)); err != nil { + return "", fmt.Errorf("write event field failed: %w", err) + } + + if err := writer.Close(); err != nil { + return "", fmt.Errorf("close writer failed: %w", err) + } + + // === 3. 打印请求头 === + logger.Logger.Printf("[HTTP Request]URL: %s", uploadURL) + logger.Logger.Printf("[HTTP Request]Content-Type: %s", writer.FormDataContentType()) + logger.Logger.Printf("[HTTP Request]Body 长度: %d bytes", body.Len()) + + // === 4. 发送请求 === + req, err := http.NewRequest("POST", uploadURL, body) + if err != nil { + return "", fmt.Errorf("create request failed: %w", err) + } + req.Header.Set("Content-Type", writer.FormDataContentType()) + + client := &http.Client{Timeout: 180 * time.Second} + resp, err := client.Do(req) + if err != nil { + return "", fmt.Errorf("upload failed: %w", err) + } + defer resp.Body.Close() + + // === 5. 打印响应 === + respBody, _ := io.ReadAll(resp.Body) + logger.Logger.Printf("[nginx response]Status: %d", resp.StatusCode) + logger.Logger.Printf("[nginx response]Body: %s", string(respBody)) + + if resp.StatusCode != http.StatusOK { + return "", fmt.Errorf("nginx error %d: %s", resp.StatusCode, string(respBody)) + } + + // === 6. 解析 video_url === + var result struct { + Code int `json:"code"` + Result json.RawMessage `json:"result"` + } + if err := json.Unmarshal(respBody, &result); err != nil { + return "", fmt.Errorf("parse response failed: %w", err) + } + if result.Code != 0 { + return "", fmt.Errorf("nginx error: %s", string(respBody)) + } + + var videoURL string + json.Unmarshal(result.Result, &videoURL) + + logger.Logger.Printf("[ Upload Successful ] Local file: %s → Remote URL: %s", filePath, videoURL) + return videoURL, nil +}