初始提交

This commit is contained in:
gqc 2025-11-18 17:30:28 +08:00
commit 7a9d103943
19 changed files with 2797 additions and 0 deletions

8
.idea/.gitignore generated vendored Normal file
View File

@ -0,0 +1,8 @@
# 默认忽略的文件
/shelf/
/workspace.xml
# 基于编辑器的 HTTP 客户端请求
/httpRequests/
# Datasource local storage ignored files
/dataSources/
/dataSources.local.xml

9
.idea/FireLeave_tool.iml generated Normal file
View File

@ -0,0 +1,9 @@
<?xml version="1.0" encoding="UTF-8"?>
<module type="WEB_MODULE" version="4">
<component name="Go" enabled="true" />
<component name="NewModuleRootManager">
<content url="file://$MODULE_DIR$" />
<orderEntry type="inheritedJdk" />
<orderEntry type="sourceFolder" forTests="false" />
</component>
</module>

8
.idea/modules.xml generated Normal file
View File

@ -0,0 +1,8 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="ProjectModuleManager">
<modules>
<module fileurl="file://$PROJECT_DIR$/.idea/FireLeave_tool.iml" filepath="$PROJECT_DIR$/.idea/FireLeave_tool.iml" />
</modules>
</component>
</project>

6
.idea/vcs.xml generated Normal file
View File

@ -0,0 +1,6 @@
<?xml version="1.0" encoding="UTF-8"?>
<project version="4">
<component name="VcsDirectoryMappings">
<mapping directory="$PROJECT_DIR$" vcs="Git" />
</component>
</project>

Binary file not shown.

217
connect/device_storage.go Normal file
View File

@ -0,0 +1,217 @@
package connect
import (
"FireLeave_tool/logger"
"encoding/json"
"fmt"
"os"
"path/filepath"
"strings"
)
// Modified device data storage path
var (
devicesBaseDir = "/data/devices"
deviceDataList []DeviceData
)
// InitializeDevicesFromServiceConfig Initialize device data from service configuration
func InitializeDevicesFromServiceConfig() ([]DeviceData, error) {
deviceDataMutex.Lock()
defer deviceDataMutex.Unlock()
logger.Logger.Printf("Initializing device data from service configuration...")
// Ensure device directory exists
if err := os.MkdirAll(devicesBaseDir, 0755); err != nil {
logger.Logger.Printf("Failed to create device directory: %v", err)
return nil, err
}
logger.Logger.Printf("Device directory confirmed: %s", devicesBaseDir)
// Load from service configuration
configs, err := LoadServiceConfig()
if err != nil {
logger.Logger.Printf("Failed to load from service configuration: %v", err)
return nil, err
}
logger.Logger.Printf("Loaded %d devices from service configuration", len(configs))
// Clear existing device data
deviceDataList = []DeviceData{}
// Process each device and save to individual files
for i, config := range configs {
processedData := ProcessDeviceDataForInference(config)
deviceDataList = append(deviceDataList, processedData)
// Save to individual device file
if err := saveDeviceToFile(processedData); err != nil {
logger.Logger.Printf("Failed to save the device file [%s]: %v", processedData.DeviceUUID, err)
continue
}
logger.Logger.Printf("Processed device [%d]: UUID=%s", i, processedData.DeviceUUID)
}
logger.Logger.Printf("Device data initialization completed, %d devices", len(deviceDataList))
return deviceDataList, nil
}
// saveDeviceToFile Save individual device data to separate file
func saveDeviceToFile(deviceData DeviceData) error {
filePath := filepath.Join(devicesBaseDir, fmt.Sprintf("device_%s.json", deviceData.DeviceUUID))
// Clean device data to ensure no invalid characters
cleanedData := cleanDeviceData(deviceData)
data, err := json.MarshalIndent(cleanedData, "", " ")
if err != nil {
logger.Logger.Printf("Device data serialization failed: %v, Device data: %+v", err, cleanedData)
return fmt.Errorf("Device data serialization failed: %v", err)
}
// Write directly to file
if err := os.WriteFile(filePath, data, 0644); err != nil {
return fmt.Errorf("Failed to write device file: %v", err)
}
logger.Logger.Printf("Device data saved successfully: %s", filePath)
return nil
}
// cleanDeviceData Clean device data, remove possible invalid characters
func cleanDeviceData(data DeviceData) DeviceData {
cleaned := data
// Clean string fields
cleaned.DeviceUUID = cleanString(data.DeviceUUID)
cleaned.TaskID = cleanString(data.TaskID)
cleaned.CameraRTSP = cleanString(data.CameraRTSP)
cleaned.CameraIP = cleanString(data.CameraIP)
cleaned.UploadURL = cleanString(data.UploadURL)
cleaned.DetectAreaStr = cleanString(data.DetectAreaStr)
// Ensure numeric fields are valid
if cleaned.Confidence < 0 || cleaned.Confidence > 100 {
cleaned.Confidence = 70 // Default value
}
if cleaned.CameraChannel < 0 {
cleaned.CameraChannel = 0
}
if cleaned.PersonCount < 0 {
cleaned.PersonCount = 0
}
if cleaned.Temperature < 0 {
cleaned.Temperature = 0
}
if cleaned.TemperatureThreshold <= 0 {
cleaned.TemperatureThreshold = 45.0 // Default value
}
return cleaned
}
func cleanString(s string) string {
// Remove control characters and invalid Unicode characters
return strings.Map(func(r rune) rune {
if r == 0xFFFD { // Unicode replacement character
return -1
}
if r < 32 && r != 9 && r != 10 && r != 13 { // Keep tab, newline, carriage return
return -1
}
return r
}, s)
}
// GetDeviceDataList Get device data list
func GetDeviceDataList() []DeviceData {
deviceDataMutex.Lock()
defer deviceDataMutex.Unlock()
// If no data in memory, try to load from device files
if len(deviceDataList) == 0 {
logger.Logger.Printf("No device data in memory, loading from device files...")
if err := loadDevicesFromFiles(); err != nil {
logger.Logger.Printf("Failed to load from device files: %v", err)
return []DeviceData{}
}
}
// Return copy
result := make([]DeviceData, len(deviceDataList))
copy(result, deviceDataList)
return result
}
// loadDevicesFromFiles Load data from device files
func loadDevicesFromFiles() error {
// Clear existing data
deviceDataList = []DeviceData{}
// Read all files in device directory
files, err := filepath.Glob(filepath.Join(devicesBaseDir, "device_*.json"))
if err != nil {
return fmt.Errorf("Failed to find device files: %v", err)
}
logger.Logger.Printf("Found %d device files", len(files))
for _, file := range files {
data, err := os.ReadFile(file)
if err != nil {
logger.Logger.Printf("Failed to read device file %s: %v", file, err)
continue
}
var deviceData DeviceData
if err := json.Unmarshal(data, &deviceData); err != nil {
logger.Logger.Printf("Failed to parse device file %s: %v", file, err)
continue
}
deviceDataList = append(deviceDataList, deviceData)
logger.Logger.Printf("Loaded device from file: UUID=%s", deviceData.DeviceUUID)
}
logger.Logger.Printf("Loaded %d devices from device files", len(deviceDataList))
return nil
}
// UpdateDeviceData Update device data
func UpdateDeviceData(data DeviceData) error {
deviceDataMutex.Lock()
defer deviceDataMutex.Unlock()
//logger.Logger.Printf("Updating device data [%s]", data.DeviceUUID)
// Clean input data
cleanedData := cleanDeviceData(data)
// Find if device with same DeviceUUID already exists
found := false
for i, existing := range deviceDataList {
if existing.DeviceUUID == cleanedData.DeviceUUID {
deviceDataList[i] = cleanedData
found = true
//logger.Logger.Printf("Updated existing device: DeviceUUID=%s", cleanedData.DeviceUUID)
break
}
}
// If not exists, add to list
if !found {
logger.Logger.Printf("Adding new device to list: DeviceUUID=%s", cleanedData.DeviceUUID)
deviceDataList = append(deviceDataList, cleanedData)
}
// Save to individual file
if err := saveDeviceToFile(cleanedData); err != nil {
logger.Logger.Printf("Failed to save device file: %v", err)
return err
}
logger.Logger.Printf("Device data update completed: DeviceUUID=%s", cleanedData.DeviceUUID)
return nil
}

96
connect/port_manager.go Normal file
View File

@ -0,0 +1,96 @@
package connect
import (
"errors"
"fmt"
"net"
"sync"
"FireLeave_tool/logger"
)
// PortManager 管理端口分配
type PortManager struct {
mu sync.Mutex
allocated map[string]int // DeviceUUID 到推理端口的映射
portPool []int // 推理端口池HTTP
available map[int]bool // 可用端口标记
}
func NewPortManager() *PortManager {
// 推理端口范围58881-58890HTTP支持最多10个设备
ports := []int{58881, 58882, 58883, 58884, 58885, 58886, 58887, 58888, 58889, 58890}
// 初始化可用端口映射
available := make(map[int]bool)
for _, port := range ports {
available[port] = true
}
return &PortManager{
allocated: make(map[string]int),
portPool: ports,
available: available,
}
}
func (pm *PortManager) AllocatePort(deviceUUID string) (int, error) {
pm.mu.Lock()
defer pm.mu.Unlock()
// 检查设备是否已分配端口
if port, exists := pm.allocated[deviceUUID]; exists {
logger.Logger.Printf("Device %s already allocated port: %d", deviceUUID, port)
return port, nil
}
// 查找可用端口
for _, port := range pm.portPool {
if !pm.available[port] {
continue // 端口已被分配
}
tempPort := port + 1000 // 温度端口TCP
// 检查推理端口HTTP是否可用
inferenceLn, err := net.Listen("tcp", fmt.Sprintf(":%d", port))
if err != nil {
logger.Logger.Printf("Port %d (inference) is not available: %v", port, err)
continue
}
inferenceLn.Close()
// 检查温度端口TCP是否可用
tempLn, err := net.Listen("tcp", fmt.Sprintf(":%d", tempPort))
if err != nil {
logger.Logger.Printf("Port %d (temperature) is not available: %v", tempPort, err)
continue
}
tempLn.Close()
// 端口可用,记录分配
pm.allocated[deviceUUID] = port
pm.available[port] = false
logger.Logger.Printf("Port allocated for device %s: inference=%d, temperature=%d",
deviceUUID, port, tempPort)
return port, nil
}
return 0, errors.New("no available ports for inference and temperature")
}
func (pm *PortManager) ReleasePort(deviceUUID string) {
pm.mu.Lock()
defer pm.mu.Unlock()
// 删除设备端口映射
if port, exists := pm.allocated[deviceUUID]; exists {
delete(pm.allocated, deviceUUID)
pm.available[port] = true // 标记端口为可用
logger.Logger.Printf("Port released for device %s: %d", deviceUUID, port)
}
}
// GlobalPortManager 全局端口管理器
var GlobalPortManager = NewPortManager()

