diff --git a/Dockerfile b/Dockerfile new file mode 100644 index 0000000..e923c1e --- /dev/null +++ b/Dockerfile @@ -0,0 +1,22 @@ +FROM debian:bullseye-slim + +RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \ + echo "Asia/Shanghai" > /etc/timezone + +# 复制 deb 包到容器中 +COPY mp4-merge_0.1.11-1_arm64.deb /tmp/ + + +RUN dpkg -i /tmp/mp4-merge_0.1.11-1_arm64.deb || (apt-get update && apt-get install -f -y) + + +COPY models/ /usr/local/models/ + +# 复制启动脚本 +COPY start.sh /usr/local/start.sh +RUN chmod +x /usr/local/start.sh + +WORKDIR /usr/local/models +RUN chmod +x yolov5 + +CMD ["/usr/local/start.sh"] \ No newline at end of file diff --git a/connect/device_storage.go b/connect/device_storage.go index 2cd4235..11a08dd 100644 --- a/connect/device_storage.go +++ b/connect/device_storage.go @@ -9,13 +9,11 @@ import ( "strings" ) -// Modified device data storage path var ( devicesBaseDir = "/data/devices" deviceDataList []DeviceData ) -// InitializeDevicesFromServiceConfig Initialize device data from service configuration func InitializeDevicesFromServiceConfig() ([]DeviceData, error) { deviceDataMutex.Lock() defer deviceDataMutex.Unlock() @@ -38,15 +36,12 @@ func InitializeDevicesFromServiceConfig() ([]DeviceData, error) { logger.Logger.Printf("Loaded %d devices from service configuration", len(configs)) - // Clear existing device data deviceDataList = []DeviceData{} - // Process each device and save to individual files for i, config := range configs { processedData := ProcessDeviceDataForInference(config) deviceDataList = append(deviceDataList, processedData) - // Save to individual device file if err := saveDeviceToFile(processedData); err != nil { logger.Logger.Printf("Failed to save the device file [%s]: %v", processedData.DeviceUUID, err) continue @@ -58,11 +53,9 @@ func InitializeDevicesFromServiceConfig() ([]DeviceData, error) { return deviceDataList, nil } -// saveDeviceToFile Save individual device data to separate file func saveDeviceToFile(deviceData DeviceData) error { filePath := filepath.Join(devicesBaseDir, fmt.Sprintf("device_%s.json", deviceData.DeviceUUID)) - // Clean device data to ensure no invalid characters cleanedData := cleanDeviceData(deviceData) data, err := json.MarshalIndent(cleanedData, "", " ") @@ -71,7 +64,6 @@ func saveDeviceToFile(deviceData DeviceData) error { return fmt.Errorf("Device data serialization failed: %v", err) } - // Write directly to file if err := os.WriteFile(filePath, data, 0644); err != nil { return fmt.Errorf("Failed to write device file: %v", err) } @@ -80,11 +72,9 @@ func saveDeviceToFile(deviceData DeviceData) error { return nil } -// cleanDeviceData Clean device data, remove possible invalid characters func cleanDeviceData(data DeviceData) DeviceData { cleaned := data - // Clean string fields cleaned.DeviceUUID = cleanString(data.DeviceUUID) cleaned.TaskID = cleanString(data.TaskID) cleaned.CameraRTSP = cleanString(data.CameraRTSP) @@ -92,9 +82,8 @@ func cleanDeviceData(data DeviceData) DeviceData { cleaned.UploadURL = cleanString(data.UploadURL) cleaned.DetectAreaStr = cleanString(data.DetectAreaStr) - // Ensure numeric fields are valid if cleaned.Confidence < 0 || cleaned.Confidence > 100 { - cleaned.Confidence = 70 // Default value + cleaned.Confidence = 70 } if cleaned.CameraChannel < 0 { cleaned.CameraChannel = 0 @@ -106,14 +95,14 @@ func cleanDeviceData(data DeviceData) DeviceData { cleaned.Temperature = 0 } if cleaned.TemperatureThreshold <= 0 { - cleaned.TemperatureThreshold = 45.0 // Default value + cleaned.TemperatureThreshold = 45.0 } return cleaned } func cleanString(s string) string { - // Remove control characters and invalid Unicode characters + return strings.Map(func(r rune) rune { if r == 0xFFFD { // Unicode replacement character return -1 @@ -125,12 +114,10 @@ func cleanString(s string) string { }, s) } -// GetDeviceDataList Get device data list func GetDeviceDataList() []DeviceData { deviceDataMutex.Lock() defer deviceDataMutex.Unlock() - // If no data in memory, try to load from device files if len(deviceDataList) == 0 { logger.Logger.Printf("No device data in memory, loading from device files...") if err := loadDevicesFromFiles(); err != nil { @@ -139,18 +126,15 @@ func GetDeviceDataList() []DeviceData { } } - // Return copy result := make([]DeviceData, len(deviceDataList)) copy(result, deviceDataList) return result } -// loadDevicesFromFiles Load data from device files func loadDevicesFromFiles() error { // Clear existing data deviceDataList = []DeviceData{} - // Read all files in device directory files, err := filepath.Glob(filepath.Join(devicesBaseDir, "device_*.json")) if err != nil { return fmt.Errorf("Failed to find device files: %v", err) @@ -179,7 +163,6 @@ func loadDevicesFromFiles() error { return nil } -// UpdateDeviceData Update device data func UpdateDeviceData(data DeviceData) error { deviceDataMutex.Lock() defer deviceDataMutex.Unlock() @@ -189,7 +172,6 @@ func UpdateDeviceData(data DeviceData) error { // Clean input data cleanedData := cleanDeviceData(data) - // Find if device with same DeviceUUID already exists found := false for i, existing := range deviceDataList { if existing.DeviceUUID == cleanedData.DeviceUUID { @@ -200,13 +182,11 @@ func UpdateDeviceData(data DeviceData) error { } } - // If not exists, add to list if !found { logger.Logger.Printf("Adding new device to list: DeviceUUID=%s", cleanedData.DeviceUUID) deviceDataList = append(deviceDataList, cleanedData) } - // Save to individual file if err := saveDeviceToFile(cleanedData); err != nil { logger.Logger.Printf("Failed to save device file: %v", err) return err diff --git a/connect/heartbeat.go b/connect/heartbeat.go new file mode 100644 index 0000000..99d0256 --- /dev/null +++ b/connect/heartbeat.go @@ -0,0 +1,88 @@ +package connect + +import ( + "fmt" + "os" + "os/exec" + "sync" + "time" + + "FireLeave_tool/logger" +) + +var ( + InferenceProcesses sync.Map // key: deviceUUID string, value: *exec.Cmd + + // 记录最后一次收到心跳的时间 + inferenceLastHeartbeat sync.Map // key: deviceUUID string, value: time.Time +) + +// UpdateInferenceHeartbeat 每收到一帧 /video/post 就调用这个 +func UpdateInferenceHeartbeat(deviceUUID string) { + inferenceLastHeartbeat.Store(deviceUUID, time.Now()) + +} + +// ShouldRestartInference 判断是否需要重启推理程序 +func ShouldRestartInference(deviceUUID string) bool { + val, ok := inferenceLastHeartbeat.Load(deviceUUID) + if !ok { + return true // 从未收到过心跳,肯定要启动 + } + last := val.(time.Time) + return time.Since(last) > 85*time.Second +} + +// StopInferenceProcess 停止推理进程(供 main 和 DeviceManager 调用) +func StopInferenceProcess(deviceUUID string) { + if raw, ok := InferenceProcesses.Load(deviceUUID); ok { + if cmd, ok := raw.(*exec.Cmd); ok && cmd.Process != nil { + logger.Logger.Printf("正在停止推理进程 (DeviceUUID=%s, PID=%d)", deviceUUID, cmd.Process.Pid) + cmd.Process.Kill() + cmd.Wait() + } + InferenceProcesses.Delete(deviceUUID) + } +} + +// StartInferenceProcess 重启推理程序(保持原端口!) +func StartInferenceProcess(deviceData *DeviceData) { + + StopInferenceProcess(deviceData.DeviceUUID) + + port, err := GlobalPortManager.GetPort(deviceData.DeviceUUID) + if err != nil { + logger.Logger.Printf("获取端口失败,无法重启推理程序 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + return + } + + detectAreaStr := ProcessDetectAreaForInference(deviceData.DetectArea) + + inferenceCmd := exec.Command( + "./yolov5", + "-s", deviceData.CameraRTSP, + "-m", "model", + "-c", deviceData.DeviceUUID, + "-p", fmt.Sprintf("%d", port), + "-w", detectAreaStr, + "-r", fmt.Sprintf("%d", deviceData.Confidence), + ) + + inferenceCmd.Stdout = os.Stdout + inferenceCmd.Stderr = os.Stderr + + logger.Logger.Printf("正在重启推理程序 (使用原端口 %d): %v", port, inferenceCmd.Args) + + if err := inferenceCmd.Start(); err != nil { + logger.Logger.Printf("推理程序重启失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + return + } + + // 保存进程对象 + InferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd) + logger.Logger.Printf("推理程序已成功重启,PID=%d (DeviceUUID=%s, Port=%d)", + inferenceCmd.Process.Pid, deviceData.DeviceUUID, port) + + // 重启后立即刷新一次心跳,防止 30s 检测又触发 + UpdateInferenceHeartbeat(deviceData.DeviceUUID) +} diff --git a/connect/port_manager.go b/connect/port_manager.go index 1316105..d556e71 100644 --- a/connect/port_manager.go +++ b/connect/port_manager.go @@ -20,7 +20,7 @@ type PortManager struct { } func NewPortManager() *PortManager { - // 推理端口范围:58881-58890(HTTP),支持最多10个设备 + ports := []int{58881, 58882, 58883, 58884, 58885, 58886, 58887, 58888, 58889, 58890} // 初始化可用端口映射 diff --git a/connect/ws_channel.go b/connect/ws_channel.go index df4ffea..14c4674 100644 --- a/connect/ws_channel.go +++ b/connect/ws_channel.go @@ -511,6 +511,7 @@ func (c *WSClient) heartbeatLoop() { } } } + func (c *WSClient) reRegisterAndResubscribe() { // 1. 等待连接稳定 time.Sleep(1 * time.Second) diff --git a/fire_leave_config b/fire_leave_config new file mode 100644 index 0000000..8a656ea --- /dev/null +++ b/fire_leave_config @@ -0,0 +1,24 @@ +{ + "image": "fire_leave:latest", + "container_name": "fire_leave", + "command": [ + "/bin/bash", + "/usr/local/start.sh" + ], + "volumes": [ + "/lib/npu:/lib/npu", + "/lib:/lib:ro", + "/usr/lib:/usr/lib:ro" + ], + "restart": "always", + "network_mode": "host", + "working_dir": "/usr/local/models", + "hostname": "myhost", + "privileged": true, + "devices": [ + "/dev/galcore:/dev/galcore", + "/dev/dri/card0:/dev/dri/card0", + "/dev/vpu_service:/dev/vpu_service", + "/dev/rga:/dev/rga" + ] + } \ No newline at end of file diff --git a/fireleave_tool b/fireleave_tool index f7e78b9..5da733e 100644 Binary files a/fireleave_tool and b/fireleave_tool differ diff --git a/httpreader/httpreader.go b/httpreader/httpreader.go index 00329dc..208a109 100644 --- a/httpreader/httpreader.go +++ b/httpreader/httpreader.go @@ -1,12 +1,14 @@ package httpreader import ( + "FireLeave_tool/connect" "bytes" "context" "encoding/json" "fmt" "io" "net/http" + "os" "sync" "time" @@ -113,20 +115,17 @@ func (is *InferenceServer) handleVideoPost(w http.ResponseWriter, r *http.Reques http.Error(w, `{"error":"Missing serial field"}`, http.StatusBadRequest) return } + now := time.Now().Format("2006/01/02 15:04:05.000") + logger.Logger.Printf("♥ 推理心跳收到 | 时间: %s | UUID: %s | Serial: %s | 人数量: %d", + now, is.deviceUUID, result.Serial, getPersonCount(result)) - // 只在人员数量变化或重要事件时记录详细日志 - if len(result.Params) > 0 { - hasPerson := false - for _, param := range result.Params { - if param.ClassIdx == 1 && param.Number > 0 { - hasPerson = true - break - } - } - if hasPerson { - logger.Logger.Printf("Inference data: Serial=%s, Person detected, ParamsCount=%d", - result.Serial, len(result.Params)) - } + // 这一行就是你缺失的救命稻草! + connect.UpdateInferenceHeartbeat(is.deviceUUID) + + // 额外写一份永不丢失的独立心跳日志 + if f, err := os.OpenFile("/data/heartbeat.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644); err == nil { + defer f.Close() + fmt.Fprintf(f, "[%s] HEARTBEAT OK | %s | person=%d\n", now, is.deviceUUID, getPersonCount(result)) } // 发送到处理通道 @@ -179,3 +178,11 @@ func (s *InferenceServer) StopWithContext(ctx context.Context) { logger.Logger.Printf("The HTTP server has been shut down (DeviceUUID=%s)", s.deviceUUID) } } +func getPersonCount(result InferenceResult) int { + for _, p := range result.Params { + if p.ClassIdx == 1 { + return p.Number + } + } + return 0 +} diff --git a/httpreader/utils.go b/httpreader/utils.go deleted file mode 100644 index 8de6ae0..0000000 --- a/httpreader/utils.go +++ /dev/null @@ -1,121 +0,0 @@ -package httpreader - -import ( - "fmt" - "net" - "net/http" - "time" - - "FireLeave_tool/logger" -) - -// CheckHTTPService 检查HTTP服务是否可用 -func CheckHTTPService(port int, timeout time.Duration) bool { - client := &http.Client{ - Timeout: timeout, - } - - url := fmt.Sprintf("http://127.0.0.1:%d/health", port) - resp, err := client.Get(url) - if err != nil { - logger.Logger.Printf("HTTP服务健康检查失败 (Port=%d): %v", port, err) - return false - } - defer resp.Body.Close() - - return resp.StatusCode == http.StatusOK -} - -// CheckPortAvailable 检查端口是否可用 -func CheckPortAvailable(port int) bool { - address := fmt.Sprintf(":%d", port) - listener, err := net.Listen("tcp", address) - if err != nil { - return false - } - listener.Close() - return true -} - -// WaitForHTTPService 等待HTTP服务启动 -func WaitForHTTPService(port int, timeout time.Duration) bool { - deadline := time.Now().Add(timeout) - logger.Logger.Printf("等待HTTP服务启动 (Port=%d, Timeout=%v)", port, timeout) - - attempt := 0 - for time.Now().Before(deadline) { - attempt++ - - if CheckHTTPService(port, 2*time.Second) { - logger.Logger.Printf("HTTP服务启动成功 (Port=%d, 尝试次数=%d)", port, attempt) - return true - } - - if attempt == 1 || attempt%5 == 0 { - logger.Logger.Printf("等待HTTP服务... (Port=%d, 尝试次数=%d)", port, attempt) - } - - time.Sleep(1 * time.Second) - } - - logger.Logger.Printf("HTTP服务启动超时 (Port=%d, 总尝试次数=%d)", port, attempt) - return false -} - -// ValidateInferenceResult 验证推理结果数据 -func ValidateInferenceResult(result *InferenceResult) error { - if result.Serial == "" { - return fmt.Errorf("serial字段不能为空") - } - if result.At <= 0 { - return fmt.Errorf("at时间戳无效") - } - - // 验证参数数据 - for i, param := range result.Params { - if param.Number < 0 { - return fmt.Errorf("参数[%d]的number不能为负数", i) - } - if param.ClassIdx < 0 { - return fmt.Errorf("参数[%d]的class_idx不能为负数", i) - } - } - - return nil -} - -// GetPersonCountFromResult 从推理结果中获取人员数量 -func GetPersonCountFromResult(result *InferenceResult) int { - if result == nil || len(result.Params) == 0 { - return 0 - } - - // 查找人员类别的参数(假设 class_idx=1 表示人员) - for _, param := range result.Params { - if param.ClassIdx == 1 { // 人员类别 - return param.Number - } - } - - return 0 -} - -// CreateTestInferenceResult 创建测试用的推理结果(用于调试) -func CreateTestInferenceResult(deviceUUID string, personCount int) InferenceResult { - return InferenceResult{ - Serial: deviceUUID, - At: time.Now().Unix(), - Type: 1, - Params: []struct { - ClassIdx int `json:"class_idx"` - Name string `json:"name"` - Number int `json:"number"` - }{ - { - ClassIdx: 1, - Name: "person", - Number: personCount, - }, - }, - } -} diff --git a/main.go b/main.go index 811ebf0..659411c 100644 --- a/main.go +++ b/main.go @@ -23,10 +23,6 @@ import ( "time" ) -var ( - inferenceLastHeartbeat sync.Map // key: deviceUUID, value: time.Time -) - type deviceAlarmState struct { InCondition bool StartTime int64 @@ -250,12 +246,12 @@ func processDevice(ctx context.Context, deviceData connect.DeviceData) { } // 关键:保存进程对象到全局 map - inferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd) + connect.InferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd) logger.Logger.Printf("推理程序已启动,PID=%d (DeviceUUID=%s)", inferenceCmd.Process.Pid, deviceData.DeviceUUID) // 初始化心跳 - UpdateInferenceHeartbeat(deviceData.DeviceUUID) + connect.UpdateInferenceHeartbeat(deviceData.DeviceUUID) // 检查推理程序是否启动成功 time.Sleep(2 * time.Second) @@ -492,10 +488,7 @@ func waitForShutdown(ctx context.Context, cancel context.CancelFunc, wg *sync.Wa case <-stop: logger.Logger.Printf("Upon receiving the stop signal, start cleaning...") - // First cancel all contexts cancel() - - // Wait for all goroutines to complete, but with timeout done := make(chan struct{}) go func() { wg.Wait() @@ -523,17 +516,14 @@ func waitForShutdown(ctx context.Context, cancel context.CancelFunc, wg *sync.Wa os.Exit(0) } -// initWebSocketAsync Asynchronously initializes WebSocket func initWebSocketAsync(ctx context.Context, wg *sync.WaitGroup) { logger.Logger.Printf("Start the initialization of asynchronous WebSocket...") - // 1. Initialize WebSocket channel if err := connect.InitWSChannel(); err != nil { logger.Logger.Printf("Failed to initialize WebSocket channel: %v", err) return } - // 2. Wait for connection to be ready (with context cancellation) logger.Logger.Printf("Waiting for WebSocket connection to be established...") containerName := "fireleave-container" @@ -545,7 +535,7 @@ func initWebSocketAsync(ctx context.Context, wg *sync.WaitGroup) { case <-time.After(1 * time.Second): client := connect.GetWSClient() if client != nil && client.IsConnected() { - // Connection successful → proceed to register + goto register } } @@ -555,7 +545,7 @@ func initWebSocketAsync(ctx context.Context, wg *sync.WaitGroup) { return register: - // 3. Register service + serviceID, err := connect.RegisterService(containerName) if err != nil { logger.Logger.Printf("Service registration failed: %v", err) @@ -571,12 +561,10 @@ register: } } -// GetAllDevices Get all devices (new method) func (dm *DeviceManager) GetAllDevices() map[string]*connect.DeviceData { dm.mu.Lock() defer dm.mu.Unlock() - // Return copy to avoid concurrent modification result := make(map[string]*connect.DeviceData) for k, v := range dm.devices { result[k] = v @@ -701,99 +689,29 @@ func triggerAlarmWithDelay(deviceData connect.DeviceData, alarmTime int64, coold video_server.CleanupOldRawVideos(deviceData.DeviceUUID) video_server.CleanupOldVideos(deviceData.DeviceUUID) } -func UpdateInferenceHeartbeat(deviceUUID string) { - inferenceLastHeartbeat.Store(deviceUUID, time.Now()) -} -func ShouldRestartInference(deviceUUID string) bool { - if val, ok := inferenceLastHeartbeat.Load(deviceUUID); ok { - last := val.(time.Time) - return time.Since(last) > 90*time.Second - } - return true // 从未收到过,也要启动 -} func (dm *DeviceManager) RangeDevices(fn func(uuid string, device *connect.DeviceData) bool) { dm.mu.Lock() defer dm.mu.Unlock() - for uuid, device := range dm.devices { - // 拷贝一份指针,防止外部修改影响遍历 - devCopy := device - if !fn(uuid, devCopy) { + + if !fn(uuid, device) { return } } } - -// StopInferenceProcess 停止指定设备的推理进程(你现在 defer 中就是这么干的,抽出来统一管理) -func (dm *DeviceManager) StopInferenceProcess(deviceUUID string) { - // 这里你之前没有保存 cmd,我们需要一个地方存起来! - // 所以我们要加一个 map 保存每个设备的推理进程 - if cmd, ok := inferenceProcesses.Load(deviceUUID); ok { - if process, ok2 := cmd.(*exec.Cmd); ok2 && process.Process != nil { - logger.Logger.Printf("正在停止推理进程 (DeviceUUID=%s, PID=%d)", deviceUUID, process.Process.Pid) - process.Process.Kill() - process.Wait() // 等待退出,防止僵尸进程 - } - inferenceProcesses.Delete(deviceUUID) - } -} - -// StartInferenceProcess 启动推理进程(把你 processDevice 里启动推理的部分抽出来) -func (dm *DeviceManager) StartInferenceProcess(deviceData *connect.DeviceData) { - dm.StopInferenceProcess(deviceData.DeviceUUID) - - // 正确:只查询,不分配 - port, err := connect.GlobalPortManager.GetPort(deviceData.DeviceUUID) - if err != nil { - logger.Logger.Printf("获取端口失败,无法重启推理程序 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) - return - } - - detectAreaStr := connect.ProcessDetectAreaForInference(deviceData.DetectArea) - - inferenceCmd := exec.Command( - "./yolov5", - "-s", deviceData.CameraRTSP, - "-m", "model", - "-c", deviceData.DeviceUUID, - "-p", fmt.Sprintf("%d", port), // 用老端口! - "-w", detectAreaStr, - "-r", fmt.Sprintf("%d", deviceData.Confidence), - ) - inferenceCmd.Stdout = os.Stdout - inferenceCmd.Stderr = os.Stderr - - logger.Logger.Printf("正在重启推理程序 (使用原端口 %d): %v", port, inferenceCmd.Args) - - if err := inferenceCmd.Start(); err != nil { - logger.Logger.Printf("推理程序重启失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) - return - } - - inferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd) - logger.Logger.Printf("推理程序已成功重启,PID=%d (DeviceUUID=%s, Port=%d)", - inferenceCmd.Process.Pid, deviceData.DeviceUUID, port) - - UpdateInferenceHeartbeat(deviceData.DeviceUUID) -} - -var inferenceProcesses sync.Map - func main() { - // Define command line parameters (remove ws parameter) + var rtspURL string var deviceUUID string flag.StringVar(&rtspURL, "rtsp", "", "RTSP stream URL for testing") flag.StringVar(&deviceUUID, "uuid", "", "Device UUID for testing") flag.Parse() - // Initialize logger if err := logger.InitLogger(); err != nil { log.Fatalf("Logger initialization failed: %v", err) } defer logger.CloseLogger() - // Log cleanup go func() { for { logger.CleanupOldLogs() @@ -801,20 +719,18 @@ func main() { } }() - // Directly enter normal mode (remove wsMode judgment) logger.Logger.Printf("=== Starting FireLeave Tool ===") ctx, cancel := context.WithCancel(context.Background()) defer cancel() var wg sync.WaitGroup - // 1. Initialize device data directly from service config logger.Logger.Printf("Step 1: Initialize device data from service config") deviceDataList, err := connect.InitializeDevicesFromServiceConfig() if err != nil { logger.Logger.Printf("Device data initialization failed: %v", err) logger.Logger.Printf("Will try to use existing device files...") - // If initialization fails, try to load from existing files + deviceDataList = connect.GetDeviceDataList() } @@ -833,7 +749,6 @@ func main() { logger.Logger.Printf("Step 3: All devices started") - // 3. Asynchronously initialize WebSocket logger.Logger.Printf("Step 4: Asynchronously initialize WebSocket connection") go initWebSocketAsync(ctx, &wg) @@ -848,12 +763,12 @@ func main() { defer ticker.Stop() for range ticker.C { GlobalDeviceManager.RangeDevices(func(uuid string, device *connect.DeviceData) bool { - if ShouldRestartInference(uuid) { + if connect.ShouldRestartInference(uuid) { logger.Logger.Printf("推理程序心跳超时(>90s),准备重启: DeviceUUID=%s", uuid) - GlobalDeviceManager.StopInferenceProcess(uuid) - time.Sleep(2 * time.Second) - GlobalDeviceManager.StartInferenceProcess(device) - UpdateInferenceHeartbeat(uuid) // 防止重复重启 + connect.StopInferenceProcess(uuid) + time.Sleep(6 * time.Second) + connect.StartInferenceProcess(device) + connect.UpdateInferenceHeartbeat(uuid) } return true }) @@ -867,7 +782,7 @@ func main() { } } }() - // Wait for termination signal + waitForShutdown(ctx, cancel, &wg) } func printGlobalDeviceStatus() { @@ -883,7 +798,7 @@ func printGlobalDeviceStatus() { i := 1 for uuid, data := range activeDevices { - // 从 deviceStates 获取冷却时间 + stateInf, _ := deviceStates.Load(uuid) state := stateInf.(*deviceAlarmState) cooldownLeft := int64(0) @@ -930,7 +845,6 @@ func monitorDeviceStatus(ctx context.Context, initialDevices []connect.DeviceDat } } -// debugDeviceData Debug device data func debugDeviceData(deviceData connect.DeviceData) { logger.Logger.Printf("Debug device data: UUID=%s", deviceData.DeviceUUID) logger.Logger.Printf(" TaskID: %q", deviceData.TaskID) diff --git a/tcpreader/tcpreader.go b/tcpreader/tcpreader.go index 183e783..57fc4d1 100644 --- a/tcpreader/tcpreader.go +++ b/tcpreader/tcpreader.go @@ -18,8 +18,8 @@ type TemperatureResult struct { // TCPClient TCP 客户端 type TCPClient struct { - addr string // 地址,例如 "127.0.0.1:59881" - deviceUUID string // 关联的 DeviceUUID + addr string + deviceUUID string conn net.Conn mu sync.Mutex } diff --git a/video_server/video_merge.go b/video_server/video_merge.go index e17a00e..d47ecdd 100644 --- a/video_server/video_merge.go +++ b/video_server/video_merge.go @@ -212,14 +212,13 @@ func alignTo10Seconds(timestamp int64) int64 { return alignedTime.Unix() } -// parseSimpleTimestamp 解析简单的时间戳文件名 (如: 20251029181740.mp4) +// parseSimpleTimestamp 解析简单的时间戳文件名 func parseSimpleTimestamp(filename string) (int64, error) { - // 移除文件扩展名 + nameWithoutExt := strings.TrimSuffix(filename, filepath.Ext(filename)) - // 文件名应该就是14位时间戳 if len(nameWithoutExt) != 14 { - //return 0, fmt.Errorf("无效的时间戳格式: %s (期望14位数字)", nameWithoutExt) + } // 解析时间戳 @@ -297,7 +296,7 @@ func CleanupOldVideos(deviceUUID string) { func CleanupOldRawVideos(deviceUUID string) { rawDir := filepath.Join("/usr/data/camera", deviceUUID) if _, err := os.Stat(rawDir); os.IsNotExist(err) { - return // 目录不存在,直接返回 + return } files, err := os.ReadDir(rawDir) @@ -308,7 +307,7 @@ func CleanupOldRawVideos(deviceUUID string) { var videoFiles []string for _, file := range files { - if !file.IsDir() && filepath.Ext(file.Name()) == ".mp4" { // 假设是mp4,也可以加更多判断 + if !file.IsDir() && filepath.Ext(file.Name()) == ".mp4" { videoFiles = append(videoFiles, filepath.Join(rawDir, file.Name())) } } @@ -317,14 +316,12 @@ func CleanupOldRawVideos(deviceUUID string) { return // 不超过60条,不清理 } - // 按修改时间排序(旧 → 新) sort.Slice(videoFiles, func(i, j int) bool { fi, _ := os.Stat(videoFiles[i]) fj, _ := os.Stat(videoFiles[j]) return fi.ModTime().Before(fj.ModTime()) }) - // 删除最早的(超过60条的部分) for i := 0; i < len(videoFiles)-60; i++ { if err := os.Remove(videoFiles[i]); err != nil { logger.Logger.Printf("Failed to delete the old original video %s: %v", videoFiles[i], err) diff --git a/video_server/video_upload.go b/video_server/video_upload.go index df760c2..f88fcd7 100644 --- a/video_server/video_upload.go +++ b/video_server/video_upload.go @@ -16,18 +16,16 @@ import ( // UploadToNginx 上传视频到 Nginx 服务器,返回 URL func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string) (string, error) { - // === 1. 打开文件 === + file, err := os.Open(filePath) if err != nil { return "", fmt.Errorf("open file failed: %w", err) } defer file.Close() - // === 2. 创建表单 === body := new(bytes.Buffer) writer := multipart.NewWriter(body) - // --- 上传文件 --- part, err := writer.CreateFormFile("file", filepath.Base(filePath)) if err != nil { return "", fmt.Errorf("create form file failed: %w", err) @@ -36,7 +34,6 @@ func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string return "", fmt.Errorf("copy file failed: %w", err) } - // --- 上传 JSON --- eventJSON, err := json.Marshal(eventParams) if err != nil { return "", fmt.Errorf("marshal event failed: %w", err) @@ -52,12 +49,10 @@ func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string return "", fmt.Errorf("close writer failed: %w", err) } - // === 3. 打印请求头 === logger.Logger.Printf("[HTTP Request]URL: %s", uploadURL) logger.Logger.Printf("[HTTP Request]Content-Type: %s", writer.FormDataContentType()) logger.Logger.Printf("[HTTP Request]Body 长度: %d bytes", body.Len()) - // === 4. 发送请求 === req, err := http.NewRequest("POST", uploadURL, body) if err != nil { return "", fmt.Errorf("create request failed: %w", err) @@ -71,7 +66,6 @@ func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string } defer resp.Body.Close() - // === 5. 打印响应 === respBody, _ := io.ReadAll(resp.Body) logger.Logger.Printf("[nginx response]Status: %d", resp.StatusCode) logger.Logger.Printf("[nginx response]Body: %s", string(respBody)) @@ -80,7 +74,6 @@ func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string return "", fmt.Errorf("nginx error %d: %s", resp.StatusCode, string(respBody)) } - // === 6. 解析 video_url === var result struct { Code int `json:"code"` Result json.RawMessage `json:"result"`