2025-12-10 14:34:57 +08:00

858 lines
27 KiB
Go
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package main
import (
"FireLeave_tool/connect"
"FireLeave_tool/httpreader"
"FireLeave_tool/logger"
"FireLeave_tool/tcpreader"
"FireLeave_tool/video_server"
"context"
"encoding/json"
"flag"
"fmt"
"log"
"math"
"net"
"net/http"
"os"
"os/exec"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
)
type deviceAlarmState struct {
InCondition bool
StartTime int64
AlarmPauseUntil int64
}
// 全局 map每个设备独立状态
var deviceStates sync.Map // map[string]*deviceAlarmState
var mergingAlarms sync.Map
// DeviceManager 设备管理器
type DeviceManager struct {
mu sync.Mutex
devices map[string]*connect.DeviceData
}
// NewDeviceManager 初始化设备管理器
func NewDeviceManager() *DeviceManager {
return &DeviceManager{
devices: make(map[string]*connect.DeviceData),
}
}
func getDeviceState(deviceUUID string) *deviceAlarmState {
if s, ok := deviceStates.Load(deviceUUID); ok {
return s.(*deviceAlarmState)
}
s := &deviceAlarmState{}
deviceStates.Store(deviceUUID, s)
return s
}
// AddDevice 添加设备
func (dm *DeviceManager) AddDevice(deviceData connect.DeviceData) {
dm.mu.Lock()
defer dm.mu.Unlock()
dm.devices[deviceData.DeviceUUID] = &deviceData
logger.Logger.Printf("add equipments: DeviceUUID=%s", deviceData.DeviceUUID)
}
// StopDevice 停止设备
func (dm *DeviceManager) StopDevice(deviceUUID string) {
dm.mu.Lock()
defer dm.mu.Unlock()
if _, exists := dm.devices[deviceUUID]; exists {
delete(dm.devices, deviceUUID)
logger.Logger.Printf("stop equipments: DeviceUUID=%s", deviceUUID)
}
}
// GetDevice 获取设备
func (dm *DeviceManager) GetDevice(deviceUUID string) (*connect.DeviceData, bool) {
dm.mu.Lock()
defer dm.mu.Unlock()
device, exists := dm.devices[deviceUUID]
return device, exists
}
// GlobalDeviceManager 全局设备管理器
var GlobalDeviceManager = NewDeviceManager()
// 工具函数
func waitForHTTPService(port int, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
address := fmt.Sprintf("127.0.0.1:%d", port)
logger.Logger.Printf("Start waiting for the HTTP service to start (Port=%d, Timeout=%v)", port, timeout)
attempt := 0
for time.Now().Before(deadline) {
attempt++
// 1. 先检查TCP连接
conn, err := net.DialTimeout("tcp", address, 2*time.Second)
if err != nil {
if attempt == 1 || attempt%5 == 0 {
logger.Logger.Printf("Wait for the TCP connection... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err)
}
time.Sleep(1 * time.Second)
continue
}
conn.Close()
// 2. 再检查HTTP服务是否真正就绪
client := &http.Client{Timeout: 3 * time.Second}
resp, err := client.Get(fmt.Sprintf("http://%s/health", address))
if err == nil {
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
logger.Logger.Printf("The HTTP service has started successfully (Port=%d, Number of attempts=%d)", port, attempt)
return true
}
}
if attempt == 1 || attempt%5 == 0 {
logger.Logger.Printf("Wait for the HTTP service to be ready... (Port=%d, Total number of attempts=%d)", port, attempt)
}
time.Sleep(1 * time.Second)
}
logger.Logger.Printf("HTTP service startup timeout (Port=%d, Total number of attempts=%d)", port, attempt)
return false
}
// waitForTCPService 增强版本 - 支持更长的等待时间和更好的错误处理
func waitForTCPService(port int, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
address := fmt.Sprintf("127.0.0.1:%d", port)
logger.Logger.Printf("Start waiting for the TCP service to start (Port=%d, Timeout=%v)", port, timeout)
attempt := 0
lastError := ""
for time.Now().Before(deadline) {
attempt++
conn, err := net.DialTimeout("tcp", address, 2*time.Second)
if err == nil {
conn.Close()
logger.Logger.Printf("The TCP service has been successfully started (Port=%d, Number of attempts=%d)", port, attempt)
return true
}
currentError := err.Error()
if currentError != lastError {
logger.Logger.Printf("Wait for TCP service... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err)
lastError = currentError
} else if attempt%5 == 0 {
// 每5次尝试记录一次相同的错误
logger.Logger.Printf("Wait for TCP service... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err)
}
time.Sleep(2 * time.Second) // 增加等待间隔
}
logger.Logger.Printf("TCP service startup timeout (Port=%d, Total number of attempts=%d, Final error: %s)", port, attempt, lastError)
return false
}
func logProcessInfo(pid int, deviceUUID string) error {
cmd := exec.Command("ps", "-o", "pid,ppid,user,stat,start_time,cmd", "-p", fmt.Sprintf("%d", pid))
output, err := cmd.Output()
if err != nil {
return fmt.Errorf("Failed to obtain process information: %v", err)
}
logger.Logger.Printf("Detailed information about the reasoning program process (DeviceUUID=%s, PID=%d):\n%s", deviceUUID, pid, string(output))
return nil
}
func getIntFromNumber(num json.Number, def int) int {
if i, err := num.Int64(); err == nil {
return int(i)
}
return def
}
// processDevice 处理单个设备
func processDevice(ctx context.Context, deviceData connect.DeviceData) {
logger.Logger.Printf("Start processing the equipment: DeviceUUID=%s", deviceData.DeviceUUID)
debugDeviceData(deviceData)
// 首先将设备添加到设备管理器
GlobalDeviceManager.AddDevice(deviceData)
defer GlobalDeviceManager.StopDevice(deviceData.DeviceUUID)
// 创建带取消的子上下文
deviceCtx, cancel := context.WithCancel(ctx)
defer cancel()
// 分配端口
port, err := connect.GlobalPortManager.AllocatePort(deviceData.DeviceUUID)
if err != nil {
logger.Logger.Printf(" DeviceUUID=%s Port allocation failed: %v", deviceData.DeviceUUID, err)
return
}
defer connect.GlobalPortManager.ReleasePort(deviceData.DeviceUUID)
logger.Logger.Printf("Port allocation successful: DeviceUUID=%s, Port=%d", deviceData.DeviceUUID, port)
inferenceServer := httpreader.NewInferenceServer(deviceData.DeviceUUID, port)
if err := inferenceServer.Start(); err != nil {
logger.Logger.Printf("Failed to start the inference data receiving server (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
// 使用带上下文的停止
defer inferenceServer.StopWithContext(deviceCtx)
logger.Logger.Printf("Wait for the inference data receiving server to start... (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port)
// 等待HTTP服务器启动
if !waitForHTTPService(port, 10*time.Second) {
logger.Logger.Printf("The inference data receiving server has timed out during startup (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port)
return
}
logger.Logger.Printf("The inference data receiving server has been successfully started (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port)
// 处理检测区域为推理程序需要的格式
detectAreaStr := connect.ProcessDetectAreaForInference(deviceData.DetectArea)
logger.Logger.Printf("The processed detection area: %s", detectAreaStr)
// 启动推理程序
inferenceCmd := exec.Command(
"./yolov5",
"-s", deviceData.CameraRTSP,
"-m", "model",
"-c", deviceData.DeviceUUID,
"-p", fmt.Sprintf("%d", port),
"-w", detectAreaStr,
"-r", fmt.Sprintf("%d", deviceData.Confidence),
)
inferenceCmd.Stdout = os.Stdout
inferenceCmd.Stderr = os.Stderr
//logger.Logger.Printf("推理程序输出仅显示在控制台")
logger.Logger.Printf("Start the inference program command: %v", inferenceCmd.Args)
if err := inferenceCmd.Start(); err != nil {
logger.Logger.Printf("推理程序启动失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
// 关键:保存进程对象到全局 map
connect.InferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd)
logger.Logger.Printf("推理程序已启动PID=%d (DeviceUUID=%s)", inferenceCmd.Process.Pid, deviceData.DeviceUUID)
// 初始化心跳
connect.UpdateInferenceHeartbeat(deviceData.DeviceUUID)
// 检查推理程序是否启动成功
time.Sleep(2 * time.Second)
if inferenceCmd.Process == nil {
logger.Logger.Printf("The reasoning program process has not been created (DeviceUUID=%s)", deviceData.DeviceUUID)
return
}
if err := inferenceCmd.Process.Signal(syscall.Signal(0)); err != nil {
logger.Logger.Printf("The reasoning program has exited (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
logger.Logger.Printf("The reasoning program process has been startedPID: %d (DeviceUUID=%s)", inferenceCmd.Process.Pid, deviceData.DeviceUUID)
if err := logProcessInfo(inferenceCmd.Process.Pid, deviceData.DeviceUUID); err != nil {
logger.Logger.Printf("Failed to record process information: %v", err)
}
defer func() {
logger.Logger.Printf("Start to stop the reasoning program (DeviceUUID=%s)", deviceData.DeviceUUID)
if inferenceCmd.Process != nil {
if err := inferenceCmd.Process.Kill(); err != nil {
logger.Logger.Printf("The termination of the reasoning program failed (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
} else {
logger.Logger.Printf("The reasoning program has stopped (DeviceUUID=%s)", deviceData.DeviceUUID)
}
}
}()
// 启动温度程序
// 在 processDevice 函数中修改温度程序启动部分
// 启动温度程序
tempPort := port + 1000
tempCmdArgs := []string{
"-s", deviceData.CameraRTSP,
"-t", fmt.Sprintf("%d", tempPort),
}
if deviceData.CameraChannel > 0 {
tempCmdArgs = append(tempCmdArgs, "-c", fmt.Sprintf("%d", deviceData.CameraChannel))
}
tempCmd := exec.Command("./release/run_thermometry.sh", tempCmdArgs...)
tempCmd.Stdout = os.Stdout
tempCmd.Stderr = os.Stderr
logger.Logger.Printf("Start the temperature program command: %v", tempCmd.Args)
if err := tempCmd.Start(); err != nil {
logger.Logger.Printf("The temperature program failed to start (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
// 增强进程检查
time.Sleep(3 * time.Second) // 减少等待时间
if tempCmd.Process == nil || tempCmd.Process.Pid == 0 {
logger.Logger.Printf("The temperature program was not started correctly, and the process PID is 0 (DeviceUUID=%s)", deviceData.DeviceUUID)
return
}
// 检查进程是否真的在运行
if err := tempCmd.Process.Signal(syscall.Signal(0)); err != nil {
logger.Logger.Printf("The temperature program process check failed (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
logger.Logger.Printf("The temperature program has been successfully startedPID: %d (DeviceUUID=%s)", tempCmd.Process.Pid, deviceData.DeviceUUID)
// 增加TCP服务启动前的额外检查
logger.Logger.Printf("Wait for the temperature program to be initialized... (DeviceUUID=%s)", deviceData.DeviceUUID)
time.Sleep(2 * time.Second)
// 等待 TCP 服务器就绪
logger.Logger.Printf("Wait for the temperature program TCP server to start... (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort)
if !waitForTCPService(tempPort, 20*time.Second) { // 增加超时时间
logger.Logger.Printf("The TCP server of the temperature program has timed out during startup (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort)
// 检查温度程序是否还在运行
if tempCmd.Process != nil {
if err := tempCmd.Process.Signal(syscall.Signal(0)); err != nil {
logger.Logger.Printf("The temperature program has abnormally exited (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
} else {
logger.Logger.Printf("The temperature program is still running but the TCP service has not started. There might be a configuration issue(DeviceUUID=%s)", deviceData.DeviceUUID)
}
}
return
}
// 创建 TCP 客户端
tempClient, err := tcpreader.NewTCPClient(deviceData.DeviceUUID, tempPort)
if err != nil {
logger.Logger.Printf("Failed to create a TCP client (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
defer tempClient.Close()
logger.Logger.Printf("The TCP client has been successfully created (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort)
// 确保上传目录存在
if err := os.MkdirAll("/data/upload", 0755); err != nil {
logger.Logger.Printf("Failed to create the upload directory: %v", err)
return
}
logger.Logger.Printf("The directory upload has been successfully created: /data/upload")
// 主循环:从通道接收推理数据
logger.Logger.Printf("Start monitoring the equipment (DeviceUUID=%s)", deviceData.DeviceUUID)
//var lastPersonCount = -1
//var lastTemperature float64 = -1
monitorTicker := time.NewTicker(2 * time.Second)
defer monitorTicker.Stop()
for {
select {
case <-deviceCtx.Done():
logger.Logger.Printf("Equipment processing has been cancelled: DeviceUUID=%s", deviceData.DeviceUUID)
// 强制停止所有子进程
if inferenceCmd.Process != nil {
inferenceCmd.Process.Kill()
}
if tempCmd.Process != nil {
tempCmd.Process.Kill()
}
tempClient.Close()
return
case inferenceResult, ok := <-inferenceServer.GetDataChan():
if !ok {
logger.Logger.Printf("The inference data channel is closed: DeviceUUID=%s", deviceData.DeviceUUID)
return
}
currentPersonCount := 0
for _, param := range inferenceResult.Params {
if param.ClassIdx == 1 {
currentPersonCount = param.Number
break
}
}
if currentPersonCount != deviceData.PersonCount {
deviceData.PersonCount = currentPersonCount
connect.UpdateDeviceData(deviceData)
logger.Logger.Printf("Personnel quantity update: %d (DeviceUUID=%s)", currentPersonCount, deviceData.DeviceUUID)
}
case <-monitorTicker.C:
now := time.Now().Unix()
// === 获取当前设备独立状态 ===
state := getDeviceState(deviceData.DeviceUUID)
// === 读取温度(防抖)===
tempResult, err := tempClient.ReadTemperatureResult()
if err == nil {
if math.Abs(tempResult.Tmp-deviceData.Temperature) >= 0.5 {
oldTemp := deviceData.Temperature
deviceData.Temperature = tempResult.Tmp
connect.UpdateDeviceData(deviceData)
GlobalDeviceManager.AddDevice(deviceData)
logger.Logger.Printf("Temperature update: %.1f°C → %.1f°C (DeviceUUID=%s)",
oldTemp, tempResult.Tmp, deviceData.DeviceUUID)
}
}
// 获取参数
detectionSec := getIntFromNumber(deviceData.DetectionTime, 20)
alarmCooldown := getIntFromNumber(deviceData.AlarmTime, 300)
// === 防抖:无人 + 温度稳定超阈值 ===
noPerson := deviceData.PersonCount == 0
// 温度防抖
var tempStableHigh bool
threshold := deviceData.TemperatureThreshold
if deviceData.Temperature > threshold+0.5 {
tempStableHigh = true
} else if deviceData.Temperature < threshold-0.5 {
tempStableHigh = false
} else {
tempStableHigh = deviceData.Temperature > threshold
}
conditionMet := noPerson && tempStableHigh
// 1. 冷却期检查
if now < state.AlarmPauseUntil {
if state.InCondition {
logger.Logger.Printf("Alarm in cooldown: %ds remaining (DeviceUUID=%s)",
state.AlarmPauseUntil-now, deviceData.DeviceUUID)
state.InCondition = false
}
continue
}
// 2. 条件满足
if conditionMet {
if !state.InCondition {
state.StartTime = now
state.InCondition = true
logger.Logger.Printf("Alarm condition met, start timing (%ds required): PersonCount=%d, Temp=%.1f°C > %.1f°C (DeviceUUID=%s)",
detectionSec, deviceData.PersonCount, deviceData.Temperature, threshold, deviceData.DeviceUUID)
} else if now-state.StartTime >= int64(detectionSec) {
logger.Logger.Printf("ALARM TRIGGERED (after %ds): PersonCount=%d, Temp=%.1f°C > %.1f°C (DeviceUUID=%s)",
detectionSec, deviceData.PersonCount, deviceData.Temperature, threshold, deviceData.DeviceUUID)
go triggerAlarmWithDelay(deviceData, state.StartTime, alarmCooldown)
state.AlarmPauseUntil = now + int64(alarmCooldown)
state.InCondition = false
deviceDataCopy := deviceData
deviceDataCopy.AlarmPauseUntil = state.AlarmPauseUntil
connect.UpdateDeviceData(deviceDataCopy)
}
} else {
if state.InCondition {
logger.Logger.Printf("Alarm condition interrupted: lasted %ds < %ds (DeviceUUID=%s)",
now-state.StartTime, detectionSec, deviceData.DeviceUUID)
state.InCondition = false
}
}
}
}
}
func waitForShutdown(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup) {
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
select {
case <-stop:
logger.Logger.Printf("Upon receiving the stop signal, start cleaning...")
cancel()
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
logger.Logger.Printf("All goroutines exited normally")
case <-time.After(10 * time.Second):
logger.Logger.Printf("When the waiting time exceeds the limit, force exit")
// Log status before force exit
activeDevices := GlobalDeviceManager.GetAllDevices()
logger.Logger.Printf("It still exists when forced to exit %d An active device", len(activeDevices))
for uuid := range activeDevices {
logger.Logger.Printf("Forcibly stop the equipment: UUID=%s", uuid)
}
}
case <-ctx.Done():
logger.Logger.Printf("The context has been cancelled.")
}
logger.Logger.Printf("The program exits successfully.")
os.Exit(0)
}
func initWebSocketAsync(ctx context.Context, wg *sync.WaitGroup) {
logger.Logger.Printf("Start the initialization of asynchronous WebSocket...")
if err := connect.InitWSChannel(); err != nil {
logger.Logger.Printf("Failed to initialize WebSocket channel: %v", err)
return
}
logger.Logger.Printf("Waiting for WebSocket connection to be established...")
containerName := "fireleave-container"
for i := 0; i < 30; i++ {
select {
case <-ctx.Done():
logger.Logger.Printf("Context cancelled, aborting WebSocket registration")
return
case <-time.After(1 * time.Second):
client := connect.GetWSClient()
if client != nil && client.IsConnected() {
goto register
}
}
}
logger.Logger.Printf("WebSocket connection timeout after 30 seconds")
return
register:
serviceID, err := connect.RegisterService(containerName)
if err != nil {
logger.Logger.Printf("Service registration failed: %v", err)
} else {
logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID)
}
// 新增:订阅本地告警 topic
if err := connect.SubscribeTopic("/dhlr/alert", connect.HandleBuzzerAlert); err != nil {
logger.Logger.Printf("订阅告警topic失败 (/dhlr/alert): %v", err)
} else {
logger.Logger.Printf("成功订阅本地告警topic: /dhlr/alert")
}
}
func (dm *DeviceManager) GetAllDevices() map[string]*connect.DeviceData {
dm.mu.Lock()
defer dm.mu.Unlock()
result := make(map[string]*connect.DeviceData)
for k, v := range dm.devices {
result[k] = v
}
return result
}
func jsonString(v any) string {
b, err := json.Marshal(v)
if err != nil {
logger.Logger.Printf("JSON编码失败: %v", err)
return ""
}
return string(b)
}
// main.go → triggerAlarmWithDelay
func triggerAlarmWithDelay(deviceData connect.DeviceData, alarmTime int64, cooldown int) {
key := fmt.Sprintf("%s_%d", deviceData.DeviceUUID, alarmTime)
if _, loaded := mergingAlarms.LoadOrStore(key, true); loaded {
return
}
defer mergingAlarms.Delete(key)
// === 1. 合并视频 ===
mergedFile, err := video_server.MergeVideo(deviceData.DeviceUUID, alarmTime)
if err != nil {
logger.Logger.Printf("视频合并失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
// === 2. 上传 Nginx ===
var hostUUID string
configs, err := connect.LoadServiceConfig()
if err == nil {
for _, cfg := range configs {
if cfg.DeviceUUID == deviceData.DeviceUUID {
hostUUID = cfg.HostUUID
break
}
}
}
eventParamsForNginx := map[string]any{
"alarm_time": alarmTime,
"description": "无人工作区且温度超阈值报警",
"device_UUID": deviceData.DeviceUUID,
"host_uuid": hostUUID,
"person_count": deviceData.PersonCount,
"temperature": deviceData.Temperature,
}
videoURL, err := video_server.UploadToNginx(mergedFile, eventParamsForNginx, deviceData.UploadURL)
videoURLStr := ""
if err != nil {
logger.Logger.Printf("Nginx上传失败: %v", err)
} else {
videoURLStr = videoURL
logger.Logger.Printf("Nginx上传成功: %s", videoURL)
}
client := connect.GetWSClient()
if client == nil || !client.IsConnected() {
logger.Logger.Printf("WS未就绪延迟上报 (DeviceUUID=%s)", deviceData.DeviceUUID)
os.Remove(mergedFile)
return
}
// === 3. 上报事件 (event) ===
eventPayload := map[string]any{
"type": "event",
"args": map[string]any{
"level": "alarm",
"description": "无人工作区且温度超阈值报警",
"device_uuid": deviceData.DeviceUUID,
"host_uuid": hostUUID,
"video_url": videoURLStr,
"alarm_time": alarmTime,
"address": deviceData.Address,
},
}
eventMsg := map[string]any{
"method": "metric_data",
"params": map[string]any{
"route_key": fmt.Sprintf("/dhlr/device/%s/event", deviceData.DeviceUUID),
"metric": jsonString(eventPayload),
},
}
client.SendAsync(eventMsg)
logger.Logger.Printf("The alarm incident has been reported (DeviceUUID=%s)", deviceData.DeviceUUID)
// === 4. 上报数据快照 (data) ===
dataPayload := map[string]any{
"type": "data",
"args": map[string]any{
"person_count": deviceData.PersonCount,
"temperature": deviceData.Temperature,
"camera_rtsp": deviceData.CameraRTSP,
"task_id": deviceData.TaskID,
"camera_ip": deviceData.CameraIP,
"confidence": deviceData.Confidence,
"alarm_time": alarmTime,
"address": deviceData.Address,
},
}
dataMsg := map[string]any{
"method": "metric_data",
"params": map[string]any{
"route_key": fmt.Sprintf("/dhlr/device/%s/data", deviceData.DeviceUUID),
"metric": jsonString(dataPayload),
},
}
client.SendAsync(dataMsg)
logger.Logger.Printf("指标快照已上报 (DeviceUUID=%s, Temp=%.1f°C, 人:%d)",
deviceData.DeviceUUID, deviceData.Temperature, deviceData.PersonCount)
// === 5. 清理本地文件 ===
os.Remove(mergedFile)
video_server.CleanupOldRawVideos(deviceData.DeviceUUID)
video_server.CleanupOldVideos(deviceData.DeviceUUID)
}
func (dm *DeviceManager) RangeDevices(fn func(uuid string, device *connect.DeviceData) bool) {
dm.mu.Lock()
defer dm.mu.Unlock()
for uuid, device := range dm.devices {
if !fn(uuid, device) {
return
}
}
}
func main() {
var rtspURL string
var deviceUUID string
flag.StringVar(&rtspURL, "rtsp", "", "RTSP stream URL for testing")
flag.StringVar(&deviceUUID, "uuid", "", "Device UUID for testing")
flag.Parse()
if err := logger.InitLogger(); err != nil {
log.Fatalf("Logger initialization failed: %v", err)
}
defer logger.CloseLogger()
go func() {
for {
logger.CleanupOldLogs()
time.Sleep(24 * time.Hour)
}
}()
logger.Logger.Printf("=== Starting FireLeave Tool ===")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
logger.Logger.Printf("Step 1: Initialize device data from service config")
deviceDataList, err := connect.InitializeDevicesFromServiceConfig()
if err != nil {
logger.Logger.Printf("Device data initialization failed: %v", err)
logger.Logger.Printf("Will try to use existing device files...")
deviceDataList = connect.GetDeviceDataList()
}
logger.Logger.Printf("Step 2: Starting %d devices", len(deviceDataList))
for i, deviceData := range deviceDataList {
logger.Logger.Printf("Starting device[%d]: UUID=%s", i, deviceData.DeviceUUID)
wg.Add(1)
go func(deviceData connect.DeviceData, index int) {
defer wg.Done()
defer GlobalDeviceManager.StopDevice(deviceData.DeviceUUID)
logger.Logger.Printf("Device processing goroutine started: UUID=%s, Index=%d", deviceData.DeviceUUID, index)
processDevice(ctx, deviceData)
}(deviceData, i)
}
logger.Logger.Printf("Step 3: All devices started")
logger.Logger.Printf("Step 4: Asynchronously initialize WebSocket connection")
go initWebSocketAsync(ctx, &wg)
logger.Logger.Printf("=== Main program startup completed ===")
logger.Logger.Printf("System status: Running devices=%d", len(deviceDataList))
// Add device status monitoring
go monitorDeviceStatus(ctx, deviceDataList)
go func() {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
for range ticker.C {
GlobalDeviceManager.RangeDevices(func(uuid string, device *connect.DeviceData) bool {
if connect.ShouldRestartInference(uuid) {
logger.Logger.Printf("推理程序心跳超时(>90s准备重启: DeviceUUID=%s", uuid)
connect.StopInferenceProcess(uuid)
time.Sleep(6 * time.Second)
connect.StartInferenceProcess(device)
connect.UpdateInferenceHeartbeat(uuid)
}
return true
})
}
}()
go func() {
ticker := time.NewTicker(5 * time.Minute)
for range ticker.C {
for _, device := range GlobalDeviceManager.GetAllDevices() {
video_server.CleanupOldRawVideos(device.DeviceUUID)
}
}
}()
waitForShutdown(ctx, cancel, &wg)
}
func printGlobalDeviceStatus() {
now := time.Now().Unix()
activeDevices := GlobalDeviceManager.GetAllDevices()
if len(activeDevices) == 0 {
logger.Logger.Printf("=== Global Device Status monitoring === No running devices for the time being")
return
}
logger.Logger.Printf("=== Global Device Status monitoring === Currently running %d", len(activeDevices))
i := 1
for uuid, data := range activeDevices {
stateInf, _ := deviceStates.Load(uuid)
state := stateInf.(*deviceAlarmState)
cooldownLeft := int64(0)
if now < state.AlarmPauseUntil {
cooldownLeft = state.AlarmPauseUntil - now
}
logger.Logger.Printf(" [%d] DeviceUUID=%s | person:%d | temp:%.1f°C | threshold value:%.1f°C | cooling time:%ds",
i,
uuid,
data.PersonCount,
data.Temperature,
data.TemperatureThreshold,
cooldownLeft,
)
i++
}
}
func monitorDeviceStatus(ctx context.Context, initialDevices []connect.DeviceData) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
logger.Logger.Printf("Start global device status monitoring (print once every 30 seconds)")
for {
select {
case <-ticker.C:
printGlobalDeviceStatus()
if files, err := filepath.Glob("/data/devices/device_*.json"); err == nil {
logger.Logger.Printf("Device file status: A total of %d device files", len(files))
for i, file := range files {
if i < 3 {
logger.Logger.Printf(" device file[%d]: %s", i, filepath.Base(file))
}
}
}
logger.Logger.Printf("Operating equipment %d ", len(GlobalDeviceManager.GetAllDevices()))
case <-ctx.Done():
logger.Logger.Printf("Global device status monitoring has stopped")
return
}
}
}
func debugDeviceData(deviceData connect.DeviceData) {
logger.Logger.Printf("Debug device data: UUID=%s", deviceData.DeviceUUID)
logger.Logger.Printf(" TaskID: %q", deviceData.TaskID)
logger.Logger.Printf(" CameraRTSP: %q", deviceData.CameraRTSP)
logger.Logger.Printf(" CameraIP: %q", deviceData.CameraIP)
logger.Logger.Printf(" UploadURL: %q", deviceData.UploadURL)
logger.Logger.Printf(" DetectAreaStr: %q", deviceData.DetectAreaStr)
logger.Logger.Printf(" PersonCount: %d", deviceData.PersonCount)
logger.Logger.Printf(" Temperature: %f", deviceData.Temperature)
}