添加gpio控制/进程健康管理

This commit is contained in:
gqc 2025-12-03 14:12:38 +08:00
parent 7a9d103943
commit 9b02bd9779
6 changed files with 302 additions and 27 deletions

113
connect/gpio_connect.go Normal file
View File

@ -0,0 +1,113 @@
package connect
import (
"FireLeave_tool/logger"
"encoding/json"
"fmt"
"os"
"strconv"
"time"
)
const BuzzerGPIO = 109
// TriggerBuzzer 触发蜂鸣器响指定秒数
func TriggerBuzzer(seconds int) {
if seconds <= 0 {
seconds = 10
}
fmt.Printf("Buzzer starting to ring for %d seconds...\n", seconds)
gpioPath := fmt.Sprintf("/sys/class/gpio/gpio%d", BuzzerGPIO)
// 1. 强制导出(
os.WriteFile("/sys/class/gpio/export", []byte(strconv.Itoa(BuzzerGPIO)), 0666)
time.Sleep(50 * time.Millisecond)
// 2. 强制设为输出
os.WriteFile(fmt.Sprintf("%s/direction", gpioPath), []byte("out"), 0666)
time.Sleep(10 * time.Millisecond)
// 3. 拉高 → 响
if err := os.WriteFile(fmt.Sprintf("%s/value", gpioPath), []byte("1"), 0666); err != nil {
fmt.Printf("Failed to set GPIO high: %v\n", err)
} else {
fmt.Println("GPIO109 set high → Buzzer ringing!")
}
time.Sleep(time.Duration(seconds) * time.Second)
// 4. 拉低 → 停
if err := os.WriteFile(fmt.Sprintf("%s/value", gpioPath), []byte("0"), 0666); err != nil {
fmt.Printf("Failed to set GPIO low: %v\n", err)
} else {
fmt.Println("GPIO109 set low → Buzzer stopped")
}
}
func HandleBuzzerAlert(message []byte) {
logger.Logger.Printf("Received platform alert push → Raw message: %s", string(message))
type PublishMsg struct {
Topic string `json:"topic"`
Content json.RawMessage `json:"content"` // 保持原始字节
}
var pub PublishMsg
if err := json.Unmarshal(message, &pub); err != nil {
logger.Logger.Printf("Outer parsing failed: %v", err)
go TriggerBuzzer(3)
return
}
var realCmd struct {
Type int `json:"type"`
BuzzerDuration interface{} `json:"buzzer_duration,omitempty"`
}
if err := json.Unmarshal(pub.Content, &realCmd); err == nil {
} else {
var contentStr string
if json.Unmarshal(pub.Content, &contentStr) != nil {
logger.Logger.Printf("Content is neither string nor JSON → Using default 3 seconds")
go TriggerBuzzer(3)
return
}
logger.Logger.Printf("Detected escaped content → Second parsing: %s", contentStr)
if err := json.Unmarshal([]byte(contentStr), &realCmd); err != nil {
logger.Logger.Printf("Second parsing failed: %v → Using default 3 seconds", err)
go TriggerBuzzer(3)
return
}
}
if realCmd.Type != 1 {
logger.Logger.Printf("Not a sound/light alert command (type=%d), ignoring", realCmd.Type)
return
}
duration := 3
switch v := realCmd.BuzzerDuration.(type) {
case float64:
duration = int(v)
case string:
if n, err := strconv.Atoi(v); err == nil {
duration = n
}
case int:
duration = v
case nil:
duration = 3
}
if duration <= 0 || duration > 300 {
duration = 3
}
logger.Logger.Printf("Platform alert command → Local sound/light alarm ringing for %d seconds!!!", duration)
go TriggerBuzzer(duration)
}

View File