720
connect/ws_channel.go Normal file
View File

@ -0,0 +1,720 @@
package connect
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"
"FireLeave_tool/logger"
"github.com/gorilla/websocket"
)
// Global variables
var (
wsClient *WSClient
wsMutex sync.RWMutex
isConnecting bool
wsURL = "ws://172.17.0.1:18080/ws" // Updated to specified address
deviceDataMutex sync.Mutex
ConfigUpdateChan chan DeviceData
currentServiceID string // New: current service's server_id
)
func GetWSClient() *WSClient {
wsMutex.RLock()
defer wsMutex.RUnlock()
return wsClient
}
func (c *WSClient) IsConnected() bool {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.connected
}
const (
serviceConfigFile = "/usr/local/etc/service.conf"
)
// DeviceData struct
type DeviceData struct {
DeviceUUID string `json:"device_uuid"`
TaskID string `json:"task_id"`
CameraRTSP string `json:"camera_rtsp"`
CameraIP string `json:"camera_ip"`
Confidence int `json:"confidence"`
DetectArea []Area `json:"detect_area"`
DetectAreaStr string `json:"detect_area_str,omitempty"`
PersonCount int `json:"person_count"`
DetectionTime json.Number `json:"detection_time"` // Using json.Number
AlarmTime json.Number `json:"alarm_time"` // Using json.Number
TemperatureThreshold float64 `json:"temperature_threshold"`
Temperature float64 `json:"temperature"`
UploadURL string `json:"upload_url"`
WSConn *websocket.Conn
CameraChannel int `json:"camera_channel"`
LastAlarmTime int64 `json:"-"` // Last alarm time
AlarmPauseUntil int64 `json:"-"` // Pause alarm untilw
HostUUID string `json:"host_uuid"`
Address string `json:"address"`
}
type InferenceParams struct {
DeviceUUID string `json:"device_uuid"`
CameraRTSP string `json:"camera_rtsp"`
CameraChannel int `json:"camera_channel"`
Confidence int `json:"confidence"`
DetectArea string `json:"detect_area"` // String format detection area
Port int `json:"port"` // HTTP server port
}
type Area struct {
ID int `json:"id"`
Pts []Point `json:"pts"`
}
type Point struct {
X float64 `json:"x"`
Y float64 `json:"y"`
}
// WSMessage struct
type WSMessage struct {
ID int64 `json:"id,omitempty"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"` // Modified: string -> json.RawMessage
Error *WSError `json:"error,omitempty"`
}
type WSError struct {
Code int `json:"code"`
Message string `json:"message"`
}
type RegisterParams struct {
ContainerName string `json:"container_name,omitempty"`
ServiceID string `json:"service_id,omitempty"`
MetaData map[string]interface{} `json:"meta_data,omitempty"` // New: metadata field
}
type SubscribeParams struct {
Topic string `json:"topic"`
}
type PublishParams struct {
EventType string `json:"event_type,omitempty"`
Body json.RawMessage `json:"body,omitempty"`
Topic string `json:"topic,omitempty"`
Data json.RawMessage `json:"data,omitempty"`
}
type WSClient struct {
Conn *websocket.Conn
url string
serviceID string
serverID string
connected bool
mutex sync.Mutex
messageID int64
callbacks map[int64]chan *WSMessage
subscribers map[string]func([]byte)
closeChan chan struct{}
reconnectFlag bool
}
// NewWSClient creates WebSocket client
func NewWSClient(url string) (*WSClient, error) {
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}
client := &WSClient{
Conn: conn,
url: url,
connected: true,
messageID: 1,
callbacks: make(map[int64]chan *WSMessage),
subscribers: make(map[string]func([]byte)),
closeChan: make(chan struct{}),
reconnectFlag: true,
}
go client.readLoop()
go client.heartbeatLoop()
return client, nil
}
func (c *WSClient) connect() error {
conn, _, err := websocket.DefaultDialer.Dial(c.url, nil)
if err != nil {
return err
}
c.Conn = conn
c.connected = true
go c.readLoop()
go c.heartbeatLoop()
return nil
}
// InitWSChannel initializes WebSocket
func InitWSChannel() error {
wsMutex.Lock()
defer wsMutex.Unlock()
if wsClient != nil && wsClient.connected {
logger.Logger.Printf("WebSocket connection already exists")
return nil
}
if isConnecting {
logger.Logger.Printf("WebSocket connection is being established")
return fmt.Errorf("WebSocket connection is being established")
}
isConnecting = true
defer func() { isConnecting = false }()
logger.Logger.Printf("Starting WebSocket connection initialization: %s", wsURL)
// Connect WebSocket asynchronously to avoid blocking
go func() {
client, err := NewWSClient(wsURL)
if err != nil {
logger.Logger.Printf("Failed to create WebSocket client: %v", err)
return
}
if ConfigUpdateChan == nil {
ConfigUpdateChan = make(chan DeviceData, 100)
}
// Note: No longer calling LoadDeviceData() here to avoid duplicate loading
wsClient = client
logger.Logger.Printf("WebSocket connection initialized successfully (asynchronous)")
}()
return nil
}
// New: function to generate server_id
func generateServiceID() string {
return fmt.Sprintf("fireleave-service-%d", time.Now().UnixNano())
}
// New: set current server_id
//func SetServerID(serverID string) {
// currentServerID = serverID
//}
func GetServiceID() string {
if currentServiceID == "" {
currentServiceID = generateServiceID()
}
return currentServiceID
}
// RegisterService service registration - modified: now requires providing own server_id
func RegisterService(containerName string) (string, error) {
if wsClient == nil {
return "", fmt.Errorf("WebSocket client not initialized")
}
serviceID := GetServiceID()
params := RegisterParams{
ContainerName: containerName,
ServiceID: serviceID,
MetaData: map[string]interface{}{ // Add metadata
"version": "1.0",
"type": "fire_leave",
},
}
logger.Logger.Printf("Starting service registration: ContainerName=%s, ServiceID=%s", containerName, serviceID)
response, err := wsClient.sendRequest("register", params, true)
if err != nil {
logger.Logger.Printf("Service registration failed: %v", err)
return "", err
}
if response.Error != nil {
logger.Logger.Printf("Service registration error: Code=%d, Message=%s", response.Error.Code, response.Error.Message)
return "", fmt.Errorf("Service registration error: %s", response.Error.Message)
}
// Parse response
var resultStr string
if err := json.Unmarshal(response.Result, &resultStr); err != nil {
logger.Logger.Printf("Failed to parse service registration response: %v, Raw response: %s", err, string(response.Result))
return "", fmt.Errorf("Failed to parse service registration response: %v", err)
}
if resultStr != "ok" {
logger.Logger.Printf("Service registration failed: Expected result 'ok', got '%s'", resultStr)
return "", fmt.Errorf("Service registration failed: Server returned abnormal response")
}
wsClient.serviceID = serviceID
logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID)
return serviceID, nil
}
// SubscribeTopic topic subscription
func SubscribeTopic(topic string, callback func([]byte)) error {
if wsClient == nil {
return fmt.Errorf("WebSocket client not initialized")
}
params := SubscribeParams{
Topic: topic,
}
logger.Logger.Printf("Starting topic subscription: Topic=%s", topic)
response, err := wsClient.sendRequest("subscribe", params, true)
if err != nil {
logger.Logger.Printf("Topic subscription failed: %v", err)
return err
}
if response.Error != nil {
logger.Logger.Printf("Topic subscription error: Code=%d, Message=%s", response.Error.Code, response.Error.Message)
return fmt.Errorf("Topic subscription error: %s", response.Error.Message)
}
wsClient.mutex.Lock()
wsClient.subscribers[topic] = callback
wsClient.mutex.Unlock()
logger.Logger.Printf("Topic subscription successful: Topic=%s", topic)
return nil
}
func (c *WSClient) SendRawMessage(message interface{}) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if !c.connected {
return fmt.Errorf("WebSocket connection disconnected")
}
messageData, err := json.Marshal(message)
if err != nil {
return fmt.Errorf("Message serialization failed: %v", err)
}
logger.Logger.Printf("Sending WebSocket message: %s", string(messageData))
err = c.Conn.WriteMessage(websocket.TextMessage, messageData)
if err != nil {
logger.Logger.Printf("WebSocket message sending failed: %v", err)
c.connected = false
go c.reconnect()
return fmt.Errorf("Message sending failed: %v", err)
}
return nil
}
// sendRequest sends request
func (c *WSClient) sendRequest(method string, params interface{}, waitResponse bool) (*WSMessage, error) {
c.mutex.Lock()
id := c.messageID
c.messageID++
c.mutex.Unlock()
message := WSMessage{
ID: id,
Method: method,
}
if params != nil {
paramsData, err := json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("Parameter serialization failed: %v", err)
}
message.Params = paramsData
}
if waitResponse {
callback := make(chan *WSMessage, 1)
c.mutex.Lock()
c.callbacks[id] = callback
c.mutex.Unlock()
defer func() {
c.mutex.Lock()
delete(c.callbacks, id)
c.mutex.Unlock()
logger.Logger.Printf("Cleaning up callback channel: ID=%d", id)
}()
if err := c.SendRawMessage(message); err != nil {
return nil, err
}
logger.Logger.Printf("Waiting for response: ID=%d", id)
select {
case response := <-callback:
logger.Logger.Printf("Received response: ID=%d", id)
return response, nil
case <-time.After(30 * time.Second):
logger.Logger.Printf("Response timeout: ID=%d", id)
return nil, fmt.Errorf("Response timeout")
case <-c.closeChan:
logger.Logger.Printf("Connection closed, stopping response wait: ID=%d", id)
return nil, fmt.Errorf("Connection closed")
}
}
return nil, c.SendRawMessage(message)
}
// readLoop message reading loop
func (c *WSClient) readLoop() {
logger.Logger.Printf("Starting WebSocket message reading loop")
defer func() {
if r := recover(); r != nil {
logger.Logger.Printf("readLoop panic: %v", r)
}
// Important: do not close(closeChan)!
logger.Logger.Printf("WebSocket read loop exited, connection marked as disconnected")
}()
for {
_, messageData, err := c.Conn.ReadMessage()
if err != nil {
logger.Logger.Printf("WebSocket message reading failed: %v", err)
c.mutex.Lock()
c.connected = false
c.mutex.Unlock()
go c.reconnect() // Trigger reconnection
return
}
if string(messageData) == "pong" {
logger.Logger.Printf("Received pong response")
continue
}
logger.Logger.Printf("Received WebSocket message: %s", string(messageData))
var message WSMessage
if err := json.Unmarshal(messageData, &message); err != nil {
logger.Logger.Printf("WebSocket message parsing failed: %v", err)
continue
}
if message.ID > 0 {
c.mutex.Lock()
if callback, exists := c.callbacks[message.ID]; exists {
select {
case callback <- &message:
logger.Logger.Printf("Successfully sent message to callback channel: ID=%d", message.ID)
default:
logger.Logger.Printf("Callback channel full, discarding message: ID=%d", message.ID)
}
}
c.mutex.Unlock()
}
if message.Method == "publish" {
c.handlePublishMessage(message)
}
}
}
// handlePublishMessage handles publish messages
func (c *WSClient) handlePublishMessage(message WSMessage) {
logger.Logger.Printf("Processing publish message")
var publishParams PublishParams
if err := json.Unmarshal(message.Params, &publishParams); err != nil {
logger.Logger.Printf("Failed to parse publish message parameters: %v", err)
return
}
if publishParams.Topic != "" {
c.mutex.Lock()
callback, exists := c.subscribers[publishParams.Topic]
c.mutex.Unlock()
if exists && callback != nil {
callback(message.Params)
}
}
if publishParams.Data != nil {
var deviceData DeviceData
if err := json.Unmarshal(publishParams.Data, &deviceData); err != nil {
logger.Logger.Printf("Failed to parse DeviceData: %v", err)
return
}
// Check if DeviceUUID is valid
if deviceData.DeviceUUID == "" {
logger.Logger.Printf("Invalid DeviceData: DeviceUUID is empty, ignoring")
return
}
logger.Logger.Printf("Received valid DeviceData: DeviceUUID=%s", deviceData.DeviceUUID)
// 1. Save original configuration to service config file (using SaveSingleServiceConfig)
if err := SaveSingleServiceConfig(deviceData); err != nil {
logger.Logger.Printf("Failed to save service configuration: %v", err)
}
// 2. Process data and save to device data file
processedData := ProcessDeviceDataForInference(deviceData)
if err := UpdateDeviceData(processedData); err != nil {
logger.Logger.Printf("Failed to update DeviceData: %v", err)
return
}
select {
case ConfigUpdateChan <- processedData:
logger.Logger.Printf("New DeviceData sent to config update channel: DeviceUUID=%s", processedData.DeviceUUID)
default:
logger.Logger.Printf("Config update channel full, discarding message")
}
}
}
// heartbeatLoop heartbeat loop - fixed version
func (c *WSClient) heartbeatLoop() {
logger.Logger.Printf("Starting WebSocket heartbeat loop")
ticker := time.NewTicker(25 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.mutex.Lock()
connected := c.connected
conn := c.Conn
c.mutex.Unlock()
if connected && conn != nil {
err := conn.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
logger.Logger.Printf("Failed to send ping: %v", err)
c.mutex.Lock()
c.connected = false
c.mutex.Unlock()
go c.reconnect()
} else {
logger.Logger.Printf("Ping sent successfully")
}
}
case <-c.closeChan:
logger.Logger.Printf("Heartbeat loop received close signal, exiting")
ticker.Stop() // Explicitly stop ticker
return
}
}
}
func (c *WSClient) reRegisterAndResubscribe() {
// 1. 等待连接稳定
time.Sleep(1 * time.Second)
// 2. 重新注册
containerName := "fireleave-container"
serviceID, err := RegisterService(containerName)
if err != nil {
logger.Logger.Printf("Failed to re-register service after reconnect: %v", err)
return
}
logger.Logger.Printf("Re-registration successful: ServiceID=%s", serviceID)
// 3. 重新订阅
for topic, callback := range c.subscribers {
if err := SubscribeTopic(topic, callback); err != nil {
logger.Logger.Printf("Failed to re-subscribe topic %s: %v", topic, err)
} else {
logger.Logger.Printf("Re-subscribed to topic: %s", topic)
}
}
}
// connect/wschannel.go → reconnect complete fix
func (c *WSClient) reconnect() {
if !c.reconnectFlag {
return
}
for i := 0; i < 12; i++ {
delay := time.Duration(1<<uint(i)) * time.Second
if delay > 30*time.Second {
delay = 30 * time.Second
}
time.Sleep(delay)
conn, _, err := websocket.DefaultDialer.Dial(c.url, nil)
if err != nil {
logger.Logger.Printf("Reconnection attempt %d/12 failed: %v", i+1, err)
continue
}
c.mutex.Lock()
c.Conn = conn
c.connected = true
c.mutex.Unlock()
logger.Logger.Printf("WebSocket reconnection successful!")
// 关键:重连后必须先 register
go c.reRegisterAndResubscribe()
// 重启读写循环
go c.readLoop()
go c.heartbeatLoop()
return
}
logger.Logger.Printf("Reconnection failed after 12 attempts")
c.reconnectFlag = false
}
func (d *DeviceData) ToInferenceParams(port int) InferenceParams {
return InferenceParams{
DeviceUUID: d.DeviceUUID,
CameraRTSP: d.CameraRTSP,
CameraChannel: d.CameraChannel,
Confidence: d.Confidence,
DetectArea: ProcessDetectAreaForInference(d.DetectArea),
Port: port,
}
}
// SaveServiceConfig saves multiple device configurations to service config file
func SaveServiceConfig(deviceDataList []DeviceData) error {
configData, err := json.MarshalIndent(deviceDataList, "", " ")
if err != nil {
logger.Logger.Printf("Failed to serialize service configuration: %v", err)
return err
}
if err := os.WriteFile(serviceConfigFile, configData, 0644); err != nil {
logger.Logger.Printf("Failed to write service config file: %v", err)
return err
}
logger.Logger.Printf("Service configuration saved to: %s, %d devices", serviceConfigFile, len(deviceDataList))
return nil
}
// SaveSingleServiceConfig saves single device configuration (compatible with existing calls)
func SaveSingleServiceConfig(deviceData DeviceData) error {
// First read existing configuration
existingConfigs, err := LoadServiceConfig()
if err != nil {
// If reading fails, create new configuration list
existingConfigs = []DeviceData{deviceData}
} else {
// Find if configuration with same DeviceUUID already exists
found := false
for i, existing := range existingConfigs {
if existing.DeviceUUID == deviceData.DeviceUUID {
existingConfigs[i] = deviceData
found = true
break
}
}
// If not exists, add to list
if !found {
existingConfigs = append(existingConfigs, deviceData)
}
}
return SaveServiceConfig(existingConfigs)
}
// LoadServiceConfig loads multiple device data from service config file
func LoadServiceConfig() ([]DeviceData, error) {
if _, err := os.Stat(serviceConfigFile); os.IsNotExist(err) {
logger.Logger.Printf("Service config file does not exist: %s", serviceConfigFile)
return nil, fmt.Errorf("service config file does not exist")
}
data, err := os.ReadFile(serviceConfigFile)
if err != nil {
logger.Logger.Printf("Failed to read service config file: %v", err)
return nil, err
}
// Check if file content is empty
if len(data) == 0 {
logger.Logger.Printf("Service config file is empty")
return nil, fmt.Errorf("service config file is empty")
}
// Parse as DeviceData array
var deviceDataList []DeviceData
if err := json.Unmarshal(data, &deviceDataList); err != nil {
logger.Logger.Printf("Failed to parse service config file: %v", err)
return nil, err
}
// Filter out configurations with empty DeviceUUID
var validDeviceDataList []DeviceData
for _, deviceData := range deviceDataList {
if deviceData.DeviceUUID != "" {
validDeviceDataList = append(validDeviceDataList, deviceData)
}
}
const FixedUploadURL = "http://192.168.80.168/api/v1/upload/video"
for i := range validDeviceDataList {
if validDeviceDataList[i].UploadURL == "" {
validDeviceDataList[i].UploadURL = FixedUploadURL
logger.Logger.Printf("Auto-completed UploadURL: %s (DeviceUUID=%s)",
FixedUploadURL, validDeviceDataList[i].DeviceUUID)
}
}
if len(validDeviceDataList) == 0 {
logger.Logger.Printf("No valid device configurations found in service config file")
return nil, fmt.Errorf("no valid device configurations")
}
logger.Logger.Printf("Loaded %d device configurations from service config file", len(validDeviceDataList))
return validDeviceDataList, nil
}
// ProcessDetectAreaForInference processes detection area data, converts to format required by inference program
func ProcessDetectAreaForInference(areas []Area) string {
if len(areas) == 0 {
return ""
}
// Only take the first detection area
area := areas[0]
var points []string
for _, point := range area.Pts {
// Directly round, remove decimals
x := int(point.X)
y := int(point.Y)
points = append(points, fmt.Sprintf("%d,%d", x, y))
}
return strings.Join(points, ",")
}
func ProcessDeviceDataForInference(data DeviceData) DeviceData {
// Create a copy, only process fields that need conversion
processed := data
processed.DetectAreaStr = ProcessDetectAreaForInference(data.DetectArea)
return processed
}
func (c *WSClient) SendAsync(message interface{}) {
go func() {
ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
defer cancel()
done := make(chan error, 1)
go func() {
done <- c.SendRawMessage(message)
}()
select {
case err := <-done:
if err != nil {
logger.Logger.Printf("Asynchronous reporting failed: %v", err)
} else {
logger.Logger.Printf("Asynchronous reporting successful")
}
case <-ctx.Done():
logger.Logger.Printf("Asynchronous reporting timeout (8 seconds no response), continuing business process")
}
}()
}

