858 lines
27 KiB
Go
858 lines
27 KiB
Go
package main
|
||
|
||
import (
|
||
"FireLeave_tool/connect"
|
||
"FireLeave_tool/httpreader"
|
||
"FireLeave_tool/logger"
|
||
"FireLeave_tool/tcpreader"
|
||
"FireLeave_tool/video_server"
|
||
"context"
|
||
"encoding/json"
|
||
"flag"
|
||
"fmt"
|
||
"log"
|
||
"math"
|
||
"net"
|
||
"net/http"
|
||
"os"
|
||
"os/exec"
|
||
"os/signal"
|
||
"path/filepath"
|
||
"sync"
|
||
"syscall"
|
||
"time"
|
||
)
|
||
|
||
type deviceAlarmState struct {
|
||
InCondition bool
|
||
StartTime int64
|
||
AlarmPauseUntil int64
|
||
}
|
||
|
||
// 全局 map:每个设备独立状态
|
||
var deviceStates sync.Map // map[string]*deviceAlarmState
|
||
var mergingAlarms sync.Map
|
||
|
||
// DeviceManager 设备管理器
|
||
type DeviceManager struct {
|
||
mu sync.Mutex
|
||
devices map[string]*connect.DeviceData
|
||
}
|
||
|
||
// NewDeviceManager 初始化设备管理器
|
||
func NewDeviceManager() *DeviceManager {
|
||
return &DeviceManager{
|
||
devices: make(map[string]*connect.DeviceData),
|
||
}
|
||
}
|
||
func getDeviceState(deviceUUID string) *deviceAlarmState {
|
||
if s, ok := deviceStates.Load(deviceUUID); ok {
|
||
return s.(*deviceAlarmState)
|
||
}
|
||
s := &deviceAlarmState{}
|
||
deviceStates.Store(deviceUUID, s)
|
||
return s
|
||
}
|
||
|
||
// AddDevice 添加设备
|
||
func (dm *DeviceManager) AddDevice(deviceData connect.DeviceData) {
|
||
dm.mu.Lock()
|
||
defer dm.mu.Unlock()
|
||
dm.devices[deviceData.DeviceUUID] = &deviceData
|
||
logger.Logger.Printf("add equipments: DeviceUUID=%s", deviceData.DeviceUUID)
|
||
}
|
||
|
||
// StopDevice 停止设备
|
||
func (dm *DeviceManager) StopDevice(deviceUUID string) {
|
||
dm.mu.Lock()
|
||
defer dm.mu.Unlock()
|
||
if _, exists := dm.devices[deviceUUID]; exists {
|
||
delete(dm.devices, deviceUUID)
|
||
logger.Logger.Printf("stop equipments: DeviceUUID=%s", deviceUUID)
|
||
}
|
||
}
|
||
|
||
// GetDevice 获取设备
|
||
func (dm *DeviceManager) GetDevice(deviceUUID string) (*connect.DeviceData, bool) {
|
||
dm.mu.Lock()
|
||
defer dm.mu.Unlock()
|
||
device, exists := dm.devices[deviceUUID]
|
||
return device, exists
|
||
}
|
||
|
||
// GlobalDeviceManager 全局设备管理器
|
||
var GlobalDeviceManager = NewDeviceManager()
|
||
|
||
// 工具函数
|
||
func waitForHTTPService(port int, timeout time.Duration) bool {
|
||
deadline := time.Now().Add(timeout)
|
||
address := fmt.Sprintf("127.0.0.1:%d", port)
|
||
|
||
logger.Logger.Printf("Start waiting for the HTTP service to start (Port=%d, Timeout=%v)", port, timeout)
|
||
|
||
attempt := 0
|
||
for time.Now().Before(deadline) {
|
||
attempt++
|
||
|
||
// 1. 先检查TCP连接
|
||
conn, err := net.DialTimeout("tcp", address, 2*time.Second)
|
||
if err != nil {
|
||
if attempt == 1 || attempt%5 == 0 {
|
||
logger.Logger.Printf("Wait for the TCP connection... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err)
|
||
}
|
||
time.Sleep(1 * time.Second)
|
||
continue
|
||
}
|
||
conn.Close()
|
||
|
||
// 2. 再检查HTTP服务是否真正就绪
|
||
client := &http.Client{Timeout: 3 * time.Second}
|
||
resp, err := client.Get(fmt.Sprintf("http://%s/health", address))
|
||
if err == nil {
|
||
resp.Body.Close()
|
||
if resp.StatusCode == http.StatusOK {
|
||
logger.Logger.Printf("The HTTP service has started successfully (Port=%d, Number of attempts=%d)", port, attempt)
|
||
return true
|
||
}
|
||
}
|
||
|
||
if attempt == 1 || attempt%5 == 0 {
|
||
logger.Logger.Printf("Wait for the HTTP service to be ready... (Port=%d, Total number of attempts=%d)", port, attempt)
|
||
}
|
||
time.Sleep(1 * time.Second)
|
||
}
|
||
|
||
logger.Logger.Printf("HTTP service startup timeout (Port=%d, Total number of attempts=%d)", port, attempt)
|
||
return false
|
||
}
|
||
|
||
// waitForTCPService 增强版本 - 支持更长的等待时间和更好的错误处理
|
||
func waitForTCPService(port int, timeout time.Duration) bool {
|
||
deadline := time.Now().Add(timeout)
|
||
address := fmt.Sprintf("127.0.0.1:%d", port)
|
||
|
||
logger.Logger.Printf("Start waiting for the TCP service to start (Port=%d, Timeout=%v)", port, timeout)
|
||
|
||
attempt := 0
|
||
lastError := ""
|
||
|
||
for time.Now().Before(deadline) {
|
||
attempt++
|
||
conn, err := net.DialTimeout("tcp", address, 2*time.Second)
|
||
if err == nil {
|
||
conn.Close()
|
||
logger.Logger.Printf("The TCP service has been successfully started (Port=%d, Number of attempts=%d)", port, attempt)
|
||
return true
|
||
}
|
||
|
||
currentError := err.Error()
|
||
if currentError != lastError {
|
||
logger.Logger.Printf("Wait for TCP service... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err)
|
||
lastError = currentError
|
||
} else if attempt%5 == 0 {
|
||
// 每5次尝试记录一次相同的错误
|
||
logger.Logger.Printf("Wait for TCP service... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err)
|
||
}
|
||
|
||
time.Sleep(2 * time.Second) // 增加等待间隔
|
||
}
|
||
|
||
logger.Logger.Printf("TCP service startup timeout (Port=%d, Total number of attempts=%d, Final error: %s)", port, attempt, lastError)
|
||
return false
|
||
}
|
||
|
||
func logProcessInfo(pid int, deviceUUID string) error {
|
||
cmd := exec.Command("ps", "-o", "pid,ppid,user,stat,start_time,cmd", "-p", fmt.Sprintf("%d", pid))
|
||
output, err := cmd.Output()
|
||
if err != nil {
|
||
return fmt.Errorf("Failed to obtain process information: %v", err)
|
||
}
|
||
|
||
logger.Logger.Printf("Detailed information about the reasoning program process (DeviceUUID=%s, PID=%d):\n%s", deviceUUID, pid, string(output))
|
||
return nil
|
||
}
|
||
|
||
func getIntFromNumber(num json.Number, def int) int {
|
||
if i, err := num.Int64(); err == nil {
|
||
return int(i)
|
||
}
|
||
return def
|
||
}
|
||
|
||
// processDevice 处理单个设备
|
||
func processDevice(ctx context.Context, deviceData connect.DeviceData) {
|
||
|
||
logger.Logger.Printf("Start processing the equipment: DeviceUUID=%s", deviceData.DeviceUUID)
|
||
debugDeviceData(deviceData)
|
||
|
||
// 首先将设备添加到设备管理器
|
||
GlobalDeviceManager.AddDevice(deviceData)
|
||
defer GlobalDeviceManager.StopDevice(deviceData.DeviceUUID)
|
||
|
||
// 创建带取消的子上下文
|
||
deviceCtx, cancel := context.WithCancel(ctx)
|
||
defer cancel()
|
||
|
||
// 分配端口
|
||
port, err := connect.GlobalPortManager.AllocatePort(deviceData.DeviceUUID)
|
||
if err != nil {
|
||
logger.Logger.Printf(" DeviceUUID=%s Port allocation failed: %v", deviceData.DeviceUUID, err)
|
||
return
|
||
}
|
||
defer connect.GlobalPortManager.ReleasePort(deviceData.DeviceUUID)
|
||
logger.Logger.Printf("Port allocation successful: DeviceUUID=%s, Port=%d", deviceData.DeviceUUID, port)
|
||
|
||
inferenceServer := httpreader.NewInferenceServer(deviceData.DeviceUUID, port)
|
||
if err := inferenceServer.Start(); err != nil {
|
||
logger.Logger.Printf("Failed to start the inference data receiving server (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||
return
|
||
}
|
||
|
||
// 使用带上下文的停止
|
||
defer inferenceServer.StopWithContext(deviceCtx)
|
||
logger.Logger.Printf("Wait for the inference data receiving server to start... (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port)
|
||
|
||
// 等待HTTP服务器启动
|
||
if !waitForHTTPService(port, 10*time.Second) {
|
||
logger.Logger.Printf("The inference data receiving server has timed out during startup (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port)
|
||
return
|
||
}
|
||
|
||
logger.Logger.Printf("The inference data receiving server has been successfully started (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port)
|
||
|
||
// 处理检测区域为推理程序需要的格式
|
||
detectAreaStr := connect.ProcessDetectAreaForInference(deviceData.DetectArea)
|
||
logger.Logger.Printf("The processed detection area: %s", detectAreaStr)
|
||
|
||
// 启动推理程序
|
||
inferenceCmd := exec.Command(
|
||
"./yolov5",
|
||
"-s", deviceData.CameraRTSP,
|
||
"-m", "model",
|
||
"-c", deviceData.DeviceUUID,
|
||
"-p", fmt.Sprintf("%d", port),
|
||
"-w", detectAreaStr,
|
||
"-r", fmt.Sprintf("%d", deviceData.Confidence),
|
||
)
|
||
inferenceCmd.Stdout = os.Stdout
|
||
inferenceCmd.Stderr = os.Stderr
|
||
//logger.Logger.Printf("推理程序输出仅显示在控制台")
|
||
|
||
logger.Logger.Printf("Start the inference program command: %v", inferenceCmd.Args)
|
||
|
||
if err := inferenceCmd.Start(); err != nil {
|
||
logger.Logger.Printf("推理程序启动失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||
return
|
||
}
|
||
|
||
// 关键:保存进程对象到全局 map
|
||
connect.InferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd)
|
||
|
||
logger.Logger.Printf("推理程序已启动,PID=%d (DeviceUUID=%s)", inferenceCmd.Process.Pid, deviceData.DeviceUUID)
|
||
|
||
// 初始化心跳
|
||
connect.UpdateInferenceHeartbeat(deviceData.DeviceUUID)
|
||
|
||
// 检查推理程序是否启动成功
|
||
time.Sleep(2 * time.Second)
|
||
if inferenceCmd.Process == nil {
|
||
logger.Logger.Printf("The reasoning program process has not been created (DeviceUUID=%s)", deviceData.DeviceUUID)
|
||
return
|
||
}
|
||
|
||
if err := inferenceCmd.Process.Signal(syscall.Signal(0)); err != nil {
|
||
logger.Logger.Printf("The reasoning program has exited (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||
return
|
||
}
|
||
|
||
logger.Logger.Printf("The reasoning program process has been started,PID: %d (DeviceUUID=%s)", inferenceCmd.Process.Pid, deviceData.DeviceUUID)
|
||
|
||
if err := logProcessInfo(inferenceCmd.Process.Pid, deviceData.DeviceUUID); err != nil {
|
||
logger.Logger.Printf("Failed to record process information: %v", err)
|
||
}
|
||
|
||
defer func() {
|
||
logger.Logger.Printf("Start to stop the reasoning program (DeviceUUID=%s)", deviceData.DeviceUUID)
|
||
if inferenceCmd.Process != nil {
|
||
if err := inferenceCmd.Process.Kill(); err != nil {
|
||
logger.Logger.Printf("The termination of the reasoning program failed (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||
} else {
|
||
logger.Logger.Printf("The reasoning program has stopped (DeviceUUID=%s)", deviceData.DeviceUUID)
|
||
}
|
||
}
|
||
}()
|
||
|
||
// 启动温度程序
|
||
// 在 processDevice 函数中修改温度程序启动部分
|
||
// 启动温度程序
|
||
tempPort := port + 1000
|
||
tempCmdArgs := []string{
|
||
"-s", deviceData.CameraRTSP,
|
||
"-t", fmt.Sprintf("%d", tempPort),
|
||
}
|
||
|
||
if deviceData.CameraChannel > 0 {
|
||
tempCmdArgs = append(tempCmdArgs, "-c", fmt.Sprintf("%d", deviceData.CameraChannel))
|
||
}
|
||
|
||
tempCmd := exec.Command("./release/run_thermometry.sh", tempCmdArgs...)
|
||
tempCmd.Stdout = os.Stdout
|
||
tempCmd.Stderr = os.Stderr
|
||
logger.Logger.Printf("Start the temperature program command: %v", tempCmd.Args)
|
||
|
||
if err := tempCmd.Start(); err != nil {
|
||
logger.Logger.Printf("The temperature program failed to start (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||
return
|
||
}
|
||
|
||
// 增强进程检查
|
||
time.Sleep(3 * time.Second) // 减少等待时间
|
||
|
||
if tempCmd.Process == nil || tempCmd.Process.Pid == 0 {
|
||
logger.Logger.Printf("The temperature program was not started correctly, and the process PID is 0 (DeviceUUID=%s)", deviceData.DeviceUUID)
|
||
return
|
||
}
|
||
|
||
// 检查进程是否真的在运行
|
||
if err := tempCmd.Process.Signal(syscall.Signal(0)); err != nil {
|
||
logger.Logger.Printf("The temperature program process check failed (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||
return
|
||
}
|
||
|
||
logger.Logger.Printf("The temperature program has been successfully started,PID: %d (DeviceUUID=%s)", tempCmd.Process.Pid, deviceData.DeviceUUID)
|
||
|
||
// 增加TCP服务启动前的额外检查
|
||
logger.Logger.Printf("Wait for the temperature program to be initialized... (DeviceUUID=%s)", deviceData.DeviceUUID)
|
||
time.Sleep(2 * time.Second)
|
||
|
||
// 等待 TCP 服务器就绪
|
||
logger.Logger.Printf("Wait for the temperature program TCP server to start... (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort)
|
||
if !waitForTCPService(tempPort, 20*time.Second) { // 增加超时时间
|
||
logger.Logger.Printf("The TCP server of the temperature program has timed out during startup (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort)
|
||
|
||
// 检查温度程序是否还在运行
|
||
if tempCmd.Process != nil {
|
||
if err := tempCmd.Process.Signal(syscall.Signal(0)); err != nil {
|
||
logger.Logger.Printf("The temperature program has abnormally exited (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||
} else {
|
||
logger.Logger.Printf("The temperature program is still running but the TCP service has not started. There might be a configuration issue(DeviceUUID=%s)", deviceData.DeviceUUID)
|
||
}
|
||
}
|
||
return
|
||
}
|
||
|
||
// 创建 TCP 客户端
|
||
tempClient, err := tcpreader.NewTCPClient(deviceData.DeviceUUID, tempPort)
|
||
if err != nil {
|
||
logger.Logger.Printf("Failed to create a TCP client (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||
return
|
||
}
|
||
defer tempClient.Close()
|
||
|
||
logger.Logger.Printf("The TCP client has been successfully created (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort)
|
||
|
||
// 确保上传目录存在
|
||
if err := os.MkdirAll("/data/upload", 0755); err != nil {
|
||
logger.Logger.Printf("Failed to create the upload directory: %v", err)
|
||
return
|
||
}
|
||
logger.Logger.Printf("The directory upload has been successfully created: /data/upload")
|
||
|
||
// 主循环:从通道接收推理数据
|
||
logger.Logger.Printf("Start monitoring the equipment (DeviceUUID=%s)", deviceData.DeviceUUID)
|
||
|
||
//var lastPersonCount = -1
|
||
//var lastTemperature float64 = -1
|
||
monitorTicker := time.NewTicker(2 * time.Second)
|
||
defer monitorTicker.Stop()
|
||
|
||
for {
|
||
select {
|
||
case <-deviceCtx.Done():
|
||
logger.Logger.Printf("Equipment processing has been cancelled: DeviceUUID=%s", deviceData.DeviceUUID)
|
||
// 强制停止所有子进程
|
||
if inferenceCmd.Process != nil {
|
||
inferenceCmd.Process.Kill()
|
||
}
|
||
if tempCmd.Process != nil {
|
||
tempCmd.Process.Kill()
|
||
}
|
||
tempClient.Close()
|
||
return
|
||
|
||
case inferenceResult, ok := <-inferenceServer.GetDataChan():
|
||
if !ok {
|
||
logger.Logger.Printf("The inference data channel is closed: DeviceUUID=%s", deviceData.DeviceUUID)
|
||
return
|
||
}
|
||
currentPersonCount := 0
|
||
for _, param := range inferenceResult.Params {
|
||
if param.ClassIdx == 1 {
|
||
currentPersonCount = param.Number
|
||
break
|
||
}
|
||
}
|
||
|
||
if currentPersonCount != deviceData.PersonCount {
|
||
deviceData.PersonCount = currentPersonCount
|
||
connect.UpdateDeviceData(deviceData)
|
||
logger.Logger.Printf("Personnel quantity update: %d (DeviceUUID=%s)", currentPersonCount, deviceData.DeviceUUID)
|
||
}
|
||
|
||
case <-monitorTicker.C:
|
||
now := time.Now().Unix()
|
||
|
||
// === 获取当前设备独立状态 ===
|
||
state := getDeviceState(deviceData.DeviceUUID)
|
||
|
||
// === 读取温度(防抖)===
|
||
tempResult, err := tempClient.ReadTemperatureResult()
|
||
if err == nil {
|
||
if math.Abs(tempResult.Tmp-deviceData.Temperature) >= 0.5 {
|
||
oldTemp := deviceData.Temperature
|
||
deviceData.Temperature = tempResult.Tmp
|
||
connect.UpdateDeviceData(deviceData)
|
||
GlobalDeviceManager.AddDevice(deviceData)
|
||
logger.Logger.Printf("Temperature update: %.1f°C → %.1f°C (DeviceUUID=%s)",
|
||
oldTemp, tempResult.Tmp, deviceData.DeviceUUID)
|
||
}
|
||
}
|
||
|
||
// 获取参数
|
||
detectionSec := getIntFromNumber(deviceData.DetectionTime, 20)
|
||
alarmCooldown := getIntFromNumber(deviceData.AlarmTime, 300)
|
||
|
||
// === 防抖:无人 + 温度稳定超阈值 ===
|
||
noPerson := deviceData.PersonCount == 0
|
||
|
||
// 温度防抖
|
||
var tempStableHigh bool
|
||
threshold := deviceData.TemperatureThreshold
|
||
if deviceData.Temperature > threshold+0.5 {
|
||
tempStableHigh = true
|
||
} else if deviceData.Temperature < threshold-0.5 {
|
||
tempStableHigh = false
|
||
} else {
|
||
tempStableHigh = deviceData.Temperature > threshold
|
||
}
|
||
|
||
conditionMet := noPerson && tempStableHigh
|
||
|
||
// 1. 冷却期检查
|
||
if now < state.AlarmPauseUntil {
|
||
if state.InCondition {
|
||
logger.Logger.Printf("Alarm in cooldown: %ds remaining (DeviceUUID=%s)",
|
||
state.AlarmPauseUntil-now, deviceData.DeviceUUID)
|
||
state.InCondition = false
|
||
}
|
||
continue
|
||
}
|
||
|
||
// 2. 条件满足
|
||
if conditionMet {
|
||
if !state.InCondition {
|
||
state.StartTime = now
|
||
state.InCondition = true
|
||
logger.Logger.Printf("Alarm condition met, start timing (%ds required): PersonCount=%d, Temp=%.1f°C > %.1f°C (DeviceUUID=%s)",
|
||
detectionSec, deviceData.PersonCount, deviceData.Temperature, threshold, deviceData.DeviceUUID)
|
||
} else if now-state.StartTime >= int64(detectionSec) {
|
||
logger.Logger.Printf("ALARM TRIGGERED (after %ds): PersonCount=%d, Temp=%.1f°C > %.1f°C (DeviceUUID=%s)",
|
||
detectionSec, deviceData.PersonCount, deviceData.Temperature, threshold, deviceData.DeviceUUID)
|
||
|
||
go triggerAlarmWithDelay(deviceData, state.StartTime, alarmCooldown)
|
||
|
||
state.AlarmPauseUntil = now + int64(alarmCooldown)
|
||
state.InCondition = false
|
||
|
||
deviceDataCopy := deviceData
|
||
deviceDataCopy.AlarmPauseUntil = state.AlarmPauseUntil
|
||
connect.UpdateDeviceData(deviceDataCopy)
|
||
}
|
||
} else {
|
||
if state.InCondition {
|
||
logger.Logger.Printf("Alarm condition interrupted: lasted %ds < %ds (DeviceUUID=%s)",
|
||
now-state.StartTime, detectionSec, deviceData.DeviceUUID)
|
||
state.InCondition = false
|
||
}
|
||
}
|
||
|
||
}
|
||
}
|
||
}
|
||
|
||
func waitForShutdown(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup) {
|
||
stop := make(chan os.Signal, 1)
|
||
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
|
||
|
||
select {
|
||
case <-stop:
|
||
logger.Logger.Printf("Upon receiving the stop signal, start cleaning...")
|
||
|
||
cancel()
|
||
done := make(chan struct{})
|
||
go func() {
|
||
wg.Wait()
|
||
close(done)
|
||
}()
|
||
|
||
select {
|
||
case <-done:
|
||
logger.Logger.Printf("All goroutines exited normally")
|
||
case <-time.After(10 * time.Second):
|
||
logger.Logger.Printf("When the waiting time exceeds the limit, force exit")
|
||
// Log status before force exit
|
||
activeDevices := GlobalDeviceManager.GetAllDevices()
|
||
logger.Logger.Printf("It still exists when forced to exit %d An active device", len(activeDevices))
|
||
for uuid := range activeDevices {
|
||
logger.Logger.Printf("Forcibly stop the equipment: UUID=%s", uuid)
|
||
}
|
||
}
|
||
|
||
case <-ctx.Done():
|
||
logger.Logger.Printf("The context has been cancelled.")
|
||
}
|
||
|
||
logger.Logger.Printf("The program exits successfully.")
|
||
os.Exit(0)
|
||
}
|
||
|
||
func initWebSocketAsync(ctx context.Context, wg *sync.WaitGroup) {
|
||
logger.Logger.Printf("Start the initialization of asynchronous WebSocket...")
|
||
|
||
if err := connect.InitWSChannel(); err != nil {
|
||
logger.Logger.Printf("Failed to initialize WebSocket channel: %v", err)
|
||
return
|
||
}
|
||
|
||
logger.Logger.Printf("Waiting for WebSocket connection to be established...")
|
||
containerName := "fireleave-container"
|
||
|
||
for i := 0; i < 30; i++ {
|
||
select {
|
||
case <-ctx.Done():
|
||
logger.Logger.Printf("Context cancelled, aborting WebSocket registration")
|
||
return
|
||
case <-time.After(1 * time.Second):
|
||
client := connect.GetWSClient()
|
||
if client != nil && client.IsConnected() {
|
||
|
||
goto register
|
||
}
|
||
}
|
||
}
|
||
|
||
logger.Logger.Printf("WebSocket connection timeout after 30 seconds")
|
||
return
|
||
|
||
register:
|
||
|
||
serviceID, err := connect.RegisterService(containerName)
|
||
if err != nil {
|
||
logger.Logger.Printf("Service registration failed: %v", err)
|
||
} else {
|
||
logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID)
|
||
}
|
||
|
||
// 新增:订阅本地告警 topic
|
||
if err := connect.SubscribeTopic("/dhlr/alert", connect.HandleBuzzerAlert); err != nil {
|
||
logger.Logger.Printf("订阅告警topic失败 (/dhlr/alert): %v", err)
|
||
} else {
|
||
logger.Logger.Printf("成功订阅本地告警topic: /dhlr/alert")
|
||
}
|
||
}
|
||
|
||
func (dm *DeviceManager) GetAllDevices() map[string]*connect.DeviceData {
|
||
dm.mu.Lock()
|
||
defer dm.mu.Unlock()
|
||
|
||
result := make(map[string]*connect.DeviceData)
|
||
for k, v := range dm.devices {
|
||
result[k] = v
|
||
}
|
||
return result
|
||
}
|
||
func jsonString(v any) string {
|
||
b, err := json.Marshal(v)
|
||
if err != nil {
|
||
logger.Logger.Printf("JSON编码失败: %v", err)
|
||
return ""
|
||
}
|
||
return string(b)
|
||
}
|
||
|
||
// main.go → triggerAlarmWithDelay
|
||
func triggerAlarmWithDelay(deviceData connect.DeviceData, alarmTime int64, cooldown int) {
|
||
key := fmt.Sprintf("%s_%d", deviceData.DeviceUUID, alarmTime)
|
||
if _, loaded := mergingAlarms.LoadOrStore(key, true); loaded {
|
||
return
|
||
}
|
||
defer mergingAlarms.Delete(key)
|
||
|
||
// === 1. 合并视频 ===
|
||
mergedFile, err := video_server.MergeVideo(deviceData.DeviceUUID, alarmTime)
|
||
if err != nil {
|
||
logger.Logger.Printf("视频合并失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||
return
|
||
}
|
||
|
||
// === 2. 上传 Nginx ===
|
||
var hostUUID string
|
||
configs, err := connect.LoadServiceConfig()
|
||
if err == nil {
|
||
for _, cfg := range configs {
|
||
if cfg.DeviceUUID == deviceData.DeviceUUID {
|
||
hostUUID = cfg.HostUUID
|
||
break
|
||
}
|
||
}
|
||
}
|
||
|
||
eventParamsForNginx := map[string]any{
|
||
"alarm_time": alarmTime,
|
||
"description": "无人工作区且温度超阈值报警",
|
||
"device_UUID": deviceData.DeviceUUID,
|
||
"host_uuid": hostUUID,
|
||
"person_count": deviceData.PersonCount,
|
||
"temperature": deviceData.Temperature,
|
||
}
|
||
|
||
videoURL, err := video_server.UploadToNginx(mergedFile, eventParamsForNginx, deviceData.UploadURL)
|
||
videoURLStr := ""
|
||
if err != nil {
|
||
logger.Logger.Printf("Nginx上传失败: %v", err)
|
||
} else {
|
||
videoURLStr = videoURL
|
||
logger.Logger.Printf("Nginx上传成功: %s", videoURL)
|
||
}
|
||
|
||
client := connect.GetWSClient()
|
||
if client == nil || !client.IsConnected() {
|
||
logger.Logger.Printf("WS未就绪,延迟上报 (DeviceUUID=%s)", deviceData.DeviceUUID)
|
||
os.Remove(mergedFile)
|
||
return
|
||
}
|
||
|
||
// === 3. 上报事件 (event) ===
|
||
eventPayload := map[string]any{
|
||
"type": "event",
|
||
"args": map[string]any{
|
||
"level": "alarm",
|
||
"description": "无人工作区且温度超阈值报警",
|
||
"device_uuid": deviceData.DeviceUUID,
|
||
"host_uuid": hostUUID,
|
||
"video_url": videoURLStr,
|
||
"alarm_time": alarmTime,
|
||
"address": deviceData.Address,
|
||
},
|
||
}
|
||
|
||
eventMsg := map[string]any{
|
||
"method": "metric_data",
|
||
"params": map[string]any{
|
||
"route_key": fmt.Sprintf("/dhlr/device/%s/event", deviceData.DeviceUUID),
|
||
"metric": jsonString(eventPayload),
|
||
},
|
||
}
|
||
|
||
client.SendAsync(eventMsg)
|
||
logger.Logger.Printf("The alarm incident has been reported (DeviceUUID=%s)", deviceData.DeviceUUID)
|
||
|
||
// === 4. 上报数据快照 (data) ===
|
||
dataPayload := map[string]any{
|
||
"type": "data",
|
||
"args": map[string]any{
|
||
"person_count": deviceData.PersonCount,
|
||
"temperature": deviceData.Temperature,
|
||
"camera_rtsp": deviceData.CameraRTSP,
|
||
"task_id": deviceData.TaskID,
|
||
"camera_ip": deviceData.CameraIP,
|
||
"confidence": deviceData.Confidence,
|
||
"alarm_time": alarmTime,
|
||
"address": deviceData.Address,
|
||
},
|
||
}
|
||
|
||
dataMsg := map[string]any{
|
||
"method": "metric_data",
|
||
"params": map[string]any{
|
||
"route_key": fmt.Sprintf("/dhlr/device/%s/data", deviceData.DeviceUUID),
|
||
"metric": jsonString(dataPayload),
|
||
},
|
||
}
|
||
|
||
client.SendAsync(dataMsg)
|
||
logger.Logger.Printf("指标快照已上报 (DeviceUUID=%s, Temp=%.1f°C, 人:%d)",
|
||
deviceData.DeviceUUID, deviceData.Temperature, deviceData.PersonCount)
|
||
|
||
// === 5. 清理本地文件 ===
|
||
os.Remove(mergedFile)
|
||
video_server.CleanupOldRawVideos(deviceData.DeviceUUID)
|
||
video_server.CleanupOldVideos(deviceData.DeviceUUID)
|
||
}
|
||
func (dm *DeviceManager) RangeDevices(fn func(uuid string, device *connect.DeviceData) bool) {
|
||
dm.mu.Lock()
|
||
defer dm.mu.Unlock()
|
||
for uuid, device := range dm.devices {
|
||
|
||
if !fn(uuid, device) {
|
||
return
|
||
}
|
||
}
|
||
}
|
||
func main() {
|
||
|
||
var rtspURL string
|
||
var deviceUUID string
|
||
flag.StringVar(&rtspURL, "rtsp", "", "RTSP stream URL for testing")
|
||
flag.StringVar(&deviceUUID, "uuid", "", "Device UUID for testing")
|
||
flag.Parse()
|
||
|
||
if err := logger.InitLogger(); err != nil {
|
||
log.Fatalf("Logger initialization failed: %v", err)
|
||
}
|
||
defer logger.CloseLogger()
|
||
|
||
go func() {
|
||
for {
|
||
logger.CleanupOldLogs()
|
||
time.Sleep(24 * time.Hour)
|
||
}
|
||
}()
|
||
|
||
logger.Logger.Printf("=== Starting FireLeave Tool ===")
|
||
|
||
ctx, cancel := context.WithCancel(context.Background())
|
||
defer cancel()
|
||
var wg sync.WaitGroup
|
||
|
||
logger.Logger.Printf("Step 1: Initialize device data from service config")
|
||
deviceDataList, err := connect.InitializeDevicesFromServiceConfig()
|
||
if err != nil {
|
||
logger.Logger.Printf("Device data initialization failed: %v", err)
|
||
logger.Logger.Printf("Will try to use existing device files...")
|
||
|
||
deviceDataList = connect.GetDeviceDataList()
|
||
}
|
||
|
||
logger.Logger.Printf("Step 2: Starting %d devices", len(deviceDataList))
|
||
|
||
for i, deviceData := range deviceDataList {
|
||
logger.Logger.Printf("Starting device[%d]: UUID=%s", i, deviceData.DeviceUUID)
|
||
wg.Add(1)
|
||
go func(deviceData connect.DeviceData, index int) {
|
||
defer wg.Done()
|
||
defer GlobalDeviceManager.StopDevice(deviceData.DeviceUUID)
|
||
logger.Logger.Printf("Device processing goroutine started: UUID=%s, Index=%d", deviceData.DeviceUUID, index)
|
||
processDevice(ctx, deviceData)
|
||
}(deviceData, i)
|
||
}
|
||
|
||
logger.Logger.Printf("Step 3: All devices started")
|
||
|
||
logger.Logger.Printf("Step 4: Asynchronously initialize WebSocket connection")
|
||
go initWebSocketAsync(ctx, &wg)
|
||
|
||
logger.Logger.Printf("=== Main program startup completed ===")
|
||
logger.Logger.Printf("System status: Running devices=%d", len(deviceDataList))
|
||
|
||
// Add device status monitoring
|
||
go monitorDeviceStatus(ctx, deviceDataList)
|
||
|
||
go func() {
|
||
ticker := time.NewTicker(30 * time.Second)
|
||
defer ticker.Stop()
|
||
for range ticker.C {
|
||
GlobalDeviceManager.RangeDevices(func(uuid string, device *connect.DeviceData) bool {
|
||
if connect.ShouldRestartInference(uuid) {
|
||
logger.Logger.Printf("推理程序心跳超时(>90s),准备重启: DeviceUUID=%s", uuid)
|
||
connect.StopInferenceProcess(uuid)
|
||
time.Sleep(6 * time.Second)
|
||
connect.StartInferenceProcess(device)
|
||
connect.UpdateInferenceHeartbeat(uuid)
|
||
}
|
||
return true
|
||
})
|
||
}
|
||
}()
|
||
go func() {
|
||
ticker := time.NewTicker(5 * time.Minute)
|
||
for range ticker.C {
|
||
for _, device := range GlobalDeviceManager.GetAllDevices() {
|
||
video_server.CleanupOldRawVideos(device.DeviceUUID)
|
||
}
|
||
}
|
||
}()
|
||
|
||
waitForShutdown(ctx, cancel, &wg)
|
||
}
|
||
func printGlobalDeviceStatus() {
|
||
now := time.Now().Unix()
|
||
activeDevices := GlobalDeviceManager.GetAllDevices()
|
||
|
||
if len(activeDevices) == 0 {
|
||
logger.Logger.Printf("=== Global Device Status monitoring === No running devices for the time being")
|
||
return
|
||
}
|
||
|
||
logger.Logger.Printf("=== Global Device Status monitoring === Currently running %d", len(activeDevices))
|
||
|
||
i := 1
|
||
for uuid, data := range activeDevices {
|
||
|
||
stateInf, _ := deviceStates.Load(uuid)
|
||
state := stateInf.(*deviceAlarmState)
|
||
cooldownLeft := int64(0)
|
||
if now < state.AlarmPauseUntil {
|
||
cooldownLeft = state.AlarmPauseUntil - now
|
||
}
|
||
|
||
logger.Logger.Printf(" [%d] DeviceUUID=%s | person:%d | temp:%.1f°C | threshold value:%.1f°C | cooling time:%ds",
|
||
i,
|
||
uuid,
|
||
data.PersonCount,
|
||
data.Temperature,
|
||
data.TemperatureThreshold,
|
||
cooldownLeft,
|
||
)
|
||
i++
|
||
}
|
||
}
|
||
func monitorDeviceStatus(ctx context.Context, initialDevices []connect.DeviceData) {
|
||
ticker := time.NewTicker(30 * time.Second)
|
||
defer ticker.Stop()
|
||
|
||
logger.Logger.Printf("Start global device status monitoring (print once every 30 seconds)")
|
||
|
||
for {
|
||
select {
|
||
case <-ticker.C:
|
||
printGlobalDeviceStatus()
|
||
if files, err := filepath.Glob("/data/devices/device_*.json"); err == nil {
|
||
logger.Logger.Printf("Device file status: A total of %d device files", len(files))
|
||
for i, file := range files {
|
||
if i < 3 {
|
||
logger.Logger.Printf(" device file[%d]: %s", i, filepath.Base(file))
|
||
}
|
||
}
|
||
}
|
||
|
||
logger.Logger.Printf("Operating equipment %d ", len(GlobalDeviceManager.GetAllDevices()))
|
||
|
||
case <-ctx.Done():
|
||
logger.Logger.Printf("Global device status monitoring has stopped")
|
||
return
|
||
}
|
||
}
|
||
}
|
||
|
||
func debugDeviceData(deviceData connect.DeviceData) {
|
||
logger.Logger.Printf("Debug device data: UUID=%s", deviceData.DeviceUUID)
|
||
logger.Logger.Printf(" TaskID: %q", deviceData.TaskID)
|
||
logger.Logger.Printf(" CameraRTSP: %q", deviceData.CameraRTSP)
|
||
logger.Logger.Printf(" CameraIP: %q", deviceData.CameraIP)
|
||
logger.Logger.Printf(" UploadURL: %q", deviceData.UploadURL)
|
||
logger.Logger.Printf(" DetectAreaStr: %q", deviceData.DetectAreaStr)
|
||
logger.Logger.Printf(" PersonCount: %d", deviceData.PersonCount)
|
||
logger.Logger.Printf(" Temperature: %f", deviceData.Temperature)
|
||
}
|