@ -15,6 +15,8 @@ type PortManager struct {
allocated map[string]int // DeviceUUID 到推理端口的映射 allocated map[string]int // DeviceUUID 到推理端口的映射
portPool []int // 推理端口池HTTP portPool []int // 推理端口池HTTP
available map[int]bool // 可用端口标记 available map[int]bool // 可用端口标记
basePort int
maxPort int
} }
func NewPortManager() *PortManager { func NewPortManager() *PortManager {
@ -94,3 +96,14 @@ func (pm *PortManager) ReleasePort(deviceUUID string) {
// GlobalPortManager 全局端口管理器 // GlobalPortManager 全局端口管理器
var GlobalPortManager = NewPortManager() var GlobalPortManager = NewPortManager()
func (pm *PortManager) GetPort(deviceUUID string) (int, error) {
pm.mu.Lock()
defer pm.mu.Unlock()
if port, exists := pm.allocated[deviceUUID]; exists {
return port, nil
}
return 0, fmt.Errorf("device %s has no allocated port", deviceUUID)
}

View File

@ -24,6 +24,19 @@ var (
currentServiceID string // New: current service's server_id currentServiceID string // New: current service's server_id
) )
type PublishParams struct {
Topic string `json:"topic"`
Data json.RawMessage `json:"data,omitempty"`
Content json.RawMessage `json:"content,omitempty"`
}
type InferenceHeartbeat struct {
DeviceUUID string `json:"device_uuid"`
PersonCount int `json:"person_count"`
Temperature float64 `json:"temperature,omitempty"`
Timestamp int64 `json:"timestamp"` // 上报时间戳
}
func GetWSClient() *WSClient { func GetWSClient() *WSClient {
wsMutex.RLock() wsMutex.RLock()
defer wsMutex.RUnlock() defer wsMutex.RUnlock()
@ -106,13 +119,6 @@ type SubscribeParams struct {
Topic string `json:"topic"` Topic string `json:"topic"`
} }
type PublishParams struct {
EventType string `json:"event_type,omitempty"`
Body json.RawMessage `json:"body,omitempty"`
Topic string `json:"topic,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
}
type WSClient struct { type WSClient struct {
Conn *websocket.Conn Conn *websocket.Conn
url string url string
@ -423,6 +429,8 @@ func (c *WSClient) handlePublishMessage(message WSMessage) {
logger.Logger.Printf("Failed to parse publish message parameters: %v", err) logger.Logger.Printf("Failed to parse publish message parameters: %v", err)
return return
} }
logger.Logger.Printf("WebSocket 收到 publish 消息 → Topic: %s | 原始数据: %s",
publishParams.Topic, string(publishParams.Data))
if publishParams.Topic != "" { if publishParams.Topic != "" {
c.mutex.Lock() c.mutex.Lock()

Binary file not shown.

140
main.go
View File

@ -23,6 +23,10 @@ import (
"time" "time"
) )
var (
inferenceLastHeartbeat sync.Map // key: deviceUUID, value: time.Time
)
type deviceAlarmState struct { type deviceAlarmState struct {
InCondition bool InCondition bool
StartTime int64 StartTime int64
@ -241,10 +245,18 @@ func processDevice(ctx context.Context, deviceData connect.DeviceData) {
logger.Logger.Printf("Start the inference program command: %v", inferenceCmd.Args) logger.Logger.Printf("Start the inference program command: %v", inferenceCmd.Args)
if err := inferenceCmd.Start(); err != nil { if err := inferenceCmd.Start(); err != nil {
logger.Logger.Printf("The reasoning program failed to start (DeviceUUID=%s): %v", deviceData.DeviceUUID, err) logger.Logger.Printf("推理程序启动失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return return
} }
// 关键:保存进程对象到全局 map
inferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd)
logger.Logger.Printf("推理程序已启动PID=%d (DeviceUUID=%s)", inferenceCmd.Process.Pid, deviceData.DeviceUUID)
// 初始化心跳
UpdateInferenceHeartbeat(deviceData.DeviceUUID)
// 检查推理程序是否启动成功 // 检查推理程序是否启动成功
time.Sleep(2 * time.Second) time.Sleep(2 * time.Second)
if inferenceCmd.Process == nil { if inferenceCmd.Process == nil {
@ -550,6 +562,13 @@ register:
} else { } else {
logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID) logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID)
} }
// 新增:订阅本地告警 topic
if err := connect.SubscribeTopic("/dhlr/alert", connect.HandleBuzzerAlert); err != nil {
logger.Logger.Printf("订阅告警topic失败 (/dhlr/alert): %v", err)
} else {
logger.Logger.Printf("成功订阅本地告警topic: /dhlr/alert")
}
} }
// GetAllDevices Get all devices (new method) // GetAllDevices Get all devices (new method)
@ -648,7 +667,7 @@ func triggerAlarmWithDelay(deviceData connect.DeviceData, alarmTime int64, coold
} }
client.SendAsync(eventMsg) client.SendAsync(eventMsg)
logger.Logger.Printf("报警事件已上报 (DeviceUUID=%s)", deviceData.DeviceUUID) logger.Logger.Printf("The alarm incident has been reported (DeviceUUID=%s)", deviceData.DeviceUUID)
// === 4. 上报数据快照 (data) === // === 4. 上报数据快照 (data) ===
dataPayload := map[string]any{ dataPayload := map[string]any{
@ -679,30 +698,87 @@ func triggerAlarmWithDelay(deviceData connect.DeviceData, alarmTime int64, coold
// === 5. 清理本地文件 === // === 5. 清理本地文件 ===
os.Remove(mergedFile) os.Remove(mergedFile)
video_server.CleanupOldRawVideos(deviceData.DeviceUUID)
video_server.CleanupOldVideos(deviceData.DeviceUUID) video_server.CleanupOldVideos(deviceData.DeviceUUID)
} }
func waitAndRegister() error { func UpdateInferenceHeartbeat(deviceUUID string) {
containerName := os.Getenv("CONTAINER_NAME") inferenceLastHeartbeat.Store(deviceUUID, time.Now())
if containerName == "" { }
containerName = "fireleave-service-default" func ShouldRestartInference(deviceUUID string) bool {
if val, ok := inferenceLastHeartbeat.Load(deviceUUID); ok {
last := val.(time.Time)
return time.Since(last) > 90*time.Second
}
return true // 从未收到过,也要启动
}
func (dm *DeviceManager) RangeDevices(fn func(uuid string, device *connect.DeviceData) bool) {
dm.mu.Lock()
defer dm.mu.Unlock()
for uuid, device := range dm.devices {
// 拷贝一份指针,防止外部修改影响遍历
devCopy := device
if !fn(uuid, devCopy) {
return
}
}
}
// StopInferenceProcess 停止指定设备的推理进程(你现在 defer 中就是这么干的,抽出来统一管理)
func (dm *DeviceManager) StopInferenceProcess(deviceUUID string) {
// 这里你之前没有保存 cmd我们需要一个地方存起来
// 所以我们要加一个 map 保存每个设备的推理进程
if cmd, ok := inferenceProcesses.Load(deviceUUID); ok {
if process, ok2 := cmd.(*exec.Cmd); ok2 && process.Process != nil {
logger.Logger.Printf("正在停止推理进程 (DeviceUUID=%s, PID=%d)", deviceUUID, process.Process.Pid)
process.Process.Kill()
process.Wait() // 等待退出,防止僵尸进程
}
inferenceProcesses.Delete(deviceUUID)
}
}
// StartInferenceProcess 启动推理进程(把你 processDevice 里启动推理的部分抽出来)
func (dm *DeviceManager) StartInferenceProcess(deviceData *connect.DeviceData) {
dm.StopInferenceProcess(deviceData.DeviceUUID)
// 正确:只查询,不分配
port, err := connect.GlobalPortManager.GetPort(deviceData.DeviceUUID)
if err != nil {
logger.Logger.Printf("获取端口失败,无法重启推理程序 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
} }
// 等待 WS 连接成功 detectAreaStr := connect.ProcessDetectAreaForInference(deviceData.DetectArea)
for i := 0; i < 30; i++ {
client := connect.GetWSClient() inferenceCmd := exec.Command(
if client != nil && client.IsConnected() { "./yolov5",
// 连接成功 → 立即注册 "-s", deviceData.CameraRTSP,
serviceID, err := connect.RegisterService(containerName) "-m", "model",
if err != nil { "-c", deviceData.DeviceUUID,
return fmt.Errorf("服务注册失败: %v", err) "-p", fmt.Sprintf("%d", port), // 用老端口!
"-w", detectAreaStr,
"-r", fmt.Sprintf("%d", deviceData.Confidence),
)
inferenceCmd.Stdout = os.Stdout
inferenceCmd.Stderr = os.Stderr
logger.Logger.Printf("正在重启推理程序 (使用原端口 %d): %v", port, inferenceCmd.Args)
if err := inferenceCmd.Start(); err != nil {
logger.Logger.Printf("推理程序重启失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
} }
logger.Logger.Printf("服务注册成功ServiceID=%s", serviceID)
return nil inferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd)
} logger.Logger.Printf("推理程序已成功重启PID=%d (DeviceUUID=%s, Port=%d)",
time.Sleep(1 * time.Second) inferenceCmd.Process.Pid, deviceData.DeviceUUID, port)
}
return fmt.Errorf("WebSocket 连接超时,无法注册服务") UpdateInferenceHeartbeat(deviceData.DeviceUUID)
} }
var inferenceProcesses sync.Map
func main() { func main() {
// Define command line parameters (remove ws parameter) // Define command line parameters (remove ws parameter)
var rtspURL string var rtspURL string
@ -767,6 +843,30 @@ func main() {
// Add device status monitoring // Add device status monitoring
go monitorDeviceStatus(ctx, deviceDataList) go monitorDeviceStatus(ctx, deviceDataList)
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
GlobalDeviceManager.RangeDevices(func(uuid string, device *connect.DeviceData) bool {
if ShouldRestartInference(uuid) {
logger.Logger.Printf("推理程序心跳超时(>90s准备重启: DeviceUUID=%s", uuid)
GlobalDeviceManager.StopInferenceProcess(uuid)
time.Sleep(2 * time.Second)
GlobalDeviceManager.StartInferenceProcess(device)
UpdateInferenceHeartbeat(uuid) // 防止重复重启
}
return true
})
}
}()
go func() {
ticker := time.NewTicker(5 * time.Minute)
for range ticker.C {
for _, device := range GlobalDeviceManager.GetAllDevices() {
video_server.CleanupOldRawVideos(device.DeviceUUID)
}
}
}()
// Wait for termination signal // Wait for termination signal
waitForShutdown(ctx, cancel, &wg) waitForShutdown(ctx, cancel, &wg)
} }

View File

@ -292,3 +292,44 @@ func CleanupOldVideos(deviceUUID string) {
} }
} }
} }
// CleanupOldRawVideos 清理推理程序生成的原始10s视频保留最近60条
func CleanupOldRawVideos(deviceUUID string) {
rawDir := filepath.Join("/usr/data/camera", deviceUUID)
if _, err := os.Stat(rawDir); os.IsNotExist(err) {
return // 目录不存在,直接返回
}
files, err := os.ReadDir(rawDir)
if err != nil {
logger.Logger.Printf("Failed to read the original video directory %s: %v", rawDir, err)
return
}
var videoFiles []string
for _, file := range files {
if !file.IsDir() && filepath.Ext(file.Name()) == ".mp4" { // 假设是mp4也可以加更多判断
videoFiles = append(videoFiles, filepath.Join(rawDir, file.Name()))
}
}
if len(videoFiles) <= 60 {
return // 不超过60条不清理
}
// 按修改时间排序(旧 → 新)
sort.Slice(videoFiles, func(i, j int) bool {
fi, _ := os.Stat(videoFiles[i])
fj, _ := os.Stat(videoFiles[j])
return fi.ModTime().Before(fj.ModTime())
})
// 删除最早的超过60条的部分
for i := 0; i < len(videoFiles)-60; i++ {
if err := os.Remove(videoFiles[i]); err != nil {
logger.Logger.Printf("Failed to delete the old original video %s: %v", videoFiles[i], err)
} else {
logger.Logger.Printf("Clear the old original videos: %s", videoFiles[i])
}
}
}