diff --git a/connect/gpio_connect.go b/connect/gpio_connect.go new file mode 100644 index 0000000..d366f27 --- /dev/null +++ b/connect/gpio_connect.go @@ -0,0 +1,113 @@ +package connect + +import ( + "FireLeave_tool/logger" + "encoding/json" + "fmt" + "os" + "strconv" + "time" +) + +const BuzzerGPIO = 109 + +// TriggerBuzzer 触发蜂鸣器响指定秒数 +func TriggerBuzzer(seconds int) { + if seconds <= 0 { + seconds = 10 + } + + fmt.Printf("Buzzer starting to ring for %d seconds...\n", seconds) + + gpioPath := fmt.Sprintf("/sys/class/gpio/gpio%d", BuzzerGPIO) + + // 1. 强制导出( + os.WriteFile("/sys/class/gpio/export", []byte(strconv.Itoa(BuzzerGPIO)), 0666) + time.Sleep(50 * time.Millisecond) + + // 2. 强制设为输出 + os.WriteFile(fmt.Sprintf("%s/direction", gpioPath), []byte("out"), 0666) + time.Sleep(10 * time.Millisecond) + + // 3. 拉高 → 响 + if err := os.WriteFile(fmt.Sprintf("%s/value", gpioPath), []byte("1"), 0666); err != nil { + fmt.Printf("Failed to set GPIO high: %v\n", err) + } else { + fmt.Println("GPIO109 set high → Buzzer ringing!") + } + + time.Sleep(time.Duration(seconds) * time.Second) + + // 4. 拉低 → 停 + if err := os.WriteFile(fmt.Sprintf("%s/value", gpioPath), []byte("0"), 0666); err != nil { + fmt.Printf("Failed to set GPIO low: %v\n", err) + } else { + fmt.Println("GPIO109 set low → Buzzer stopped") + } +} + +func HandleBuzzerAlert(message []byte) { + logger.Logger.Printf("Received platform alert push → Raw message: %s", string(message)) + + type PublishMsg struct { + Topic string `json:"topic"` + Content json.RawMessage `json:"content"` // 保持原始字节 + } + + var pub PublishMsg + if err := json.Unmarshal(message, &pub); err != nil { + logger.Logger.Printf("Outer parsing failed: %v", err) + go TriggerBuzzer(3) + return + } + + var realCmd struct { + Type int `json:"type"` + BuzzerDuration interface{} `json:"buzzer_duration,omitempty"` + } + + if err := json.Unmarshal(pub.Content, &realCmd); err == nil { + + } else { + + var contentStr string + if json.Unmarshal(pub.Content, &contentStr) != nil { + logger.Logger.Printf("Content is neither string nor JSON → Using default 3 seconds") + go TriggerBuzzer(3) + return + } + + logger.Logger.Printf("Detected escaped content → Second parsing: %s", contentStr) + if err := json.Unmarshal([]byte(contentStr), &realCmd); err != nil { + logger.Logger.Printf("Second parsing failed: %v → Using default 3 seconds", err) + go TriggerBuzzer(3) + return + } + } + + if realCmd.Type != 1 { + logger.Logger.Printf("Not a sound/light alert command (type=%d), ignoring", realCmd.Type) + return + } + + duration := 3 + switch v := realCmd.BuzzerDuration.(type) { + case float64: + duration = int(v) + case string: + if n, err := strconv.Atoi(v); err == nil { + duration = n + } + case int: + duration = v + case nil: + duration = 3 + } + + if duration <= 0 || duration > 300 { + duration = 3 + } + + logger.Logger.Printf("Platform alert command → Local sound/light alarm ringing for %d seconds!!!", duration) + go TriggerBuzzer(duration) +} diff --git a/connect/port_manager.go b/connect/port_manager.go index 658d3d2..1316105 100644 --- a/connect/port_manager.go +++ b/connect/port_manager.go @@ -15,6 +15,8 @@ type PortManager struct { allocated map[string]int // DeviceUUID 到推理端口的映射 portPool []int // 推理端口池(HTTP) available map[int]bool // 可用端口标记 + basePort int + maxPort int } func NewPortManager() *PortManager { @@ -94,3 +96,14 @@ func (pm *PortManager) ReleasePort(deviceUUID string) { // GlobalPortManager 全局端口管理器 var GlobalPortManager = NewPortManager() + +func (pm *PortManager) GetPort(deviceUUID string) (int, error) { + pm.mu.Lock() + defer pm.mu.Unlock() + + if port, exists := pm.allocated[deviceUUID]; exists { + return port, nil + } + + return 0, fmt.Errorf("device %s has no allocated port", deviceUUID) +} diff --git a/connect/ws_channel.go b/connect/ws_channel.go index d4dd034..df4ffea 100644 --- a/connect/ws_channel.go +++ b/connect/ws_channel.go @@ -24,6 +24,19 @@ var ( currentServiceID string // New: current service's server_id ) +type PublishParams struct { + Topic string `json:"topic"` + Data json.RawMessage `json:"data,omitempty"` + Content json.RawMessage `json:"content,omitempty"` +} + +type InferenceHeartbeat struct { + DeviceUUID string `json:"device_uuid"` + PersonCount int `json:"person_count"` + Temperature float64 `json:"temperature,omitempty"` + Timestamp int64 `json:"timestamp"` // 上报时间戳 +} + func GetWSClient() *WSClient { wsMutex.RLock() defer wsMutex.RUnlock() @@ -106,13 +119,6 @@ type SubscribeParams struct { Topic string `json:"topic"` } -type PublishParams struct { - EventType string `json:"event_type,omitempty"` - Body json.RawMessage `json:"body,omitempty"` - Topic string `json:"topic,omitempty"` - Data json.RawMessage `json:"data,omitempty"` -} - type WSClient struct { Conn *websocket.Conn url string @@ -423,6 +429,8 @@ func (c *WSClient) handlePublishMessage(message WSMessage) { logger.Logger.Printf("Failed to parse publish message parameters: %v", err) return } + logger.Logger.Printf("WebSocket 收到 publish 消息 → Topic: %s | 原始数据: %s", + publishParams.Topic, string(publishParams.Data)) if publishParams.Topic != "" { c.mutex.Lock() diff --git a/fireleave_tool b/fireleave_tool index 19bd3b5..f7e78b9 100644 Binary files a/fireleave_tool and b/fireleave_tool differ diff --git a/main.go b/main.go index db1220c..811ebf0 100644 --- a/main.go +++ b/main.go @@ -23,6 +23,10 @@ import ( "time" ) +var ( + inferenceLastHeartbeat sync.Map // key: deviceUUID, value: time.Time +) + type deviceAlarmState struct { InCondition bool StartTime int64 @@ -241,10 +245,18 @@ func processDevice(ctx context.Context, deviceData connect.DeviceData) { logger.Logger.Printf("Start the inference program command: %v", inferenceCmd.Args) if err := inferenceCmd.Start(); err != nil { - logger.Logger.Printf("The reasoning program failed to start (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + logger.Logger.Printf("推理程序启动失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) return } + // 关键:保存进程对象到全局 map + inferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd) + + logger.Logger.Printf("推理程序已启动,PID=%d (DeviceUUID=%s)", inferenceCmd.Process.Pid, deviceData.DeviceUUID) + + // 初始化心跳 + UpdateInferenceHeartbeat(deviceData.DeviceUUID) + // 检查推理程序是否启动成功 time.Sleep(2 * time.Second) if inferenceCmd.Process == nil { @@ -550,6 +562,13 @@ register: } else { logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID) } + + // 新增:订阅本地告警 topic + if err := connect.SubscribeTopic("/dhlr/alert", connect.HandleBuzzerAlert); err != nil { + logger.Logger.Printf("订阅告警topic失败 (/dhlr/alert): %v", err) + } else { + logger.Logger.Printf("成功订阅本地告警topic: /dhlr/alert") + } } // GetAllDevices Get all devices (new method) @@ -648,7 +667,7 @@ func triggerAlarmWithDelay(deviceData connect.DeviceData, alarmTime int64, coold } client.SendAsync(eventMsg) - logger.Logger.Printf("报警事件已上报 (DeviceUUID=%s)", deviceData.DeviceUUID) + logger.Logger.Printf("The alarm incident has been reported (DeviceUUID=%s)", deviceData.DeviceUUID) // === 4. 上报数据快照 (data) === dataPayload := map[string]any{ @@ -679,30 +698,87 @@ func triggerAlarmWithDelay(deviceData connect.DeviceData, alarmTime int64, coold // === 5. 清理本地文件 === os.Remove(mergedFile) + video_server.CleanupOldRawVideos(deviceData.DeviceUUID) video_server.CleanupOldVideos(deviceData.DeviceUUID) } -func waitAndRegister() error { - containerName := os.Getenv("CONTAINER_NAME") - if containerName == "" { - containerName = "fireleave-service-default" +func UpdateInferenceHeartbeat(deviceUUID string) { + inferenceLastHeartbeat.Store(deviceUUID, time.Now()) +} +func ShouldRestartInference(deviceUUID string) bool { + if val, ok := inferenceLastHeartbeat.Load(deviceUUID); ok { + last := val.(time.Time) + return time.Since(last) > 90*time.Second + } + return true // 从未收到过,也要启动 +} +func (dm *DeviceManager) RangeDevices(fn func(uuid string, device *connect.DeviceData) bool) { + dm.mu.Lock() + defer dm.mu.Unlock() + + for uuid, device := range dm.devices { + // 拷贝一份指针,防止外部修改影响遍历 + devCopy := device + if !fn(uuid, devCopy) { + return + } + } +} + +// StopInferenceProcess 停止指定设备的推理进程(你现在 defer 中就是这么干的,抽出来统一管理) +func (dm *DeviceManager) StopInferenceProcess(deviceUUID string) { + // 这里你之前没有保存 cmd,我们需要一个地方存起来! + // 所以我们要加一个 map 保存每个设备的推理进程 + if cmd, ok := inferenceProcesses.Load(deviceUUID); ok { + if process, ok2 := cmd.(*exec.Cmd); ok2 && process.Process != nil { + logger.Logger.Printf("正在停止推理进程 (DeviceUUID=%s, PID=%d)", deviceUUID, process.Process.Pid) + process.Process.Kill() + process.Wait() // 等待退出,防止僵尸进程 + } + inferenceProcesses.Delete(deviceUUID) + } +} + +// StartInferenceProcess 启动推理进程(把你 processDevice 里启动推理的部分抽出来) +func (dm *DeviceManager) StartInferenceProcess(deviceData *connect.DeviceData) { + dm.StopInferenceProcess(deviceData.DeviceUUID) + + // 正确:只查询,不分配 + port, err := connect.GlobalPortManager.GetPort(deviceData.DeviceUUID) + if err != nil { + logger.Logger.Printf("获取端口失败,无法重启推理程序 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + return } - // 等待 WS 连接成功 - for i := 0; i < 30; i++ { - client := connect.GetWSClient() - if client != nil && client.IsConnected() { - // 连接成功 → 立即注册 - serviceID, err := connect.RegisterService(containerName) - if err != nil { - return fmt.Errorf("服务注册失败: %v", err) - } - logger.Logger.Printf("服务注册成功!ServiceID=%s", serviceID) - return nil - } - time.Sleep(1 * time.Second) + detectAreaStr := connect.ProcessDetectAreaForInference(deviceData.DetectArea) + + inferenceCmd := exec.Command( + "./yolov5", + "-s", deviceData.CameraRTSP, + "-m", "model", + "-c", deviceData.DeviceUUID, + "-p", fmt.Sprintf("%d", port), // 用老端口! + "-w", detectAreaStr, + "-r", fmt.Sprintf("%d", deviceData.Confidence), + ) + inferenceCmd.Stdout = os.Stdout + inferenceCmd.Stderr = os.Stderr + + logger.Logger.Printf("正在重启推理程序 (使用原端口 %d): %v", port, inferenceCmd.Args) + + if err := inferenceCmd.Start(); err != nil { + logger.Logger.Printf("推理程序重启失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) + return } - return fmt.Errorf("WebSocket 连接超时,无法注册服务") + + inferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd) + logger.Logger.Printf("推理程序已成功重启,PID=%d (DeviceUUID=%s, Port=%d)", + inferenceCmd.Process.Pid, deviceData.DeviceUUID, port) + + UpdateInferenceHeartbeat(deviceData.DeviceUUID) } + +var inferenceProcesses sync.Map + func main() { // Define command line parameters (remove ws parameter) var rtspURL string @@ -767,6 +843,30 @@ func main() { // Add device status monitoring go monitorDeviceStatus(ctx, deviceDataList) + go func() { + ticker := time.NewTicker(30 * time.Second) + defer ticker.Stop() + for range ticker.C { + GlobalDeviceManager.RangeDevices(func(uuid string, device *connect.DeviceData) bool { + if ShouldRestartInference(uuid) { + logger.Logger.Printf("推理程序心跳超时(>90s),准备重启: DeviceUUID=%s", uuid) + GlobalDeviceManager.StopInferenceProcess(uuid) + time.Sleep(2 * time.Second) + GlobalDeviceManager.StartInferenceProcess(device) + UpdateInferenceHeartbeat(uuid) // 防止重复重启 + } + return true + }) + } + }() + go func() { + ticker := time.NewTicker(5 * time.Minute) + for range ticker.C { + for _, device := range GlobalDeviceManager.GetAllDevices() { + video_server.CleanupOldRawVideos(device.DeviceUUID) + } + } + }() // Wait for termination signal waitForShutdown(ctx, cancel, &wg) } diff --git a/video_server/video_merge.go b/video_server/video_merge.go index 5051f90..e17a00e 100644 --- a/video_server/video_merge.go +++ b/video_server/video_merge.go @@ -292,3 +292,44 @@ func CleanupOldVideos(deviceUUID string) { } } } + +// CleanupOldRawVideos 清理推理程序生成的原始10s视频,保留最近60条 +func CleanupOldRawVideos(deviceUUID string) { + rawDir := filepath.Join("/usr/data/camera", deviceUUID) + if _, err := os.Stat(rawDir); os.IsNotExist(err) { + return // 目录不存在,直接返回 + } + + files, err := os.ReadDir(rawDir) + if err != nil { + logger.Logger.Printf("Failed to read the original video directory %s: %v", rawDir, err) + return + } + + var videoFiles []string + for _, file := range files { + if !file.IsDir() && filepath.Ext(file.Name()) == ".mp4" { // 假设是mp4,也可以加更多判断 + videoFiles = append(videoFiles, filepath.Join(rawDir, file.Name())) + } + } + + if len(videoFiles) <= 60 { + return // 不超过60条,不清理 + } + + // 按修改时间排序(旧 → 新) + sort.Slice(videoFiles, func(i, j int) bool { + fi, _ := os.Stat(videoFiles[i]) + fj, _ := os.Stat(videoFiles[j]) + return fi.ModTime().Before(fj.ModTime()) + }) + + // 删除最早的(超过60条的部分) + for i := 0; i < len(videoFiles)-60; i++ { + if err := os.Remove(videoFiles[i]); err != nil { + logger.Logger.Printf("Failed to delete the old original video %s: %v", videoFiles[i], err) + } else { + logger.Logger.Printf("Clear the old original videos: %s", videoFiles[i]) + } + } +}