BIN
fireleave_tool Normal file

Binary file not shown.

5
go.mod Normal file
View File

@ -0,0 +1,5 @@
module FireLeave_tool
go 1.19
require github.com/gorilla/websocket v1.5.3

2
go.sum Normal file
View File

@ -0,0 +1,2 @@
github.com/gorilla/websocket v1.5.3 h1:saDtZ6Pbx/0u+bgYQ3q96pZgCzfhKXGPqt7kZ72aNNg=
github.com/gorilla/websocket v1.5.3/go.mod h1:YR8l580nyteQvAITg2hZ9XVh4b55+EU/adAjf1fMHhE=

181
httpreader/httpreader.go Normal file
View File

@ -0,0 +1,181 @@
package httpreader
import (
"bytes"
"context"
"encoding/json"
"fmt"
"io"
"net/http"
"sync"
"time"
"FireLeave_tool/logger"
)
// InferenceResult 推理结果
type InferenceResult struct {
Serial string `json:"serial"`
At int64 `json:"at"`
Type int `json:"type"`
Params []struct {
ClassIdx int `json:"class_idx"`
Name string `json:"name"`
Number int `json:"number"`
} `json:"params"`
}
// InferenceServer 推理数据接收服务器
type InferenceServer struct {
port int
deviceUUID string
server *http.Server
dataChan chan InferenceResult
mu sync.RWMutex
running bool
}
// NewInferenceServer 创建推理数据接收服务器
func NewInferenceServer(deviceUUID string, port int) *InferenceServer {
return &InferenceServer{
port: port,
deviceUUID: deviceUUID,
dataChan: make(chan InferenceResult, 100), // 缓冲通道,避免阻塞
running: false,
}
}
// Start 启动HTTP服务器
func (is *InferenceServer) Start() error {
is.mu.Lock()
defer is.mu.Unlock()
if is.running {
return fmt.Errorf("服务器已在运行")
}
mux := http.NewServeMux()
mux.HandleFunc("/video/post", is.handleVideoPost)
mux.HandleFunc("/health", is.handleHealthCheck)
is.server = &http.Server{
Addr: fmt.Sprintf(":%d", is.port),
Handler: mux,
ReadTimeout: 10 * time.Second,
WriteTimeout: 10 * time.Second,
}
go func() {
logger.Logger.Printf("Start the inference data receiving server (DeviceUUID=%s, Port=%d)", is.deviceUUID, is.port)
if err := is.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
logger.Logger.Printf("The inference data receiving server failed to start (DeviceUUID=%s): %v", is.deviceUUID, err)
}
}()
// 等待服务器启动
time.Sleep(100 * time.Millisecond)
is.running = true
logger.Logger.Printf("The inference data receiving server has been successfully started (DeviceUUID=%s, Port=%d)", is.deviceUUID, is.port)
return nil
}
// handleVideoPost 处理视频数据POST请求
func (is *InferenceServer) handleVideoPost(w http.ResponseWriter, r *http.Request) {
startTime := time.Now()
if r.Method != "POST" {
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
return
}
// 读取原始 body
bodyBytes, err := io.ReadAll(r.Body)
if err != nil {
logger.Logger.Printf("Read request body failed: %v", err)
http.Error(w, "Bad request", http.StatusBadRequest)
return
}
// 重新设置 body 用于解析
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
var result InferenceResult
if err := json.NewDecoder(r.Body).Decode(&result); err != nil {
logger.Logger.Printf("Parse JSON failed: %v", err)
http.Error(w, `{"error":"Invalid JSON"}`, http.StatusBadRequest)
return
}
// 验证必要字段
if result.Serial == "" {
logger.Logger.Printf("Data validation failed: Serial is empty")
http.Error(w, `{"error":"Missing serial field"}`, http.StatusBadRequest)
return
}
// 只在人员数量变化或重要事件时记录详细日志
if len(result.Params) > 0 {
hasPerson := false
for _, param := range result.Params {
if param.ClassIdx == 1 && param.Number > 0 {
hasPerson = true
break
}
}
if hasPerson {
logger.Logger.Printf("Inference data: Serial=%s, Person detected, ParamsCount=%d",
result.Serial, len(result.Params))
}
}
// 发送到处理通道
select {
case is.dataChan <- result:
w.Header().Set("Content-Type", "application/json")
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{"status":"ok", "message":"data received"}`))
// 使用 startTime 记录处理时间
logger.Logger.Printf("Inference data processed in %v", time.Since(startTime))
default:
logger.Logger.Printf("Data channel full, discard data")
http.Error(w, `{"error":"Channel full"}`, http.StatusServiceUnavailable)
}
}
func (is *InferenceServer) handleHealthCheck(w http.ResponseWriter, r *http.Request) {
w.Header().Set("Content-Type", "application/json")
is.mu.RLock()
running := is.running
is.mu.RUnlock()
if running {
w.WriteHeader(http.StatusOK)
w.Write([]byte(`{
"status": "healthy",
"device_uuid": "` + is.deviceUUID + `",
"port": ` + fmt.Sprintf("%d", is.port) + `,
"timestamp": "` + time.Now().Format(time.RFC3339) + `"
}`))
} else {
w.WriteHeader(http.StatusServiceUnavailable)
w.Write([]byte(`{"status": "unavailable"}`))
}
}
func (is *InferenceServer) GetDataChan() <-chan InferenceResult {
return is.dataChan
}
// StopWithContext 带上下文的停止方法
func (s *InferenceServer) StopWithContext(ctx context.Context) {
if s.server != nil {
// 使用带超时的关闭
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
s.server.Shutdown(shutdownCtx)
logger.Logger.Printf("The HTTP server has been shut down (DeviceUUID=%s)", s.deviceUUID)
}
}

121
httpreader/utils.go Normal file
View File

@ -0,0 +1,121 @@
package httpreader
import (
"fmt"
"net"
"net/http"
"time"
"FireLeave_tool/logger"
)
// CheckHTTPService 检查HTTP服务是否可用
func CheckHTTPService(port int, timeout time.Duration) bool {
client := &http.Client{
Timeout: timeout,
}
url := fmt.Sprintf("http://127.0.0.1:%d/health", port)
resp, err := client.Get(url)
if err != nil {
logger.Logger.Printf("HTTP服务健康检查失败 (Port=%d): %v", port, err)
return false
}
defer resp.Body.Close()
return resp.StatusCode == http.StatusOK
}
// CheckPortAvailable 检查端口是否可用
func CheckPortAvailable(port int) bool {
address := fmt.Sprintf(":%d", port)
listener, err := net.Listen("tcp", address)
if err != nil {
return false
}
listener.Close()
return true
}
// WaitForHTTPService 等待HTTP服务启动
func WaitForHTTPService(port int, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
logger.Logger.Printf("等待HTTP服务启动 (Port=%d, Timeout=%v)", port, timeout)
attempt := 0
for time.Now().Before(deadline) {
attempt++
if CheckHTTPService(port, 2*time.Second) {
logger.Logger.Printf("HTTP服务启动成功 (Port=%d, 尝试次数=%d)", port, attempt)
return true
}
if attempt == 1 || attempt%5 == 0 {
logger.Logger.Printf("等待HTTP服务... (Port=%d, 尝试次数=%d)", port, attempt)
}
time.Sleep(1 * time.Second)
}
logger.Logger.Printf("HTTP服务启动超时 (Port=%d, 总尝试次数=%d)", port, attempt)
return false
}
// ValidateInferenceResult 验证推理结果数据
func ValidateInferenceResult(result *InferenceResult) error {
if result.Serial == "" {
return fmt.Errorf("serial字段不能为空")
}
if result.At <= 0 {
return fmt.Errorf("at时间戳无效")
}
// 验证参数数据
for i, param := range result.Params {
if param.Number < 0 {
return fmt.Errorf("参数[%d]的number不能为负数", i)
}
if param.ClassIdx < 0 {
return fmt.Errorf("参数[%d]的class_idx不能为负数", i)
}
}
return nil
}
// GetPersonCountFromResult 从推理结果中获取人员数量
func GetPersonCountFromResult(result *InferenceResult) int {
if result == nil || len(result.Params) == 0 {
return 0
}
// 查找人员类别的参数(假设 class_idx=1 表示人员)
for _, param := range result.Params {
if param.ClassIdx == 1 { // 人员类别
return param.Number
}
}
return 0
}
// CreateTestInferenceResult 创建测试用的推理结果(用于调试)
func CreateTestInferenceResult(deviceUUID string, personCount int) InferenceResult {
return InferenceResult{
Serial: deviceUUID,
At: time.Now().Unix(),
Type: 1,
Params: []struct {
ClassIdx int `json:"class_idx"`
Name string `json:"name"`
Number int `json:"number"`
}{
{
ClassIdx: 1,
Name: "person",
Number: personCount,
},
},
}
}

111
logger/logger.go Normal file
View File

@ -0,0 +1,111 @@
package logger
import (
"log"
"os"
"path/filepath"
"sync"
"time"
)
// Logger 全局日志实例
var Logger *log.Logger
var logFile *os.File
var logFileName string
var mu sync.Mutex
// InitLogger 初始化日志,创建 /data/logs 目录并按天命名文件
func InitLogger() error {
mu.Lock()
defer mu.Unlock()
// 创建 /data/logs 目录
logDir := "/data/logs"
if err := os.MkdirAll(logDir, 0755); err != nil {
return err
}
// 按天生成日志文件名20260102.log
logFileName = filepath.Join(logDir, time.Now().Format("20060102")+".log")
file, err := os.OpenFile(logFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
return err
}
logFile = file
Logger = log.New(file, "", log.LstdFlags)
// 每天凌晨检查是否需要切换日志文件
go func() {
for {
// 每天检查一次(凌晨 00:05 执行)
now := time.Now()
next := time.Date(now.Year(), now.Month(), now.Day()+1, 0, 5, 0, 0, now.Location())
time.Sleep(time.Until(next))
mu.Lock()
newFileName := filepath.Join(logDir, time.Now().Format("20060102")+".log")
if newFileName != logFileName {
newFile, err := os.OpenFile(newFileName, os.O_APPEND|os.O_CREATE|os.O_WRONLY, 0644)
if err != nil {
log.Printf("Failed to switch the log file: %v", err)
mu.Unlock()
continue
}
logFile.Close()
logFile = newFile
logFileName = newFileName
Logger = log.New(newFile, "", log.LstdFlags)
Logger.Println("The log file has been switched:", newFileName)
}
mu.Unlock()
}
}()
return nil
}
// CloseLogger 关闭日志文件
func CloseLogger() {
mu.Lock()
defer mu.Unlock()
if logFile != nil {
logFile.Close()
logFile = nil
}
}
// CleanupOldLogs 删除超过30天的日志文件
func CleanupOldLogs() {
logDir := "/data/logs"
files, err := os.ReadDir(logDir)
if err != nil {
if Logger != nil {
Logger.Printf("Read the log directory %s Error: %v", logDir, err)
}
return
}
const maxAge = 30 * 24 * time.Hour // 30天
for _, file := range files {
if file.IsDir() {
continue
}
info, err := file.Info()
if err != nil {
if Logger != nil {
Logger.Printf("Obtain file information %s Error: %v", file.Name(), err)
}
continue
}
if time.Since(info.ModTime()) > maxAge {
filePath := filepath.Join(logDir, file.Name())
if err := os.Remove(filePath); err != nil {
if Logger != nil {
Logger.Printf("Delete the old log files %s Error: %v", filePath, err)
}
} else {
if Logger != nil {
Logger.Printf("Delete the old log files %s succeed", filePath)
}
}
}
}
}

843
main.go Normal file
View File

@ -0,0 +1,843 @@
package main
import (
"FireLeave_tool/connect"
"FireLeave_tool/httpreader"
"FireLeave_tool/logger"
"FireLeave_tool/tcpreader"
"FireLeave_tool/video_server"
"context"
"encoding/json"
"flag"
"fmt"
"log"
"math"
"net"
"net/http"
"os"
"os/exec"
"os/signal"
"path/filepath"
"sync"
"syscall"
"time"
)
type deviceAlarmState struct {
InCondition bool
StartTime int64
AlarmPauseUntil int64
}
// 全局 map每个设备独立状态
var deviceStates sync.Map // map[string]*deviceAlarmState
var mergingAlarms sync.Map
// DeviceManager 设备管理器
type DeviceManager struct {
mu sync.Mutex
devices map[string]*connect.DeviceData
}
// NewDeviceManager 初始化设备管理器
func NewDeviceManager() *DeviceManager {
return &DeviceManager{
devices: make(map[string]*connect.DeviceData),
}
}
func getDeviceState(deviceUUID string) *deviceAlarmState {
if s, ok := deviceStates.Load(deviceUUID); ok {
return s.(*deviceAlarmState)
}
s := &deviceAlarmState{}
deviceStates.Store(deviceUUID, s)
return s
}
// AddDevice 添加设备
func (dm *DeviceManager) AddDevice(deviceData connect.DeviceData) {
dm.mu.Lock()
defer dm.mu.Unlock()
dm.devices[deviceData.DeviceUUID] = &deviceData
logger.Logger.Printf("add equipments: DeviceUUID=%s", deviceData.DeviceUUID)
}
// StopDevice 停止设备
func (dm *DeviceManager) StopDevice(deviceUUID string) {
dm.mu.Lock()
defer dm.mu.Unlock()
if _, exists := dm.devices[deviceUUID]; exists {
delete(dm.devices, deviceUUID)
logger.Logger.Printf("stop equipments: DeviceUUID=%s", deviceUUID)
}
}
// GetDevice 获取设备
func (dm *DeviceManager) GetDevice(deviceUUID string) (*connect.DeviceData, bool) {
dm.mu.Lock()
defer dm.mu.Unlock()
device, exists := dm.devices[deviceUUID]
return device, exists
}
// GlobalDeviceManager 全局设备管理器
var GlobalDeviceManager = NewDeviceManager()
// 工具函数
func waitForHTTPService(port int, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
address := fmt.Sprintf("127.0.0.1:%d", port)
logger.Logger.Printf("Start waiting for the HTTP service to start (Port=%d, Timeout=%v)", port, timeout)
attempt := 0
for time.Now().Before(deadline) {
attempt++
// 1. 先检查TCP连接
conn, err := net.DialTimeout("tcp", address, 2*time.Second)
if err != nil {
if attempt == 1 || attempt%5 == 0 {
logger.Logger.Printf("Wait for the TCP connection... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err)
}
time.Sleep(1 * time.Second)
continue
}
conn.Close()
// 2. 再检查HTTP服务是否真正就绪
client := &http.Client{Timeout: 3 * time.Second}
resp, err := client.Get(fmt.Sprintf("http://%s/health", address))
if err == nil {
resp.Body.Close()
if resp.StatusCode == http.StatusOK {
logger.Logger.Printf("The HTTP service has started successfully (Port=%d, Number of attempts=%d)", port, attempt)
return true
}
}
if attempt == 1 || attempt%5 == 0 {
logger.Logger.Printf("Wait for the HTTP service to be ready... (Port=%d, Total number of attempts=%d)", port, attempt)
}
time.Sleep(1 * time.Second)
}
logger.Logger.Printf("HTTP service startup timeout (Port=%d, Total number of attempts=%d)", port, attempt)
return false
}
// waitForTCPService 增强版本 - 支持更长的等待时间和更好的错误处理
func waitForTCPService(port int, timeout time.Duration) bool {
deadline := time.Now().Add(timeout)
address := fmt.Sprintf("127.0.0.1:%d", port)
logger.Logger.Printf("Start waiting for the TCP service to start (Port=%d, Timeout=%v)", port, timeout)
attempt := 0
lastError := ""
for time.Now().Before(deadline) {
attempt++
conn, err := net.DialTimeout("tcp", address, 2*time.Second)
if err == nil {
conn.Close()
logger.Logger.Printf("The TCP service has been successfully started (Port=%d, Number of attempts=%d)", port, attempt)
return true
}
currentError := err.Error()
if currentError != lastError {
logger.Logger.Printf("Wait for TCP service... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err)
lastError = currentError
} else if attempt%5 == 0 {
// 每5次尝试记录一次相同的错误
logger.Logger.Printf("Wait for TCP service... (Port=%d, Number of attempts=%d, Error: %v)", port, attempt, err)
}
time.Sleep(2 * time.Second) // 增加等待间隔
}
logger.Logger.Printf("TCP service startup timeout (Port=%d, Total number of attempts=%d, Final error: %s)", port, attempt, lastError)
return false
}
func logProcessInfo(pid int, deviceUUID string) error {
cmd := exec.Command("ps", "-o", "pid,ppid,user,stat,start_time,cmd", "-p", fmt.Sprintf("%d", pid))
output, err := cmd.Output()
if err != nil {
return fmt.Errorf("Failed to obtain process information: %v", err)
}
logger.Logger.Printf("Detailed information about the reasoning program process (DeviceUUID=%s, PID=%d):\n%s", deviceUUID, pid, string(output))
return nil
}
func getIntFromNumber(num json.Number, def int) int {
if i, err := num.Int64(); err == nil {
return int(i)
}
return def
}
// processDevice 处理单个设备
func processDevice(ctx context.Context, deviceData connect.DeviceData) {
logger.Logger.Printf("Start processing the equipment: DeviceUUID=%s", deviceData.DeviceUUID)
debugDeviceData(deviceData)
// 首先将设备添加到设备管理器
GlobalDeviceManager.AddDevice(deviceData)
defer GlobalDeviceManager.StopDevice(deviceData.DeviceUUID)
// 创建带取消的子上下文
deviceCtx, cancel := context.WithCancel(ctx)
defer cancel()
// 分配端口
port, err := connect.GlobalPortManager.AllocatePort(deviceData.DeviceUUID)
if err != nil {
logger.Logger.Printf(" DeviceUUID=%s Port allocation failed: %v", deviceData.DeviceUUID, err)
return
}
defer connect.GlobalPortManager.ReleasePort(deviceData.DeviceUUID)
logger.Logger.Printf("Port allocation successful: DeviceUUID=%s, Port=%d", deviceData.DeviceUUID, port)
inferenceServer := httpreader.NewInferenceServer(deviceData.DeviceUUID, port)
if err := inferenceServer.Start(); err != nil {
logger.Logger.Printf("Failed to start the inference data receiving server (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
// 使用带上下文的停止
defer inferenceServer.StopWithContext(deviceCtx)
logger.Logger.Printf("Wait for the inference data receiving server to start... (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port)
// 等待HTTP服务器启动
if !waitForHTTPService(port, 10*time.Second) {
logger.Logger.Printf("The inference data receiving server has timed out during startup (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port)
return
}
logger.Logger.Printf("The inference data receiving server has been successfully started (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, port)
// 处理检测区域为推理程序需要的格式
detectAreaStr := connect.ProcessDetectAreaForInference(deviceData.DetectArea)
logger.Logger.Printf("The processed detection area: %s", detectAreaStr)
// 启动推理程序
inferenceCmd := exec.Command(
"./yolov5",
"-s", deviceData.CameraRTSP,
"-m", "model",
"-c", deviceData.DeviceUUID,
"-p", fmt.Sprintf("%d", port),
"-w", detectAreaStr,
"-r", fmt.Sprintf("%d", deviceData.Confidence),
)
inferenceCmd.Stdout = os.Stdout
inferenceCmd.Stderr = os.Stderr
//logger.Logger.Printf("推理程序输出仅显示在控制台")
logger.Logger.Printf("Start the inference program command: %v", inferenceCmd.Args)
if err := inferenceCmd.Start(); err != nil {
logger.Logger.Printf("The reasoning program failed to start (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
// 检查推理程序是否启动成功
time.Sleep(2 * time.Second)
if inferenceCmd.Process == nil {
logger.Logger.Printf("The reasoning program process has not been created (DeviceUUID=%s)", deviceData.DeviceUUID)
return
}
if err := inferenceCmd.Process.Signal(syscall.Signal(0)); err != nil {
logger.Logger.Printf("The reasoning program has exited (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
logger.Logger.Printf("The reasoning program process has been startedPID: %d (DeviceUUID=%s)", inferenceCmd.Process.Pid, deviceData.DeviceUUID)
if err := logProcessInfo(inferenceCmd.Process.Pid, deviceData.DeviceUUID); err != nil {
logger.Logger.Printf("Failed to record process information: %v", err)
}
defer func() {
logger.Logger.Printf("Start to stop the reasoning program (DeviceUUID=%s)", deviceData.DeviceUUID)
if inferenceCmd.Process != nil {
if err := inferenceCmd.Process.Kill(); err != nil {
logger.Logger.Printf("The termination of the reasoning program failed (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
} else {
logger.Logger.Printf("The reasoning program has stopped (DeviceUUID=%s)", deviceData.DeviceUUID)
}
}
}()
// 启动温度程序
// 在 processDevice 函数中修改温度程序启动部分
// 启动温度程序
tempPort := port + 1000
tempCmdArgs := []string{
"-s", deviceData.CameraRTSP,
"-t", fmt.Sprintf("%d", tempPort),
}
if deviceData.CameraChannel > 0 {
tempCmdArgs = append(tempCmdArgs, "-c", fmt.Sprintf("%d", deviceData.CameraChannel))
}
tempCmd := exec.Command("./release/run_thermometry.sh", tempCmdArgs...)
tempCmd.Stdout = os.Stdout
tempCmd.Stderr = os.Stderr
logger.Logger.Printf("Start the temperature program command: %v", tempCmd.Args)
if err := tempCmd.Start(); err != nil {
logger.Logger.Printf("The temperature program failed to start (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
// 增强进程检查
time.Sleep(3 * time.Second) // 减少等待时间
if tempCmd.Process == nil || tempCmd.Process.Pid == 0 {
logger.Logger.Printf("The temperature program was not started correctly, and the process PID is 0 (DeviceUUID=%s)", deviceData.DeviceUUID)
return
}
// 检查进程是否真的在运行
if err := tempCmd.Process.Signal(syscall.Signal(0)); err != nil {
logger.Logger.Printf("The temperature program process check failed (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
logger.Logger.Printf("The temperature program has been successfully startedPID: %d (DeviceUUID=%s)", tempCmd.Process.Pid, deviceData.DeviceUUID)
// 增加TCP服务启动前的额外检查
logger.Logger.Printf("Wait for the temperature program to be initialized... (DeviceUUID=%s)", deviceData.DeviceUUID)
time.Sleep(2 * time.Second)
// 等待 TCP 服务器就绪
logger.Logger.Printf("Wait for the temperature program TCP server to start... (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort)
if !waitForTCPService(tempPort, 20*time.Second) { // 增加超时时间
logger.Logger.Printf("The TCP server of the temperature program has timed out during startup (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort)
// 检查温度程序是否还在运行
if tempCmd.Process != nil {
if err := tempCmd.Process.Signal(syscall.Signal(0)); err != nil {
logger.Logger.Printf("The temperature program has abnormally exited (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
} else {
logger.Logger.Printf("The temperature program is still running but the TCP service has not started. There might be a configuration issue(DeviceUUID=%s)", deviceData.DeviceUUID)
}
}
return
}
// 创建 TCP 客户端
tempClient, err := tcpreader.NewTCPClient(deviceData.DeviceUUID, tempPort)
if err != nil {
logger.Logger.Printf("Failed to create a TCP client (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
defer tempClient.Close()
logger.Logger.Printf("The TCP client has been successfully created (DeviceUUID=%s, Port=%d)", deviceData.DeviceUUID, tempPort)
// 确保上传目录存在
if err := os.MkdirAll("/data/upload", 0755); err != nil {
logger.Logger.Printf("Failed to create the upload directory: %v", err)
return
}
logger.Logger.Printf("The directory upload has been successfully created: /data/upload")
// 主循环:从通道接收推理数据
logger.Logger.Printf("Start monitoring the equipment (DeviceUUID=%s)", deviceData.DeviceUUID)
//var lastPersonCount = -1
//var lastTemperature float64 = -1
monitorTicker := time.NewTicker(2 * time.Second)
defer monitorTicker.Stop()
for {
select {
case <-deviceCtx.Done():
logger.Logger.Printf("Equipment processing has been cancelled: DeviceUUID=%s", deviceData.DeviceUUID)
// 强制停止所有子进程
if inferenceCmd.Process != nil {
inferenceCmd.Process.Kill()
}
if tempCmd.Process != nil {
tempCmd.Process.Kill()
}
tempClient.Close()
return
case inferenceResult, ok := <-inferenceServer.GetDataChan():
if !ok {
logger.Logger.Printf("The inference data channel is closed: DeviceUUID=%s", deviceData.DeviceUUID)
return
}
currentPersonCount := 0
for _, param := range inferenceResult.Params {
if param.ClassIdx == 1 {
currentPersonCount = param.Number
break
}
}
if currentPersonCount != deviceData.PersonCount {
deviceData.PersonCount = currentPersonCount
connect.UpdateDeviceData(deviceData)
logger.Logger.Printf("Personnel quantity update: %d (DeviceUUID=%s)", currentPersonCount, deviceData.DeviceUUID)
}
case <-monitorTicker.C:
now := time.Now().Unix()
// === 获取当前设备独立状态 ===
state := getDeviceState(deviceData.DeviceUUID)
// === 读取温度(防抖)===
tempResult, err := tempClient.ReadTemperatureResult()
if err == nil {
if math.Abs(tempResult.Tmp-deviceData.Temperature) >= 0.5 {
oldTemp := deviceData.Temperature
deviceData.Temperature = tempResult.Tmp
connect.UpdateDeviceData(deviceData)
GlobalDeviceManager.AddDevice(deviceData)
logger.Logger.Printf("Temperature update: %.1f°C → %.1f°C (DeviceUUID=%s)",
oldTemp, tempResult.Tmp, deviceData.DeviceUUID)
}
}
// 获取参数
detectionSec := getIntFromNumber(deviceData.DetectionTime, 20)
alarmCooldown := getIntFromNumber(deviceData.AlarmTime, 300)
// === 防抖:无人 + 温度稳定超阈值 ===
noPerson := deviceData.PersonCount == 0
// 温度防抖
var tempStableHigh bool
threshold := deviceData.TemperatureThreshold
if deviceData.Temperature > threshold+0.5 {
tempStableHigh = true
} else if deviceData.Temperature < threshold-0.5 {
tempStableHigh = false
} else {
tempStableHigh = deviceData.Temperature > threshold
}
conditionMet := noPerson && tempStableHigh
// 1. 冷却期检查
if now < state.AlarmPauseUntil {
if state.InCondition {
logger.Logger.Printf("Alarm in cooldown: %ds remaining (DeviceUUID=%s)",
state.AlarmPauseUntil-now, deviceData.DeviceUUID)
state.InCondition = false
}
continue
}
// 2. 条件满足
if conditionMet {
if !state.InCondition {
state.StartTime = now
state.InCondition = true
logger.Logger.Printf("Alarm condition met, start timing (%ds required): PersonCount=%d, Temp=%.1f°C > %.1f°C (DeviceUUID=%s)",
detectionSec, deviceData.PersonCount, deviceData.Temperature, threshold, deviceData.DeviceUUID)
} else if now-state.StartTime >= int64(detectionSec) {
logger.Logger.Printf("ALARM TRIGGERED (after %ds): PersonCount=%d, Temp=%.1f°C > %.1f°C (DeviceUUID=%s)",
detectionSec, deviceData.PersonCount, deviceData.Temperature, threshold, deviceData.DeviceUUID)
go triggerAlarmWithDelay(deviceData, state.StartTime, alarmCooldown)
state.AlarmPauseUntil = now + int64(alarmCooldown)
state.InCondition = false
deviceDataCopy := deviceData
deviceDataCopy.AlarmPauseUntil = state.AlarmPauseUntil
connect.UpdateDeviceData(deviceDataCopy)
}
} else {
if state.InCondition {
logger.Logger.Printf("Alarm condition interrupted: lasted %ds < %ds (DeviceUUID=%s)",
now-state.StartTime, detectionSec, deviceData.DeviceUUID)
state.InCondition = false
}
}
}
}
}
func waitForShutdown(ctx context.Context, cancel context.CancelFunc, wg *sync.WaitGroup) {
stop := make(chan os.Signal, 1)
signal.Notify(stop, os.Interrupt, syscall.SIGTERM)
select {
case <-stop:
logger.Logger.Printf("Upon receiving the stop signal, start cleaning...")
// First cancel all contexts
cancel()
// Wait for all goroutines to complete, but with timeout
done := make(chan struct{})
go func() {
wg.Wait()
close(done)
}()
select {
case <-done:
logger.Logger.Printf("All goroutines exited normally")
case <-time.After(10 * time.Second):
logger.Logger.Printf("When the waiting time exceeds the limit, force exit")
// Log status before force exit
activeDevices := GlobalDeviceManager.GetAllDevices()
logger.Logger.Printf("It still exists when forced to exit %d An active device", len(activeDevices))
for uuid := range activeDevices {
logger.Logger.Printf("Forcibly stop the equipment: UUID=%s", uuid)
}
}
case <-ctx.Done():
logger.Logger.Printf("The context has been cancelled.")
}
logger.Logger.Printf("The program exits successfully.")
os.Exit(0)
}
// initWebSocketAsync Asynchronously initializes WebSocket
func initWebSocketAsync(ctx context.Context, wg *sync.WaitGroup) {
logger.Logger.Printf("Start the initialization of asynchronous WebSocket...")
// 1. Initialize WebSocket channel
if err := connect.InitWSChannel(); err != nil {
logger.Logger.Printf("Failed to initialize WebSocket channel: %v", err)
return
}
// 2. Wait for connection to be ready (with context cancellation)
logger.Logger.Printf("Waiting for WebSocket connection to be established...")
containerName := "fireleave-container"
for i := 0; i < 30; i++ {
select {
case <-ctx.Done():
logger.Logger.Printf("Context cancelled, aborting WebSocket registration")
return
case <-time.After(1 * time.Second):
client := connect.GetWSClient()
if client != nil && client.IsConnected() {
// Connection successful → proceed to register
goto register
}
}
}
logger.Logger.Printf("WebSocket connection timeout after 30 seconds")
return
register:
// 3. Register service
serviceID, err := connect.RegisterService(containerName)
if err != nil {
logger.Logger.Printf("Service registration failed: %v", err)
} else {
logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID)
}
}
// GetAllDevices Get all devices (new method)
func (dm *DeviceManager) GetAllDevices() map[string]*connect.DeviceData {
dm.mu.Lock()
defer dm.mu.Unlock()
// Return copy to avoid concurrent modification
result := make(map[string]*connect.DeviceData)
for k, v := range dm.devices {
result[k] = v
}
return result
}
func jsonString(v any) string {
b, err := json.Marshal(v)
if err != nil {
logger.Logger.Printf("JSON编码失败: %v", err)
return ""
}
return string(b)
}
// main.go → triggerAlarmWithDelay
func triggerAlarmWithDelay(deviceData connect.DeviceData, alarmTime int64, cooldown int) {
key := fmt.Sprintf("%s_%d", deviceData.DeviceUUID, alarmTime)
if _, loaded := mergingAlarms.LoadOrStore(key, true); loaded {
return
}
defer mergingAlarms.Delete(key)
// === 1. 合并视频 ===
mergedFile, err := video_server.MergeVideo(deviceData.DeviceUUID, alarmTime)
if err != nil {
logger.Logger.Printf("视频合并失败 (DeviceUUID=%s): %v", deviceData.DeviceUUID, err)
return
}
// === 2. 上传 Nginx ===
var hostUUID string
configs, err := connect.LoadServiceConfig()
if err == nil {
for _, cfg := range configs {
if cfg.DeviceUUID == deviceData.DeviceUUID {
hostUUID = cfg.HostUUID
break
}
}
}
eventParamsForNginx := map[string]any{
"alarm_time": alarmTime,
"description": "无人工作区且温度超阈值报警",
"device_UUID": deviceData.DeviceUUID,
"host_uuid": hostUUID,
"person_count": deviceData.PersonCount,
"temperature": deviceData.Temperature,
}
videoURL, err := video_server.UploadToNginx(mergedFile, eventParamsForNginx, deviceData.UploadURL)
videoURLStr := ""
if err != nil {
logger.Logger.Printf("Nginx上传失败: %v", err)
} else {
videoURLStr = videoURL
logger.Logger.Printf("Nginx上传成功: %s", videoURL)
}
client := connect.GetWSClient()
if client == nil || !client.IsConnected() {
logger.Logger.Printf("WS未就绪延迟上报 (DeviceUUID=%s)", deviceData.DeviceUUID)
os.Remove(mergedFile)
return
}
// === 3. 上报事件 (event) ===
eventPayload := map[string]any{
"type": "event",
"args": map[string]any{
"level": "alarm",
"description": "无人工作区且温度超阈值报警",
"device_uuid": deviceData.DeviceUUID,
"host_uuid": hostUUID,
"video_url": videoURLStr,
"alarm_time": alarmTime,
"address": deviceData.Address,
},
}
eventMsg := map[string]any{
"method": "metric_data",
"params": map[string]any{
"route_key": fmt.Sprintf("/dhlr/device/%s/event", deviceData.DeviceUUID),
"metric": jsonString(eventPayload),
},
}
client.SendAsync(eventMsg)
logger.Logger.Printf("报警事件已上报 (DeviceUUID=%s)", deviceData.DeviceUUID)
// === 4. 上报数据快照 (data) ===
dataPayload := map[string]any{
"type": "data",
"args": map[string]any{
"person_count": deviceData.PersonCount,
"temperature": deviceData.Temperature,
"camera_rtsp": deviceData.CameraRTSP,
"task_id": deviceData.TaskID,
"camera_ip": deviceData.CameraIP,
"confidence": deviceData.Confidence,
"alarm_time": alarmTime,
"address": deviceData.Address,
},
}
dataMsg := map[string]any{
"method": "metric_data",
"params": map[string]any{
"route_key": fmt.Sprintf("/dhlr/device/%s/data", deviceData.DeviceUUID),
"metric": jsonString(dataPayload),
},
}
client.SendAsync(dataMsg)
logger.Logger.Printf("指标快照已上报 (DeviceUUID=%s, Temp=%.1f°C, 人:%d)",
deviceData.DeviceUUID, deviceData.Temperature, deviceData.PersonCount)
// === 5. 清理本地文件 ===
os.Remove(mergedFile)
video_server.CleanupOldVideos(deviceData.DeviceUUID)
}
func waitAndRegister() error {
containerName := os.Getenv("CONTAINER_NAME")
if containerName == "" {
containerName = "fireleave-service-default"
}
// 等待 WS 连接成功
for i := 0; i < 30; i++ {
client := connect.GetWSClient()
if client != nil && client.IsConnected() {
// 连接成功 → 立即注册
serviceID, err := connect.RegisterService(containerName)
if err != nil {
return fmt.Errorf("服务注册失败: %v", err)
}
logger.Logger.Printf("服务注册成功ServiceID=%s", serviceID)
return nil
}
time.Sleep(1 * time.Second)
}
return fmt.Errorf("WebSocket 连接超时,无法注册服务")
}
func main() {
// Define command line parameters (remove ws parameter)
var rtspURL string
var deviceUUID string
flag.StringVar(&rtspURL, "rtsp", "", "RTSP stream URL for testing")
flag.StringVar(&deviceUUID, "uuid", "", "Device UUID for testing")
flag.Parse()
// Initialize logger
if err := logger.InitLogger(); err != nil {
log.Fatalf("Logger initialization failed: %v", err)
}
defer logger.CloseLogger()
// Log cleanup
go func() {
for {
logger.CleanupOldLogs()
time.Sleep(24 * time.Hour)
}
}()
// Directly enter normal mode (remove wsMode judgment)
logger.Logger.Printf("=== Starting FireLeave Tool ===")
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
// 1. Initialize device data directly from service config
logger.Logger.Printf("Step 1: Initialize device data from service config")
deviceDataList, err := connect.InitializeDevicesFromServiceConfig()
if err != nil {
logger.Logger.Printf("Device data initialization failed: %v", err)
logger.Logger.Printf("Will try to use existing device files...")
// If initialization fails, try to load from existing files
deviceDataList = connect.GetDeviceDataList()
}
logger.Logger.Printf("Step 2: Starting %d devices", len(deviceDataList))
for i, deviceData := range deviceDataList {
logger.Logger.Printf("Starting device[%d]: UUID=%s", i, deviceData.DeviceUUID)
wg.Add(1)
go func(deviceData connect.DeviceData, index int) {
defer wg.Done()
defer GlobalDeviceManager.StopDevice(deviceData.DeviceUUID)
logger.Logger.Printf("Device processing goroutine started: UUID=%s, Index=%d", deviceData.DeviceUUID, index)
processDevice(ctx, deviceData)
}(deviceData, i)
}
logger.Logger.Printf("Step 3: All devices started")
// 3. Asynchronously initialize WebSocket
logger.Logger.Printf("Step 4: Asynchronously initialize WebSocket connection")
go initWebSocketAsync(ctx, &wg)
logger.Logger.Printf("=== Main program startup completed ===")
logger.Logger.Printf("System status: Running devices=%d", len(deviceDataList))
// Add device status monitoring
go monitorDeviceStatus(ctx, deviceDataList)
// Wait for termination signal
waitForShutdown(ctx, cancel, &wg)
}
func printGlobalDeviceStatus() {
now := time.Now().Unix()
activeDevices := GlobalDeviceManager.GetAllDevices()
if len(activeDevices) == 0 {
logger.Logger.Printf("=== Global Device Status monitoring === No running devices for the time being")
return
}
logger.Logger.Printf("=== Global Device Status monitoring === Currently running %d", len(activeDevices))
i := 1
for uuid, data := range activeDevices {
// 从 deviceStates 获取冷却时间
stateInf, _ := deviceStates.Load(uuid)
state := stateInf.(*deviceAlarmState)
cooldownLeft := int64(0)
if now < state.AlarmPauseUntil {
cooldownLeft = state.AlarmPauseUntil - now
}
logger.Logger.Printf(" [%d] DeviceUUID=%s | person:%d | temp:%.1f°C | threshold value:%.1f°C | cooling time:%ds",
i,
uuid,
data.PersonCount,
data.Temperature,
data.TemperatureThreshold,
cooldownLeft,
)
i++
}
}
func monitorDeviceStatus(ctx context.Context, initialDevices []connect.DeviceData) {
ticker := time.NewTicker(30 * time.Second)
defer ticker.Stop()
logger.Logger.Printf("Start global device status monitoring (print once every 30 seconds)")
for {
select {
case <-ticker.C:
printGlobalDeviceStatus()
if files, err := filepath.Glob("/data/devices/device_*.json"); err == nil {
logger.Logger.Printf("Device file status: A total of %d device files", len(files))
for i, file := range files {
if i < 3 {
logger.Logger.Printf(" device file[%d]: %s", i, filepath.Base(file))
}
}
}
logger.Logger.Printf("Operating equipment %d ", len(GlobalDeviceManager.GetAllDevices()))
case <-ctx.Done():
logger.Logger.Printf("Global device status monitoring has stopped")
return
}
}
}
// debugDeviceData Debug device data
func debugDeviceData(deviceData connect.DeviceData) {
logger.Logger.Printf("Debug device data: UUID=%s", deviceData.DeviceUUID)
logger.Logger.Printf(" TaskID: %q", deviceData.TaskID)
logger.Logger.Printf(" CameraRTSP: %q", deviceData.CameraRTSP)
logger.Logger.Printf(" CameraIP: %q", deviceData.CameraIP)
logger.Logger.Printf(" UploadURL: %q", deviceData.UploadURL)
logger.Logger.Printf(" DetectAreaStr: %q", deviceData.DetectAreaStr)
logger.Logger.Printf(" PersonCount: %d", deviceData.PersonCount)
logger.Logger.Printf(" Temperature: %f", deviceData.Temperature)
}

BIN
main_aarch64 Normal file

Binary file not shown.

76
tcpreader/tcpreader.go Normal file
View File

@ -0,0 +1,76 @@
package tcpreader
import (
"bufio"
"encoding/json"
"fmt"
"net"
"sync"
"time"
"FireLeave_tool/logger"
)
// TemperatureResult 温度读取程序返回的数据
type TemperatureResult struct {
Tmp float64 `json:"tmp"`
}
// TCPClient TCP 客户端
type TCPClient struct {
addr string // 地址,例如 "127.0.0.1:59881"
deviceUUID string // 关联的 DeviceUUID
conn net.Conn
mu sync.Mutex
}
// NewTCPClient 创建新的 TCP 客户端
func NewTCPClient(deviceUUID string, tempPort int) (*TCPClient, error) {
addr := fmt.Sprintf("127.0.0.1:%d", tempPort)
conn, err := net.DialTimeout("tcp", addr, 5*time.Second)
if err != nil {
logger.Logger.Printf("Connect TCP server %s failed (DeviceUUID=%s): %v", addr, deviceUUID, err)
return nil, fmt.Errorf("connect TCP server %s failed: %v", addr, err)
}
client := &TCPClient{
addr: addr,
deviceUUID: deviceUUID,
conn: conn,
}
logger.Logger.Printf("TCP client created successfully: DeviceUUID=%s, Address=%s", deviceUUID, addr)
return client, nil
}
// ReadTemperatureResult 读取温度程序数据
func (tc *TCPClient) ReadTemperatureResult() (TemperatureResult, error) {
tc.mu.Lock()
defer tc.mu.Unlock()
tc.conn.SetReadDeadline(time.Now().Add(10 * time.Second))
reader := bufio.NewReader(tc.conn)
message, err := reader.ReadString('\n')
if err != nil {
logger.Logger.Printf("Read temperature data failed (DeviceUUID=%s, Address=%s): %v", tc.deviceUUID, tc.addr, err)
return TemperatureResult{}, fmt.Errorf("read temperature data failed: %v", err)
}
var result TemperatureResult
if err := json.Unmarshal([]byte(message), &result); err != nil {
logger.Logger.Printf("Parse temperature data failed (DeviceUUID=%s, Address=%s): %v", tc.deviceUUID, tc.addr, err)
return TemperatureResult{}, fmt.Errorf("parse temperature data failed: %v", err)
}
return result, nil
}
// Close 关闭 TCP 连接
func (tc *TCPClient) Close() {
tc.mu.Lock()
defer tc.mu.Unlock()
if tc.conn != nil {
tc.conn.Close()
logger.Logger.Printf("Close TCP connection: DeviceUUID=%s, Address=%s", tc.deviceUUID, tc.addr)
}
}

294
video_server/video_merge.go Normal file
View File

@ -0,0 +1,294 @@
package video_server
import (
"fmt"
"io"
"os"
"os/exec"
"path/filepath"
"sort"
"strconv"
"strings"
"time"
"FireLeave_tool/logger"
)
// MergeVideo 合并视频文件 - 完全适配 mp4_merge --out 模式
func MergeVideo(deviceUUID string, alarmTime int64) (string, error) {
videoDir := fmt.Sprintf("/usr/data/camera/%s", deviceUUID)
if _, err := os.Stat(videoDir); os.IsNotExist(err) {
return "", fmt.Errorf("video directory not exist: %s", videoDir)
}
// === 1. 时间对齐到 10 秒 ===
alarmTimeAligned := alignTo10Seconds(alarmTime)
alarmTimeStr := time.Unix(alarmTimeAligned, 0).Format("20060102150405")
beforeTime := alarmTimeAligned - 10
afterTime := alarmTimeAligned + 10
beforeStr := time.Unix(beforeTime, 0).Format("20060102150405")
afterStr := time.Unix(afterTime, 0).Format("20060102150405")
paths := map[string]string{
beforeStr: filepath.Join(videoDir, beforeStr+".mp4"),
alarmTimeStr: filepath.Join(videoDir, alarmTimeStr+".mp4"),
afterStr: filepath.Join(videoDir, afterStr+".mp4"),
}
videoFiles := []string{}
for name, path := range paths {
if _, err := os.Stat(path); err == nil {
videoFiles = append(videoFiles, path)
logger.Logger.Printf("Found video: %s", name+".mp4")
} else {
logger.Logger.Printf("Missing video: %s", name+".mp4")
}
}
// === 2. 补充替代文件 ===
if len(videoFiles) < 3 {
allFiles, _ := os.ReadDir(videoDir)
var candidates []struct {
path string
ts int64
}
for _, f := range allFiles {
if !f.IsDir() && strings.HasSuffix(f.Name(), ".mp4") && !strings.Contains(f.Name(), "_joined") {
if ts, err := parseSimpleTimestamp(f.Name()); err == nil {
candidates = append(candidates, struct {
path string
ts int64
}{filepath.Join(videoDir, f.Name()), ts})
}
}
}
targets := []int64{beforeTime, alarmTimeAligned, afterTime}
for _, target := range targets {
if containsTimestamp(videoFiles, target) {
continue
}
closest := findClosest(candidates, target)
if closest != "" && !contains(videoFiles, closest) {
videoFiles = append(videoFiles, closest)
logger.Logger.Printf("Using fallback: %s", filepath.Base(closest))
}
}
}
if len(videoFiles) < 2 {
return "", fmt.Errorf("insufficient video files: %d", len(videoFiles))
}
// === 3. 排序 ===
sort.Slice(videoFiles, func(i, j int) bool {
return getTimestampFromPath(videoFiles[i]) < getTimestampFromPath(videoFiles[j])
})
logger.Logger.Printf("Merging %d videos:", len(videoFiles))
for i, f := range videoFiles {
logger.Logger.Printf(" [%d] %s", i, filepath.Base(f))
}
// === 4. 显式指定输出文件100% 可控)===
outputFile := fmt.Sprintf("%s.mp4_joined.mp4", alarmTimeStr)
joinedPath := filepath.Join(videoDir, outputFile)
// 清理旧文件(防止冲突)
if err := os.Remove(joinedPath); err != nil && !os.IsNotExist(err) {
logger.Logger.Printf("Failed to remove old output file %s: %v", joinedPath, err)
}
// === 5. 调用 mp4_merge显式 --out ===
args := append(videoFiles, "--out", joinedPath)
cmd := exec.Command("mp4_merge", args...)
cmd.Stdout = logger.Logger.Writer()
cmd.Stderr = logger.Logger.Writer()
if err := cmd.Run(); err != nil {
return "", fmt.Errorf("mp4_merge failed: %v", err)
}
// === 6. 验证输出文件存在 ===
if _, err := os.Stat(joinedPath); err != nil {
return "", fmt.Errorf("output file not created: %s, error: %v", joinedPath, err)
}
info, err := os.Stat(joinedPath)
if err != nil {
return "", fmt.Errorf("stat output file failed: %v", err)
}
logger.Logger.Printf("Merged file created: %s (size: %d bytes)", outputFile, info.Size())
// === 7. 移动到 /data/upload/ ===
t := time.Unix(alarmTimeAligned, 0)
year, month, day := t.Date()
timestampPart := fmt.Sprintf("%d-%d-%d-%d", year, int(month), day, alarmTimeAligned)
finalName := fmt.Sprintf("%s-%s.MP4", deviceUUID, timestampPart)
finalPath := filepath.Join("/data/upload", finalName)
if err := os.MkdirAll("/data/upload", 0755); err != nil {
return "", fmt.Errorf("create /data/upload failed: %v", err)
}
// 跨分区复制 + 删除
if err := copyFile(joinedPath, finalPath); err != nil {
return "", fmt.Errorf("copy failed: %v", err)
}
if err := os.Remove(joinedPath); err != nil {
logger.Logger.Printf("Failed to delete temp file %s: %v", joinedPath, err)
}
logger.Logger.Printf("Video merged and moved successfully: %s", finalPath)
return finalPath, nil
}
// containsTimestamp 检查是否已有该时间戳
func containsTimestamp(files []string, ts int64) bool {
for _, f := range files {
if getTimestampFromPath(f) == ts {
return true
}
}
return false
}
// findClosest 查找最接近的时间戳文件
func findClosest(candidates []struct {
path string
ts int64
}, target int64) string {
var best string
var minDiff int64 = 1<<63 - 1
for _, c := range candidates {
diff := abs(c.ts - target)
if diff < minDiff {
minDiff = diff
best = c.path
}
}
if minDiff <= 30 {
return best
}
return ""
}
func abs(x int64) int64 {
if x < 0 {
return -x
}
return x
}
// copyFile 支持跨分区复制
func copyFile(src, dst string) error {
in, err := os.Open(src)
if err != nil {
return err
}
defer in.Close()
out, err := os.Create(dst)
if err != nil {
return err
}
defer out.Close()
if _, err = io.Copy(out, in); err != nil {
return err
}
return out.Sync()
}
// alignTo10Seconds 将时间对齐到最近的10秒间隔
func alignTo10Seconds(timestamp int64) int64 {
t := time.Unix(timestamp, 0)
// 获取秒数并对齐到10秒
seconds := t.Second()
alignedSeconds := (seconds / 10) * 10
// 创建新的时间
alignedTime := time.Date(t.Year(), t.Month(), t.Day(), t.Hour(), t.Minute(), alignedSeconds, 0, t.Location())
return alignedTime.Unix()
}
// parseSimpleTimestamp 解析简单的时间戳文件名 (如: 20251029181740.mp4)
func parseSimpleTimestamp(filename string) (int64, error) {
// 移除文件扩展名
nameWithoutExt := strings.TrimSuffix(filename, filepath.Ext(filename))
// 文件名应该就是14位时间戳
if len(nameWithoutExt) != 14 {
//return 0, fmt.Errorf("无效的时间戳格式: %s (期望14位数字)", nameWithoutExt)
}
// 解析时间戳
year, _ := strconv.Atoi(nameWithoutExt[0:4])
month, _ := strconv.Atoi(nameWithoutExt[4:6])
day, _ := strconv.Atoi(nameWithoutExt[6:8])
hour, _ := strconv.Atoi(nameWithoutExt[8:10])
minute, _ := strconv.Atoi(nameWithoutExt[10:12])
second, _ := strconv.Atoi(nameWithoutExt[12:14])
t := time.Date(year, time.Month(month), day, hour, minute, second, 0, time.Local)
return t.Unix(), nil
}
// getTimestampFromPath 从文件路径中提取时间戳
func getTimestampFromPath(filePath string) int64 {
filename := filepath.Base(filePath)
timestamp, err := parseSimpleTimestamp(filename)
if err != nil {
return 0
}
return timestamp
}
// contains 检查切片是否包含某个元素
func contains(slice []string, item string) bool {
for _, s := range slice {
if s == item {
return true
}
}
return false
}
// CleanupOldVideos 清理旧的合并视频文件,保留最近的 10 个
func CleanupOldVideos(deviceUUID string) {
const maxFiles = 10
videoDir := "/data/upload"
prefix := fmt.Sprintf("%s_", deviceUUID)
files, err := os.ReadDir(videoDir)
if err != nil {
logger.Logger.Printf("read the signals %s Error: %v", videoDir, err)
return
}
var videoFiles []os.DirEntry
for _, file := range files {
if strings.HasPrefix(file.Name(), prefix) && strings.HasSuffix(file.Name(), "_merged.mp4") {
videoFiles = append(videoFiles, file)
}
}
if len(videoFiles) <= maxFiles {
return
}
sort.Slice(videoFiles, func(i, j int) bool {
infoI, _ := videoFiles[i].Info()
infoJ, _ := videoFiles[j].Info()
return infoI.ModTime().Before(infoJ.ModTime())
})
for i := 0; i < len(videoFiles)-maxFiles; i++ {
filePath := filepath.Join(videoDir, videoFiles[i].Name())
if err := os.Remove(filePath); err != nil {
logger.Logger.Printf("Clean up old videos %s Error: %v", filePath, err)
} else {
logger.Logger.Printf("Clean up old videos: %s", filePath)
}
}
}

View File

@ -0,0 +1,100 @@
package video_server
import (
"bytes"
"encoding/json"
"fmt"
"io"
"mime/multipart"
"net/http"
"os"
"path/filepath"
"time"
"FireLeave_tool/logger"
)
// UploadToNginx 上传视频到 Nginx 服务器,返回 URL
func UploadToNginx(filePath string, eventParams map[string]any, uploadURL string) (string, error) {
// === 1. 打开文件 ===
file, err := os.Open(filePath)
if err != nil {
return "", fmt.Errorf("open file failed: %w", err)
}
defer file.Close()
// === 2. 创建表单 ===
body := new(bytes.Buffer)
writer := multipart.NewWriter(body)
// --- 上传文件 ---
part, err := writer.CreateFormFile("file", filepath.Base(filePath))
if err != nil {
return "", fmt.Errorf("create form file failed: %w", err)
}
if _, err = io.Copy(part, file); err != nil {
return "", fmt.Errorf("copy file failed: %w", err)
}
// --- 上传 JSON ---
eventJSON, err := json.Marshal(eventParams)
if err != nil {
return "", fmt.Errorf("marshal event failed: %w", err)
}
logger.Logger.Printf("[Upload Event parameters]eventParams: %v", eventParams)
logger.Logger.Printf("[Upload Event parameters]eventJSON: %s", string(eventJSON))
if err := writer.WriteField("event", string(eventJSON)); err != nil {
return "", fmt.Errorf("write event field failed: %w", err)
}
if err := writer.Close(); err != nil {
return "", fmt.Errorf("close writer failed: %w", err)
}
// === 3. 打印请求头 ===
logger.Logger.Printf("[HTTP Request]URL: %s", uploadURL)
logger.Logger.Printf("[HTTP Request]Content-Type: %s", writer.FormDataContentType())
logger.Logger.Printf("[HTTP Request]Body 长度: %d bytes", body.Len())
// === 4. 发送请求 ===
req, err := http.NewRequest("POST", uploadURL, body)
if err != nil {
return "", fmt.Errorf("create request failed: %w", err)
}
req.Header.Set("Content-Type", writer.FormDataContentType())
client := &http.Client{Timeout: 180 * time.Second}
resp, err := client.Do(req)
if err != nil {
return "", fmt.Errorf("upload failed: %w", err)
}
defer resp.Body.Close()
// === 5. 打印响应 ===
respBody, _ := io.ReadAll(resp.Body)
logger.Logger.Printf("[nginx response]Status: %d", resp.StatusCode)
logger.Logger.Printf("[nginx response]Body: %s", string(respBody))
if resp.StatusCode != http.StatusOK {
return "", fmt.Errorf("nginx error %d: %s", resp.StatusCode, string(respBody))
}
// === 6. 解析 video_url ===
var result struct {
Code int `json:"code"`
Result json.RawMessage `json:"result"`
}
if err := json.Unmarshal(respBody, &result); err != nil {
return "", fmt.Errorf("parse response failed: %w", err)
}
if result.Code != 0 {
return "", fmt.Errorf("nginx error: %s", string(respBody))
}
var videoURL string
json.Unmarshal(result.Result, &videoURL)
logger.Logger.Printf("[ Upload Successful ] Local file: %s → Remote URL: %s", filePath, videoURL)
return videoURL, nil
}