first commit
This commit is contained in:
parent
9b02bd9779
commit
c554fd3d3d
22
Dockerfile
Normal file
22
Dockerfile
Normal file
@ -0,0 +1,22 @@
|
||||
FROM debian:bullseye-slim
|
||||
|
||||
RUN ln -sf /usr/share/zoneinfo/Asia/Shanghai /etc/localtime && \
|
||||
echo "Asia/Shanghai" > /etc/timezone
|
||||
|
||||
# 复制 deb 包到容器中
|
||||
COPY mp4-merge_0.1.11-1_arm64.deb /tmp/
|
||||
|
||||
|
||||
RUN dpkg -i /tmp/mp4-merge_0.1.11-1_arm64.deb || (apt-get update && apt-get install -f -y)
|
||||
|
||||
|
||||
COPY models/ /usr/local/models/
|
||||
|
||||
# 复制启动脚本
|
||||
COPY start.sh /usr/local/start.sh
|
||||
RUN chmod +x /usr/local/start.sh
|
||||
|
||||
WORKDIR /usr/local/models
|
||||
RUN chmod +x yolov5
|
||||
|
||||
CMD ["/usr/local/start.sh"]
|
||||
@ -9,13 +9,11 @@ import (
|
||||
"strings"
|
||||
)
|
||||
|
||||
// Modified device data storage path
|
||||
var (
|
||||
devicesBaseDir = "/data/devices"
|
||||
deviceDataList []DeviceData
|
||||
)
|
||||
|
||||
// InitializeDevicesFromServiceConfig Initialize device data from service configuration
|
||||
func InitializeDevicesFromServiceConfig() ([]DeviceData, error) {
|
||||
deviceDataMutex.Lock()
|
||||
defer deviceDataMutex.Unlock()
|
||||
@ -38,15 +36,12 @@ func InitializeDevicesFromServiceConfig() ([]DeviceData, error) {
|
||||
|
||||
logger.Logger.Printf("Loaded %d devices from service configuration", len(configs))
|
||||
|
||||
// Clear existing device data
|
||||
deviceDataList = []DeviceData{}
|
||||
|
||||
// Process each device and save to individual files
|
||||
for i, config := range configs {
|
||||
processedData := ProcessDeviceDataForInference(config)
|
||||
deviceDataList = append(deviceDataList, processedData)
|
||||
|
||||
// Save to individual device file
|
||||
if err := saveDeviceToFile(processedData); err != nil {
|
||||
logger.Logger.Printf("Failed to save the device file [%s]: %v", processedData.DeviceUUID, err)
|
||||
continue
|
||||
@ -58,11 +53,9 @@ func InitializeDevicesFromServiceConfig() ([]DeviceData, error) {
|
||||
return deviceDataList, nil
|
||||
}
|
||||
|
||||
// saveDeviceToFile Save individual device data to separate file
|
||||
func saveDeviceToFile(deviceData DeviceData) error {
|
||||
filePath := filepath.Join(devicesBaseDir, fmt.Sprintf("device_%s.json", deviceData.DeviceUUID))
|
||||
|
||||
// Clean device data to ensure no invalid characters
|
||||
cleanedData := cleanDeviceData(deviceData)
|
||||
|
||||
data, err := json.MarshalIndent(cleanedData, "", " ")
|
||||
@ -71,7 +64,6 @@ func saveDeviceToFile(deviceData DeviceData) error {
|
||||
return fmt.Errorf("Device data serialization failed: %v", err)
|
||||
}
|
||||
|
||||
// Write directly to file
|
||||
if err := os.WriteFile(filePath, data, 0644); err != nil {
|
||||
return fmt.Errorf("Failed to write device file: %v", err)
|
||||
}
|
||||
@ -80,11 +72,9 @@ func saveDeviceToFile(deviceData DeviceData) error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// cleanDeviceData Clean device data, remove possible invalid characters
|
||||
func cleanDeviceData(data DeviceData) DeviceData {
|
||||
cleaned := data
|
||||
|
||||
// Clean string fields
|
||||
cleaned.DeviceUUID = cleanString(data.DeviceUUID)
|
||||
cleaned.TaskID = cleanString(data.TaskID)
|
||||
cleaned.CameraRTSP = cleanString(data.CameraRTSP)
|
||||
@ -92,9 +82,8 @@ func cleanDeviceData(data DeviceData) DeviceData {
|
||||
cleaned.UploadURL = cleanString(data.UploadURL)
|
||||
cleaned.DetectAreaStr = cleanString(data.DetectAreaStr)
|
||||
|
||||
// Ensure numeric fields are valid
|
||||
if cleaned.Confidence < 0 || cleaned.Confidence > 100 {
|
||||
cleaned.Confidence = 70 // Default value
|
||||
cleaned.Confidence = 70
|
||||
}
|
||||
if cleaned.CameraChannel < 0 {
|
||||
cleaned.CameraChannel = 0
|
||||
@ -106,14 +95,14 @@ func cleanDeviceData(data DeviceData) DeviceData {
|
||||
cleaned.Temperature = 0
|
||||
}
|
||||
if cleaned.TemperatureThreshold <= 0 {
|
||||
cleaned.TemperatureThreshold = 45.0 // Default value
|
||||
cleaned.TemperatureThreshold = 45.0
|
||||
}
|
||||
|
||||
return cleaned
|
||||
}
|
||||
|
||||
func cleanString(s string) string {
|
||||
// Remove control characters and invalid Unicode characters
|
||||
|
||||
return strings.Map(func(r rune) rune {
|
||||
if r == 0xFFFD { // Unicode replacement character
|
||||
return -1
|
||||
@ -125,12 +114,10 @@ func cleanString(s string) string {
|
||||
}, s)
|
||||
}
|
||||
|
||||
// GetDeviceDataList Get device data list
|
||||
func GetDeviceDataList() []DeviceData {
|
||||
deviceDataMutex.Lock()
|
||||
defer deviceDataMutex.Unlock()
|
||||
|
||||
// If no data in memory, try to load from device files
|
||||
if len(deviceDataList) == 0 {
|
||||
logger.Logger.Printf("No device data in memory, loading from device files...")
|
||||
if err := loadDevicesFromFiles(); err != nil {
|
||||
@ -139,18 +126,15 @@ func GetDeviceDataList() []DeviceData {
|
||||
}
|
||||
}
|
||||
|
||||
// Return copy
|
||||
result := make([]DeviceData, len(deviceDataList))
|
||||
copy(result, deviceDataList)
|
||||
return result
|
||||
}
|
||||
|
||||
// loadDevicesFromFiles Load data from device files
|
||||
func loadDevicesFromFiles() error {
|
||||
// Clear existing data
|
||||
deviceDataList = []DeviceData{}
|
||||
|
||||
// Read all files in device directory
|
||||
files, err := filepath.Glob(filepath.Join(devicesBaseDir, "device_*.json"))
|
||||
if err != nil {
|
||||
return fmt.Errorf("Failed to find device files: %v", err)
|
||||
@ -179,7 +163,6 @@ func loadDevicesFromFiles() error {
|
||||
return nil
|
||||
}
|
||||
|
||||
// UpdateDeviceData Update device data
|
||||
func UpdateDeviceData(data DeviceData) error {
|
||||
deviceDataMutex.Lock()
|
||||
defer deviceDataMutex.Unlock()
|
||||
@ -189,7 +172,6 @@ func UpdateDeviceData(data DeviceData) error {
|
||||
// Clean input data
|
||||
cleanedData := cleanDeviceData(data)
|
||||
|
||||
// Find if device with same DeviceUUID already exists
|
||||
found := false
|
||||
for i, existing := range deviceDataList {
|
||||
if existing.DeviceUUID == cleanedData.DeviceUUID {
|
||||
@ -200,13 +182,11 @@ func UpdateDeviceData(data DeviceData) error {
|
||||
}
|
||||
}
|
||||
|
||||
// If not exists, add to list
|
||||
if !found {
|
||||
logger.Logger.Printf("Adding new device to list: DeviceUUID=%s", cleanedData.DeviceUUID)
|
||||
deviceDataList = append(deviceDataList, cleanedData)
|
||||
}
|
||||
|
||||
// Save to individual file
|
||||
if err := saveDeviceToFile(cleanedData); err != nil {
|
||||
logger.Logger.Printf("Failed to save device file: %v", err)
|
||||
return err
|
||||
|
||||
88
connect/heartbeat.go
Normal file
88
connect/heartbeat.go
Normal file
@ -0,0 +1,88 @@
|
||||
package connect
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"os"
|
||||
"os/exec"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
"FireLeave_tool/logger"
|
||||
)
|
||||
|
||||
var (
|
||||
InferenceProcesses sync.Map // key: deviceUUID string, value: *exec.Cmd
|
||||
|
||||
// 记录最后一次收到心跳的时间
|
||||
inferenceLastHeartbeat sync.Map // key: deviceUUID string, value: time.Time
|
||||
)
|
||||
|
||||
// UpdateInferenceHeartbeat 每收到一帧 /video/post 就调用这个
|
||||
func UpdateInferenceHeartbeat(deviceUUID string) {
|
||||
inferenceLastHeartbeat.Store(deviceUUID, time.Now())
|
||||
|
||||
}
|
||||
|
||||
// ShouldRestartInference 判断是否需要重启推理程序
|
||||
func ShouldRestartInference(deviceUUID string) bool {
|
||||
val, ok := inferenceLastHeartbeat.Load(deviceUUID)
|
||||
if !ok {
|
||||
return true // 从未收到过心跳,肯定要启动
|
||||
}
|
||||
last := val.(time.Time)
|
||||
return time.Since(last) > 85*time.Second
|
||||
}
|
||||
|
||||
// StopInferenceProcess 停止推理进程(供 main 和 DeviceManager 调用)
|
||||
func StopInferenceProcess(deviceUUID string) {
|
||||
if raw, ok := InferenceProcesses.Load(deviceUUID); ok {
|
||||
if cmd, ok := raw.(*exec.Cmd); ok && cmd.Process != nil {
|
||||
logger.Logger.Printf("正在停止推理进程 (DeviceUUID=%s, PID=%d)", deviceUUID, cmd.Process.Pid)
|
||||
cmd.Process.Kill()
|
||||
cmd.Wait()
|
||||
}
|
||||
InferenceProcesses.Delete(deviceUUID)
|
||||
}
|
||||
}
|
||||
|
||||
// StartInferenceProcess 重启推理程序(保持原端口!)
|
||||
func StartInferenceProcess(deviceData *DeviceData) {
|
||||
|
||||
StopInferenceProcess(deviceData.DeviceUUID)
|
||||
|
||||
port, err := GlobalPortManager.GetPort(deviceData.DeviceUUID)
|
||||
if err != nil {
|
||||
logger.Logger.Printf("获取端口失败,无法重启推理程序 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||||
return
|
||||
}
|
||||
|
||||
detectAreaStr := ProcessDetectAreaForInference(deviceData.DetectArea)
|
||||
|
||||
inferenceCmd := exec.Command(
|
||||
"./yolov5",
|
||||
"-s", deviceData.CameraRTSP,
|
||||
"-m", "model",
|
||||
"-c", deviceData.DeviceUUID,
|
||||
"-p", fmt.Sprintf("%d", port),
|
||||
"-w", detectAreaStr,
|
||||
"-r", fmt.Sprintf("%d", deviceData.Confidence),
|
||||
)
|
||||
|
||||
inferenceCmd.Stdout = os.Stdout
|
||||
inferenceCmd.Stderr = os.Stderr
|
||||
|
||||
logger.Logger.Printf("正在重启推理程序 (使用原端口 %d): %v", port, inferenceCmd.Args)
|
||||
|
||||
if err := inferenceCmd.Start(); err != nil {
|
||||
logger.Logger.Printf("推理程序重启失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||||
return
|
||||
}
|
||||
|
||||
// 保存进程对象
|
||||
InferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd)
|
||||
logger.Logger.Printf("推理程序已成功重启,PID=%d (DeviceUUID=%s, Port=%d)",
|
||||
inferenceCmd.Process.Pid, deviceData.DeviceUUID, port)
|
||||
|
||||
// 重启后立即刷新一次心跳,防止 30s 检测又触发
|
||||
UpdateInferenceHeartbeat(deviceData.DeviceUUID)
|
||||
}
|
||||
@ -20,7 +20,7 @@ type PortManager struct {
|
||||
}
|
||||
|
||||
func NewPortManager() *PortManager {
|
||||
// 推理端口范围:58881-58890(HTTP),支持最多10个设备
|
||||
|
||||
ports := []int{58881, 58882, 58883, 58884, 58885, 58886, 58887, 58888, 58889, 58890}
|
||||
|
||||
// 初始化可用端口映射
|
||||
|
||||
@ -511,6 +511,7 @@ func (c *WSClient) heartbeatLoop() {
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
func (c *WSClient) reRegisterAndResubscribe() {
|
||||
// 1. 等待连接稳定
|
||||
time.Sleep(1 * time.Second)
|
||||
|
||||
24
fire_leave_config
Normal file
24
fire_leave_config
Normal file
@ -0,0 +1,24 @@
|
||||
{
|
||||
"image": "fire_leave:latest",
|
||||
"container_name": "fire_leave",
|
||||
"command": [
|
||||
"/bin/bash",
|
||||
"/usr/local/start.sh"
|
||||
],
|
||||
"volumes": [
|
||||
"/lib/npu:/lib/npu",
|
||||
"/lib:/lib:ro",
|
||||
"/usr/lib:/usr/lib:ro"
|
||||
],
|
||||
"restart": "always",
|
||||
"network_mode": "host",
|
||||
"working_dir": "/usr/local/models",
|
||||
"hostname": "myhost",
|
||||
"privileged": true,
|
||||
"devices": [
|
||||
"/dev/galcore:/dev/galcore",
|
||||
"/dev/dri/card0:/dev/dri/card0",
|
||||
"/dev/vpu_service:/dev/vpu_service",
|
||||
"/dev/rga:/dev/rga"
|
||||
]
|
||||
}
|
||||
BIN
fireleave_tool
BIN
fireleave_tool
Binary file not shown.
@ -1,12 +1,14 @@
|
||||
package httpreader
|
||||
|
||||
import (
|
||||
"FireLeave_tool/connect"
|
||||
"bytes"
|
||||
"context"
|
||||
"encoding/json"
|
||||
"fmt"
|
||||
"io"
|
||||
"net/http"
|
||||
"os"
|
||||
"sync"
|
||||
"time"
|
||||
|
||||
@ -113,20 +115,17 @@ func (is *InferenceServer) handleVideoPost(w http.ResponseWriter, r *http.Reques
|
||||
http.Error(w, `{"error":"Missing serial field"}`, http.StatusBadRequest)
|
||||
return
|
||||
}
|
||||
now := time.Now().Format("2006/01/02 15:04:05.000")
|
||||
logger.Logger.Printf("♥ 推理心跳收到 | 时间: %s | UUID: %s | Serial: %s | 人数量: %d",
|
||||
now, is.deviceUUID, result.Serial, getPersonCount(result))
|
||||
|
||||
// 只在人员数量变化或重要事件时记录详细日志
|
||||
if len(result.Params) > 0 {
|
||||
hasPerson := false
|
||||
for _, param := range result.Params {
|
||||
if param.ClassIdx == 1 && param.Number > 0 {
|
||||
hasPerson = true
|
||||
break
|
||||
}
|
||||
}
|
||||
if hasPerson {
|
||||
logger.Logger.Printf("Inference data: Serial=%s, Person detected, ParamsCount=%d",
|
||||
result.Serial, len(result.Params))
|
||||
}
|
||||
// 这一行就是你缺失的救命稻草!
|
||||
connect.UpdateInferenceHeartbeat(is.deviceUUID)
|
||||
|
||||
// 额外写一份永不丢失的独立心跳日志
|
||||
if f, err := os.OpenFile("/data/heartbeat.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644); err == nil {
|
||||
defer f.Close()
|
||||
fmt.Fprintf(f, "[%s] HEARTBEAT OK | %s | person=%d\n", now, is.deviceUUID, getPersonCount(result))
|
||||
}
|
||||
|
||||
// 发送到处理通道
|
||||
@ -179,3 +178,11 @@ func (s *InferenceServer) StopWithContext(ctx context.Context) {
|
||||
logger.Logger.Printf("The HTTP server has been shut down (DeviceUUID=%s)", s.deviceUUID)
|
||||
}
|
||||
}
|
||||
func getPersonCount(result InferenceResult) int {
|
||||
for _, p := range result.Params {
|
||||
if p.ClassIdx == 1 {
|
||||
return p.Number
|
||||
}
|
||||
}
|
||||
return 0
|
||||
}
|
||||
|
||||
@ -1,121 +0,0 @@
|
||||
package httpreader
|
||||
|
||||
import (
|
||||
"fmt"
|
||||
"net"
|
||||
"net/http"
|
||||
"time"
|
||||
|
||||
"FireLeave_tool/logger"
|
||||
)
|
||||
|
||||
// CheckHTTPService 检查HTTP服务是否可用
|
||||
func CheckHTTPService(port int, timeout time.Duration) bool {
|
||||
client := &http.Client{
|
||||
Timeout: timeout,
|
||||
}
|
||||
|
||||
url := fmt.Sprintf("http://127.0.0.1:%d/health", port)
|
||||
resp, err := client.Get(url)
|
||||
if err != nil {
|
||||
logger.Logger.Printf("HTTP服务健康检查失败 (Port=%d): %v", port, err)
|
||||
return false
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
return resp.StatusCode == http.StatusOK
|
||||
}
|
||||
|
||||
// CheckPortAvailable 检查端口是否可用
|
||||
func CheckPortAvailable(port int) bool {
|
||||
address := fmt.Sprintf(":%d", port)
|
||||
listener, err := net.Listen("tcp", address)
|
||||
if err != nil {
|
||||
return false
|
||||
}
|
||||
listener.Close()
|
||||
return true
|
||||
}
|
||||
|
||||
// WaitForHTTPService 等待HTTP服务启动
|
||||
func WaitForHTTPService(port int, timeout time.Duration) bool {
|
||||
deadline := time.Now().Add(timeout)
|
||||
logger.Logger.Printf("等待HTTP服务启动 (Port=%d, Timeout=%v)", port, timeout)
|
||||
|
||||
attempt := 0
|
||||
for time.Now().Before(deadline) {
|
||||
attempt++
|
||||
|
||||
if CheckHTTPService(port, 2*time.Second) {
|
||||
logger.Logger.Printf("HTTP服务启动成功 (Port=%d, 尝试次数=%d)", port, attempt)
|
||||
return true
|
||||
}
|
||||
|
||||
if attempt == 1 || attempt%5 == 0 {
|
||||
logger.Logger.Printf("等待HTTP服务... (Port=%d, 尝试次数=%d)", port, attempt)
|
||||
}
|
||||
|
||||
time.Sleep(1 * time.Second)
|
||||
}
|
||||
|
||||
logger.Logger.Printf("HTTP服务启动超时 (Port=%d, 总尝试次数=%d)", port, attempt)
|
||||
return false
|
||||
}
|
||||
|
||||
// ValidateInferenceResult 验证推理结果数据
|
||||
func ValidateInferenceResult(result *InferenceResult) error {
|
||||
if result.Serial == "" {
|
||||
return fmt.Errorf("serial字段不能为空")
|
||||
}
|
||||
if result.At <= 0 {
|
||||
return fmt.Errorf("at时间戳无效")
|
||||
}
|
||||
|
||||
// 验证参数数据
|
||||
for i, param := range result.Params {
|
||||
if param.Number < 0 {
|
||||
return fmt.Errorf("参数[%d]的number不能为负数", i)
|
||||
}
|
||||
if param.ClassIdx < 0 {
|
||||
return fmt.Errorf("参数[%d]的class_idx不能为负数", i)
|
||||
}
|
||||
}
|
||||
|
||||
return nil
|
||||
}
|
||||
|
||||
// GetPersonCountFromResult 从推理结果中获取人员数量
|
||||
func GetPersonCountFromResult(result *InferenceResult) int {
|
||||
if result == nil || len(result.Params) == 0 {
|
||||
return 0
|
||||
}
|
||||
|
||||
// 查找人员类别的参数(假设 class_idx=1 表示人员)
|
||||
for _, param := range result.Params {
|
||||
if param.ClassIdx == 1 { // 人员类别
|
||||
return param.Number
|
||||
}
|
||||
}
|
||||
|
||||
return 0
|
||||
}
|
||||
|
||||
// CreateTestInferenceResult 创建测试用的推理结果(用于调试)
|
||||
func CreateTestInferenceResult(deviceUUID string, personCount int) InferenceResult {
|
||||
return InferenceResult{
|
||||
Serial: deviceUUID,
|
||||
At: time.Now().Unix(),
|
||||
Type: 1,
|
||||
Params: []struct {
|
||||
ClassIdx int `json:"class_idx"`
|
||||
Name string `json:"name"`
|
||||
Number int `json:"number"`
|
||||
}{
|
||||
{
|
||||
ClassIdx: 1,
|
||||
Name: "person",
|
||||
Number: personCount,
|
||||
},
|
||||
},
|
||||
}
|
||||
}
|
||||
116
main.go
116
main.go
@ -23,10 +23,6 @@ import (
|
||||
"time"
|
||||
)
|
||||
|
||||
var (
|
||||
inferenceLastHeartbeat sync.Map // key: deviceUUID, value: time.Time
|
||||
)
|
||||
|
||||
type deviceAlarmState struct {
|
||||
InCondition bool
|
||||
StartTime int64
|
||||
@ -250,12 +246,12 @@ func processDevice(ctx context.Context, deviceData connect.DeviceData) {
|
||||
}
|
||||
|
||||
// 关键:保存进程对象到全局 map
|
||||
inferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd)
|
||||
connect.InferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd)
|
||||
|
||||
logger.Logger.Printf("推理程序已启动,PID=%d (DeviceUUID=%s)", inferenceCmd.Process.Pid, deviceData.DeviceUUID)
|
||||
|
||||
// 初始化心跳
|
||||
UpdateInferenceHeartbeat(deviceData.DeviceUUID)
|
||||
connect.UpdateInferenceHeartbeat(deviceData.DeviceUUID)
|
||||
|
||||
// 检查推理程序是否启动成功
|
||||
time.Sleep(2 * time.Second)
|
||||
@ -492,10 +488,7 @@ func waitForShutdown(ctx context.Context, cancel context.CancelFunc, wg *sync.Wa
|
||||
case <-stop:
|
||||
logger.Logger.Printf("Upon receiving the stop signal, start cleaning...")
|
||||
|
||||
// First cancel all contexts
|
||||
cancel()
|
||||
|
||||
// Wait for all goroutines to complete, but with timeout
|
||||
done := make(chan struct{})
|
||||
go func() {
|
||||
wg.Wait()
|
||||
@ -523,17 +516,14 @@ func waitForShutdown(ctx context.Context, cancel context.CancelFunc, wg *sync.Wa
|
||||
os.Exit(0)
|
||||
}
|
||||
|
||||
// initWebSocketAsync Asynchronously initializes WebSocket
|
||||
func initWebSocketAsync(ctx context.Context, wg *sync.WaitGroup) {
|
||||
logger.Logger.Printf("Start the initialization of asynchronous WebSocket...")
|
||||
|
||||
// 1. Initialize WebSocket channel
|
||||
if err := connect.InitWSChannel(); err != nil {
|
||||
logger.Logger.Printf("Failed to initialize WebSocket channel: %v", err)
|
||||
return
|
||||
}
|
||||
|
||||
// 2. Wait for connection to be ready (with context cancellation)
|
||||
logger.Logger.Printf("Waiting for WebSocket connection to be established...")
|
||||
containerName := "fireleave-container"
|
||||
|
||||
@ -545,7 +535,7 @@ func initWebSocketAsync(ctx context.Context, wg *sync.WaitGroup) {
|
||||
case <-time.After(1 * time.Second):
|
||||
client := connect.GetWSClient()
|
||||
if client != nil && client.IsConnected() {
|
||||
// Connection successful → proceed to register
|
||||
|
||||
goto register
|
||||
}
|
||||
}
|
||||
@ -555,7 +545,7 @@ func initWebSocketAsync(ctx context.Context, wg *sync.WaitGroup) {
|
||||
return
|
||||
|
||||
register:
|
||||
// 3. Register service
|
||||
|
||||
serviceID, err := connect.RegisterService(containerName)
|
||||
if err != nil {
|
||||
logger.Logger.Printf("Service registration failed: %v", err)
|
||||
@ -571,12 +561,10 @@ register:
|
||||
}
|
||||
}
|
||||
|
||||
// GetAllDevices Get all devices (new method)
|
||||
func (dm *DeviceManager) GetAllDevices() map[string]*connect.DeviceData {
|
||||
dm.mu.Lock()
|
||||
defer dm.mu.Unlock()
|
||||
|
||||
// Return copy to avoid concurrent modification
|
||||
result := make(map[string]*connect.DeviceData)
|
||||
for k, v := range dm.devices {
|
||||
result[k] = v
|
||||
@ -701,99 +689,29 @@ func triggerAlarmWithDelay(deviceData connect.DeviceData, alarmTime int64, coold
|
||||
video_server.CleanupOldRawVideos(deviceData.DeviceUUID)
|
||||
video_server.CleanupOldVideos(deviceData.DeviceUUID)
|
||||
}
|
||||
func UpdateInferenceHeartbeat(deviceUUID string) {
|
||||
inferenceLastHeartbeat.Store(deviceUUID, time.Now())
|
||||
}
|
||||
func ShouldRestartInference(deviceUUID string) bool {
|
||||
if val, ok := inferenceLastHeartbeat.Load(deviceUUID); ok {
|
||||
last := val.(time.Time)
|
||||
return time.Since(last) > 90*time.Second
|
||||
}
|
||||
return true // 从未收到过,也要启动
|
||||
}
|
||||
func (dm *DeviceManager) RangeDevices(fn func(uuid string, device *connect.DeviceData) bool) {
|
||||
dm.mu.Lock()
|
||||
defer dm.mu.Unlock()
|
||||
|
||||
for uuid, device := range dm.devices {
|
||||
// 拷贝一份指针,防止外部修改影响遍历
|
||||
devCopy := device
|
||||
if !fn(uuid, devCopy) {
|
||||
|
||||
if !fn(uuid, device) {
|
||||
return
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
// StopInferenceProcess 停止指定设备的推理进程(你现在 defer 中就是这么干的,抽出来统一管理)
|
||||
func (dm *DeviceManager) StopInferenceProcess(deviceUUID string) {
|
||||
// 这里你之前没有保存 cmd,我们需要一个地方存起来!
|
||||
// 所以我们要加一个 map 保存每个设备的推理进程
|
||||
if cmd, ok := inferenceProcesses.Load(deviceUUID); ok {
|
||||
if process, ok2 := cmd.(*exec.Cmd); ok2 && process.Process != nil {
|
||||
logger.Logger.Printf("正在停止推理进程 (DeviceUUID=%s, PID=%d)", deviceUUID, process.Process.Pid)
|
||||
process.Process.Kill()
|
||||
process.Wait() // 等待退出,防止僵尸进程
|
||||
}
|
||||
inferenceProcesses.Delete(deviceUUID)
|
||||
}
|
||||
}
|
||||
|
||||
// StartInferenceProcess 启动推理进程(把你 processDevice 里启动推理的部分抽出来)
|
||||
func (dm *DeviceManager) StartInferenceProcess(deviceData *connect.DeviceData) {
|
||||
dm.StopInferenceProcess(deviceData.DeviceUUID)
|
||||
|
||||
// 正确:只查询,不分配
|
||||
port, err := connect.GlobalPortManager.GetPort(deviceData.DeviceUUID)
|
||||
if err != nil {
|
||||
logger.Logger.Printf("获取端口失败,无法重启推理程序 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||||
return
|
||||
}
|
||||
|
||||
detectAreaStr := connect.ProcessDetectAreaForInference(deviceData.DetectArea)
|
||||
|
||||
inferenceCmd := exec.Command(
|
||||
"./yolov5",
|
||||
"-s", deviceData.CameraRTSP,
|
||||
"-m", "model",
|
||||
"-c", deviceData.DeviceUUID,
|
||||
"-p", fmt.Sprintf("%d", port), // 用老端口!
|
||||
"-w", detectAreaStr,
|
||||
"-r", fmt.Sprintf("%d", deviceData.Confidence),
|
||||
)
|
||||
inferenceCmd.Stdout = os.Stdout
|
||||
inferenceCmd.Stderr = os.Stderr
|
||||
|
||||
logger.Logger.Printf("正在重启推理程序 (使用原端口 %d): %v", port, inferenceCmd.Args)
|
||||
|
||||
if err := inferenceCmd.Start(); err != nil {
|
||||
logger.Logger.Printf("推理程序重启失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
|
||||
return
|
||||
}
|
||||
|
||||
inferenceProcesses.Store(deviceData.DeviceUUID, inferenceCmd)
|
||||
logger.Logger.Printf("推理程序已成功重启,PID=%d (DeviceUUID=%s, Port=%d)",
|
||||
inferenceCmd.Process.Pid, deviceData.DeviceUUID, port)
|
||||
|
||||
UpdateInferenceHeartbeat(deviceData.DeviceUUID)
|
||||
}
|
||||
|
||||
var inferenceProcesses sync.Map
|
||||
|
||||
func main() {
|
||||
// Define command line parameters (remove ws parameter)
|
||||
|
||||
var rtspURL string
|
||||
var deviceUUID string
|
||||
flag.StringVar(&rtspURL, "rtsp", "", "RTSP stream URL for testing")
|
||||
flag.StringVar(&deviceUUID, "uuid", "", "Device UUID for testing")
|
||||
flag.Parse()
|
||||
|
||||
// Initialize logger
|
||||
if err := logger.InitLogger(); err != nil {
|
||||
log.Fatalf("Logger initialization failed: %v", err)
|
||||
}
|
||||
defer logger.CloseLogger()
|
||||
|
||||
// Log cleanup
|
||||
go func() {
|
||||
for {
|
||||
logger.CleanupOldLogs()
|
||||
@ -801,20 +719,18 @@ func main() {
|
||||
}
|
||||
}()
|
||||
|
||||
// Directly enter normal mode (remove wsMode judgment)
|
||||
logger.Logger.Printf("=== Starting FireLeave Tool ===")
|
||||
|
||||
ctx, cancel := context.WithCancel(context.Background())
|
||||
defer cancel()
|
||||
var wg sync.WaitGroup
|
||||
|
||||
// 1. Initialize device data directly from service config
|
||||
logger.Logger.Printf("Step 1: Initialize device data from service config")
|
||||
deviceDataList, err := connect.InitializeDevicesFromServiceConfig()
|
||||
if err != nil {
|
||||
logger.Logger.Printf("Device data initialization failed: %v", err)
|
||||
logger.Logger.Printf("Will try to use existing device files...")
|
||||
// If initialization fails, try to load from existing files
|
||||
|
||||
deviceDataList = connect.GetDeviceDataList()
|
||||
}
|
||||
|
||||
@ -833,7 +749,6 @@ func main() {
|
||||
|
||||
logger.Logger.Printf("Step 3: All devices started")
|
||||
|
||||
// 3. Asynchronously initialize WebSocket
|
||||
logger.Logger.Printf("Step 4: Asynchronously initialize WebSocket connection")
|
||||
go initWebSocketAsync(ctx, &wg)
|
||||
|
||||
@ -848,12 +763,12 @@ func main() {
|
||||
defer ticker.Stop()
|
||||
for range ticker.C {
|
||||
GlobalDeviceManager.RangeDevices(func(uuid string, device *connect.DeviceData) bool {
|
||||
if ShouldRestartInference(uuid) {
|
||||
if connect.ShouldRestartInference(uuid) {
|
||||
logger.Logger.Printf("推理程序心跳超时(>90s),准备重启: DeviceUUID=%s", uuid)
|
||||
GlobalDeviceManager.StopInferenceProcess(uuid)
|
||||
time.Sleep(2 * time.Second)
|
||||
GlobalDeviceManager.StartInferenceProcess(device)
|
||||
UpdateInferenceHeartbeat(uuid) // 防止重复重启
|
||||
connect.StopInferenceProcess(uuid)
|
||||
time.Sleep(6 * time.Second)
|
||||
connect.StartInferenceProcess(device)
|
||||
connect.UpdateInferenceHeartbeat(uuid)
|
||||
}
|
||||
return true
|
||||
})
|
||||
@ -867,7 +782,7 @@ func main() {
|
||||
}
|
||||
}
|
||||
}()
|
||||
// Wait for termination signal
|
||||
|
||||
waitForShutdown(ctx, cancel, &wg)
|
||||
}
|
||||
func printGlobalDeviceStatus() {
|
||||
@ -883,7 +798,7 @@ func printGlobalDeviceStatus() {
|
||||
|
||||
i := 1
|
||||
for uuid, data := range activeDevices {
|
||||
// 从 deviceStates 获取冷却时间
|
||||
|
||||
stateInf, _ := deviceStates.Load(uuid)
|
||||
state := stateInf.(*deviceAlarmState)
|
||||
cooldownLeft := int64(0)
|
||||
@ -930,7 +845,6 @@ func monitorDeviceStatus(ctx context.Context, initialDevices []connect.DeviceDat
|
||||
}
|
||||
}
|
||||
|
||||
// debugDeviceData Debug device data
|
||||
func debugDeviceData(deviceData connect.DeviceData) {
|
||||
logger.Logger.Printf("Debug device data: UUID=%s", deviceData.DeviceUUID)
|
||||
logger.Logger.Printf(" TaskID: %q", deviceData.TaskID)
|
||||
|
||||
@ -18,8 +18,8 @@ type TemperatureResult struct {
|
||||
|
||||
// TCPClient TCP 客户端
|
||||
type TCPClient struct {
|
||||
addr string // 地址,例如 "127.0.0.1:59881"
|
||||
deviceUUID string // 关联的 DeviceUUID
|
||||
addr string
|
||||
deviceUUID string
|
||||
conn net.Conn
|
||||
mu sync.Mutex
|
||||
}
|
||||
|
||||
@ -212,14 +212,13 @@ func alignTo10Seconds(timestamp int64) int64 {
|
||||
return alignedTime.Unix()
|
||||
}
|
||||
|
||||
// parseSimpleTimestamp 解析简单的时间戳文件名 (如: 20251029181740.mp4)
|
||||
// parseSimpleTimestamp 解析简单的时间戳文件名
|
||||
func parseSimpleTimestamp(filename string) (int64, error) {
|
||||
// 移除文件扩展名
|
||||
|
||||
nameWithoutExt := strings.TrimSuffix(filename, filepath.Ext(filename))
|
||||
|
||||
// 文件名应该就是14位时间戳
|
||||
if len(nameWithoutExt) != 14 {
|
||||
//return 0, fmt.Errorf("无效的时间戳格式: %s (期望14位数字)", nameWithoutExt)
|
||||
|
||||
}
|
||||
|
||||
// 解析时间戳
|
||||
@ -297,7 +296,7 @@ func CleanupOldVideos(deviceUUID string) {
|
||||
func CleanupOldRawVideos(deviceUUID string) {
|
||||
rawDir := filepath.Join("/usr/data/camera", deviceUUID)
|
||||
if _, err := os.Stat(rawDir); os.IsNotExist(err) {
|
||||
return // 目录不存在,直接返回
|
||||
return
|
||||
}
|
||||
|
||||
files, err := os.ReadDir(rawDir)
|
||||
@ -308,7 +307,7 @@ func CleanupOldRawVideos(deviceUUID string) {
|
||||
|
||||
var videoFiles []string
|
||||
for _, file := range files {
|
||||
if !file.IsDir() && filepath.Ext(file.Name()) == ".mp4" { // 假设是mp4,也可以加更多判断
|
||||
if !file.IsDir() && filepath.Ext(file.Name()) == ".mp4" {
|
||||
videoFiles = append(videoFiles, filepath.Join(rawDir, file.Name()))
|
||||
}
|
||||
}
|
||||
@ -317,14 +316,12 @@ func CleanupOldRawVideos(deviceUUID string) {
|
||||
return // 不超过60条,不清理
|
||||
}
|
||||
|
||||
// 按修改时间排序(旧 → 新)
|
||||
sort.Slice(videoFiles, func(i, j int) bool {
|
||||
fi, _ := os.Stat(videoFiles[i])
|
||||
fj, _ := os.Stat(videoFiles[j])
|
||||
return fi.ModTime().Before(fj.ModTime())
|
||||
})
|
||||
|
||||
// 删除最早的(超过60条的部分)
|
||||
for i := 0; i < len(videoFiles)-60; i++ {
|
||||
if err := os.Remove(videoFiles[i]); err != nil {
|
||||
logger.Logger.Printf("Failed to delete the old original video %s: %v", videoFiles[i], err)
|
||||
|
||||
@ -16,18 +16,16 @@ import (
|
||||
|
||||
// UploadToNginx 上传视频到 Nginx 服务器,返回 URL
|
||||
func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string) (string, error) {
|
||||
// === 1. 打开文件 ===
|
||||
|
||||
file, err := os.Open(filePath)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("open file failed: %w", err)
|
||||
}
|
||||
defer file.Close()
|
||||
|
||||
// === 2. 创建表单 ===
|
||||
body := new(bytes.Buffer)
|
||||
writer := multipart.NewWriter(body)
|
||||
|
||||
// --- 上传文件 ---
|
||||
part, err := writer.CreateFormFile("file", filepath.Base(filePath))
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("create form file failed: %w", err)
|
||||
@ -36,7 +34,6 @@ func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string
|
||||
return "", fmt.Errorf("copy file failed: %w", err)
|
||||
}
|
||||
|
||||
// --- 上传 JSON ---
|
||||
eventJSON, err := json.Marshal(eventParams)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("marshal event failed: %w", err)
|
||||
@ -52,12 +49,10 @@ func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string
|
||||
return "", fmt.Errorf("close writer failed: %w", err)
|
||||
}
|
||||
|
||||
// === 3. 打印请求头 ===
|
||||
logger.Logger.Printf("[HTTP Request]URL: %s", uploadURL)
|
||||
logger.Logger.Printf("[HTTP Request]Content-Type: %s", writer.FormDataContentType())
|
||||
logger.Logger.Printf("[HTTP Request]Body 长度: %d bytes", body.Len())
|
||||
|
||||
// === 4. 发送请求 ===
|
||||
req, err := http.NewRequest("POST", uploadURL, body)
|
||||
if err != nil {
|
||||
return "", fmt.Errorf("create request failed: %w", err)
|
||||
@ -71,7 +66,6 @@ func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string
|
||||
}
|
||||
defer resp.Body.Close()
|
||||
|
||||
// === 5. 打印响应 ===
|
||||
respBody, _ := io.ReadAll(resp.Body)
|
||||
logger.Logger.Printf("[nginx response]Status: %d", resp.StatusCode)
|
||||
logger.Logger.Printf("[nginx response]Body: %s", string(respBody))
|
||||
@ -80,7 +74,6 @@ func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string
|
||||
return "", fmt.Errorf("nginx error %d: %s", resp.StatusCode, string(respBody))
|
||||
}
|
||||
|
||||
// === 6. 解析 video_url ===
|
||||
var result struct {
|
||||
Code int `json:"code"`
|
||||
Result json.RawMessage `json:"result"`
|
||||
|
||||
Loading…
x
Reference in New Issue
Block a user