FireLeave_tool/connect/ws_channel.go
2025-12-10 14:34:57 +08:00

730 lines
20 KiB
Go
Raw Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

package connect
import (
"context"
"encoding/json"
"fmt"
"os"
"strings"
"sync"
"time"
"FireLeave_tool/logger"
"github.com/gorilla/websocket"
)
// Package-level state shared by the WebSocket helpers in this file.
var (
	// wsClient is the process-wide client returned by GetWSClient.
	wsClient *WSClient
	// wsMutex is held by GetWSClient (read lock) and by InitWSChannel while
	// it inspects wsClient.
	wsMutex sync.RWMutex
	// isConnecting is set by InitWSChannel while it starts a connection.
	isConnecting bool
	// wsURL is the endpoint dialed by InitWSChannel.
	wsURL = "ws://172.17.0.1:18080/ws" // Updated to specified address
	// deviceDataMutex is not referenced in this part of the file.
	// NOTE(review): presumably guards device data handled elsewhere — confirm.
	deviceDataMutex sync.Mutex
	// ConfigUpdateChan receives processed DeviceData from
	// handlePublishMessage; InitWSChannel allocates it with a buffer of 100.
	ConfigUpdateChan chan DeviceData
	// currentServiceID caches the value handed out by GetServiceID.
	currentServiceID string // New: current service's server_id
)
// PublishParams is the params payload of a "publish" message, as decoded by
// handlePublishMessage.
type PublishParams struct {
	Topic string `json:"topic"` // key used to look up the subscriber callback
	Data json.RawMessage `json:"data,omitempty"` // decoded as DeviceData when present
	Content json.RawMessage `json:"content,omitempty"` // not read in this part of the file
}
// InferenceHeartbeat is a JSON-serializable status record for one device.
// NOTE(review): no producer or consumer is visible in this part of the file.
type InferenceHeartbeat struct {
	DeviceUUID string `json:"device_uuid"`
	PersonCount int `json:"person_count"`
	Temperature float64 `json:"temperature,omitempty"`
	Timestamp int64 `json:"timestamp"` // report timestamp (unit not shown here — confirm)
}
// GetWSClient returns the package-level WebSocket client, or nil when none
// has been stored yet. The read happens under wsMutex's read lock.
func GetWSClient() *WSClient {
	wsMutex.RLock()
	current := wsClient
	wsMutex.RUnlock()
	return current
}
// IsConnected reports the client's connected flag, read under c.mutex.
func (c *WSClient) IsConnected() bool {
	c.mutex.Lock()
	state := c.connected
	c.mutex.Unlock()
	return state
}
const (
	// serviceConfigFile is the path of the JSON file read by
	// LoadServiceConfig and written by SaveServiceConfig.
	serviceConfigFile = "/usr/local/etc/service.conf"
)
// DeviceData describes one device's configuration and state as received in
// publish messages and persisted in the service config file.
type DeviceData struct {
	DeviceUUID string `json:"device_uuid"` // identity key; entries with an empty value are rejected or filtered in this file
	TaskID string `json:"task_id"`
	CameraRTSP string `json:"camera_rtsp"`
	CameraIP string `json:"camera_ip"`
	Confidence int `json:"confidence"`
	DetectArea []Area `json:"detect_area"`
	DetectAreaStr string `json:"detect_area_str,omitempty"` // filled by ProcessDeviceDataForInference from DetectArea
	PersonCount int `json:"person_count"`
	DetectionTime json.Number `json:"detection_time"` // Using json.Number
	AlarmTime json.Number `json:"alarm_time"` // Using json.Number
	TemperatureThreshold float64 `json:"temperature_threshold"`
	Temperature float64 `json:"temperature"`
	UploadURL string `json:"upload_url"` // LoadServiceConfig substitutes a fixed URL when empty
	// WSConn carries no json tag, so encoding/json uses the field name as-is.
	WSConn *websocket.Conn
	CameraChannel int `json:"camera_channel"`
	LastAlarmTime int64 `json:"-"` // Last alarm time; excluded from JSON
	AlarmPauseUntil int64 `json:"-"` // Pause alarm until; excluded from JSON
	HostUUID string `json:"host_uuid"`
	Address string `json:"address"`
}
// InferenceParams is the parameter set built by DeviceData.ToInferenceParams.
type InferenceParams struct {
	DeviceUUID string `json:"device_uuid"`
	CameraRTSP string `json:"camera_rtsp"`
	CameraChannel int `json:"camera_channel"`
	Confidence int `json:"confidence"`
	DetectArea string `json:"detect_area"` // String format detection area (output of ProcessDetectAreaForInference)
	Port int `json:"port"` // HTTP server port
}
// Area is one detection region described by a list of points.
type Area struct {
	ID int `json:"id"`
	Pts []Point `json:"pts"` // vertices, in the order they are serialized
}
// Point is a single coordinate pair inside an Area.
// ProcessDetectAreaForInference converts both components with int().
type Point struct {
	X float64 `json:"x"`
	Y float64 `json:"y"`
}
// WSMessage is the JSON envelope used for the requests, responses and
// publish notifications handled in this file.
type WSMessage struct {
	ID int64 `json:"id,omitempty"` // matches a response to its pending request; readLoop only dispatches IDs > 0
	Method string `json:"method"` // values used in this file: "register", "subscribe", "publish"
	Params json.RawMessage `json:"params,omitempty"`
	Result json.RawMessage `json:"result,omitempty"` // Modified: string -> json.RawMessage
	Error *WSError `json:"error,omitempty"` // checked for non-nil by RegisterService and SubscribeTopic
}
// WSError is the error object carried in WSMessage.Error.
type WSError struct {
	Code int `json:"code"`
	Message string `json:"message"`
}
// RegisterParams is the params payload sent by RegisterService with the
// "register" method.
type RegisterParams struct {
	ContainerName string `json:"container_name,omitempty"`
	ServiceID string `json:"service_id,omitempty"`
	MetaData map[string]interface{} `json:"meta_data,omitempty"` // New: metadata field
}
// SubscribeParams is the params payload sent by SubscribeTopic with the
// "subscribe" method.
type SubscribeParams struct {
	Topic string `json:"topic"`
}
// WSClient wraps one gorilla/websocket connection together with the
// bookkeeping needed for request/response matching and topic callbacks.
type WSClient struct {
	Conn *websocket.Conn // current connection; replaced by reconnect
	url string // address dialed by connect and reconnect
	serviceID string // set by RegisterService after a successful registration
	serverID string // not referenced in this part of the file
	connected bool // cleared on read/write failure, set again after a successful dial
	mutex sync.Mutex // taken around connected, messageID, callbacks and subscribers in most methods
	messageID int64 // next request ID handed out by sendRequest
	callbacks map[int64]chan *WSMessage // pending responses keyed by request ID
	subscribers map[string]func([]byte) // topic -> callback registered by SubscribeTopic
	closeChan chan struct{} // selected on by sendRequest and heartbeatLoop; never closed in this part of the file
	reconnectFlag bool // reconnect returns immediately when false; cleared after 12 failed attempts
}
// NewWSClient dials url and returns a client that is already marked as
// connected. On success it also starts the read loop and the heartbeat loop
// in their own goroutines; on dial failure it returns the dial error.
func NewWSClient(url string) (*WSClient, error) {
	conn, _, dialErr := websocket.DefaultDialer.Dial(url, nil)
	if dialErr != nil {
		return nil, dialErr
	}

	client := new(WSClient)
	client.Conn = conn
	client.url = url
	client.connected = true
	client.messageID = 1
	client.callbacks = make(map[int64]chan *WSMessage)
	client.subscribers = make(map[string]func([]byte))
	client.closeChan = make(chan struct{})
	client.reconnectFlag = true

	go client.readLoop()
	go client.heartbeatLoop()
	return client, nil
}
// connect dials c.url, installs the new connection on the client and starts
// the read and heartbeat loops for it. It returns the dial error unchanged.
func (c *WSClient) connect() error {
	conn, _, err := websocket.DefaultDialer.Dial(c.url, nil)
	if err != nil {
		return err
	}

	// Conn and connected are read under c.mutex elsewhere (IsConnected,
	// SendRawMessage, heartbeatLoop), so they must be written under it too.
	c.mutex.Lock()
	c.Conn = conn
	c.connected = true
	c.mutex.Unlock()

	go c.readLoop()
	go c.heartbeatLoop()
	return nil
}
// InitWSChannel starts establishing the package-level WebSocket client.
//
// The dial runs in a background goroutine, so a nil return only means that
// an attempt was started or that a connected client already exists; use
// GetWSClient to find out when the client is available. An error is returned
// while a previous attempt is still in flight.
//
// ConfigUpdateChan is allocated (buffer of 100) before this function returns
// if it does not exist yet.
func InitWSChannel() error {
	wsMutex.Lock()
	defer wsMutex.Unlock()

	if wsClient != nil && wsClient.IsConnected() {
		logger.Logger.Printf("WebSocket connection already exists")
		return nil
	}
	if isConnecting {
		logger.Logger.Printf("WebSocket connection is being established")
		return fmt.Errorf("WebSocket connection is being established")
	}

	// The marker is cleared by the goroutine below once the dial has
	// finished, not when this function returns; otherwise a second caller
	// could start a parallel dial.
	isConnecting = true
	if ConfigUpdateChan == nil {
		ConfigUpdateChan = make(chan DeviceData, 100)
	}
	logger.Logger.Printf("Starting WebSocket connection initialization: %s", wsURL)

	// Connect asynchronously so the caller is not blocked by the dial.
	go func() {
		client, err := NewWSClient(wsURL)

		// Publish the outcome under the same lock that readers use.
		wsMutex.Lock()
		defer wsMutex.Unlock()
		isConnecting = false
		if err != nil {
			logger.Logger.Printf("Failed to create WebSocket client: %v", err)
			return
		}
		// Note: LoadDeviceData() is deliberately not called here to avoid
		// duplicate loading.
		wsClient = client
		logger.Logger.Printf("WebSocket connection initialized successfully (asynchronous)")
	}()
	return nil
}
// generateServiceID builds a service ID of the form
// "fireleave-service-<nanoseconds since the Unix epoch>".
func generateServiceID() string {
	const prefix = "fireleave-service"
	nanos := time.Now().UnixNano()
	return fmt.Sprintf("%s-%d", prefix, nanos)
}
// New: set current server_id
//func SetServerID(serverID string) {
// currentServerID = serverID
//}
// GetServiceID returns the cached service ID, generating and caching a new
// one with generateServiceID on first use.
// NOTE(review): currentServiceID is accessed without a lock here.
func GetServiceID() string {
	if currentServiceID != "" {
		return currentServiceID
	}
	currentServiceID = generateServiceID()
	return currentServiceID
}
// RegisterService service registration - modified: now requires providing own server_id
func RegisterService(containerName string) (string, error) {
if wsClient == nil {
return "", fmt.Errorf("WebSocket client not initialized")
}
serviceID := GetServiceID()
params := RegisterParams{
ContainerName: containerName,
ServiceID: serviceID,
MetaData: map[string]interface{}{ // Add metadata
"version": "1.0",
"type": "fire_leave",
},
}
logger.Logger.Printf("Starting service registration: ContainerName=%s, ServiceID=%s", containerName, serviceID)
response, err := wsClient.sendRequest("register", params, true)
if err != nil {
logger.Logger.Printf("Service registration failed: %v", err)
return "", err
}
if response.Error != nil {
logger.Logger.Printf("Service registration error: Code=%d, Message=%s", response.Error.Code, response.Error.Message)
return "", fmt.Errorf("Service registration error: %s", response.Error.Message)
}
// Parse response
var resultStr string
if err := json.Unmarshal(response.Result, &resultStr); err != nil {
logger.Logger.Printf("Failed to parse service registration response: %v, Raw response: %s", err, string(response.Result))
return "", fmt.Errorf("Failed to parse service registration response: %v", err)
}
if resultStr != "ok" {
logger.Logger.Printf("Service registration failed: Expected result 'ok', got '%s'", resultStr)
return "", fmt.Errorf("Service registration failed: Server returned abnormal response")
}
wsClient.serviceID = serviceID
logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID)
return serviceID, nil
}
// SubscribeTopic topic subscription
func SubscribeTopic(topic string, callback func([]byte)) error {
if wsClient == nil {
return fmt.Errorf("WebSocket client not initialized")
}
params := SubscribeParams{
Topic: topic,
}
logger.Logger.Printf("Starting topic subscription: Topic=%s", topic)
response, err := wsClient.sendRequest("subscribe", params, true)
if err != nil {
logger.Logger.Printf("Topic subscription failed: %v", err)
return err
}
if response.Error != nil {
logger.Logger.Printf("Topic subscription error: Code=%d, Message=%s", response.Error.Code, response.Error.Message)
return fmt.Errorf("Topic subscription error: %s", response.Error.Message)
}
wsClient.mutex.Lock()
wsClient.subscribers[topic] = callback
wsClient.mutex.Unlock()
logger.Logger.Printf("Topic subscription successful: Topic=%s", topic)
return nil
}
// SendRawMessage JSON-encodes message and writes it to the connection as a
// text frame. The whole call runs under c.mutex.
//
// It returns an error when the client is not connected (or has no
// connection object), when encoding fails, or when the write fails. A failed
// write marks the client as disconnected and starts reconnect in a new
// goroutine; the returned error wraps the underlying write error.
func (c *WSClient) SendRawMessage(message interface{}) error {
	c.mutex.Lock()
	defer c.mutex.Unlock()

	// Guard against a nil Conn as well, matching the check in heartbeatLoop.
	if !c.connected || c.Conn == nil {
		return fmt.Errorf("WebSocket connection disconnected")
	}

	messageData, err := json.Marshal(message)
	if err != nil {
		return fmt.Errorf("Message serialization failed: %w", err)
	}
	logger.Logger.Printf("Sending WebSocket message: %s", string(messageData))

	if err := c.Conn.WriteMessage(websocket.TextMessage, messageData); err != nil {
		logger.Logger.Printf("WebSocket message sending failed: %v", err)
		c.connected = false
		go c.reconnect()
		return fmt.Errorf("Message sending failed: %w", err)
	}
	return nil
}
// sendRequest allocates the next message ID, wraps params in a WSMessage with
// the given method and sends it.
//
// With waitResponse false it returns (nil, send error) right after sending.
// With waitResponse true it registers a one-slot reply channel under the
// message ID, sends, and then blocks until the reply arrives, 30 seconds
// elapse, or closeChan is signalled. The reply channel is always removed
// from c.callbacks before returning.
func (c *WSClient) sendRequest(method string, params interface{}, waitResponse bool) (*WSMessage, error) {
	c.mutex.Lock()
	reqID := c.messageID
	c.messageID++
	c.mutex.Unlock()

	request := WSMessage{ID: reqID, Method: method}
	if params != nil {
		encoded, err := json.Marshal(params)
		if err != nil {
			return nil, fmt.Errorf("Parameter serialization failed: %v", err)
		}
		request.Params = encoded
	}

	// Fire-and-forget path.
	if !waitResponse {
		return nil, c.SendRawMessage(request)
	}

	replyCh := make(chan *WSMessage, 1)
	c.mutex.Lock()
	c.callbacks[reqID] = replyCh
	c.mutex.Unlock()
	defer func() {
		c.mutex.Lock()
		delete(c.callbacks, reqID)
		c.mutex.Unlock()
		logger.Logger.Printf("Cleaning up callback channel: ID=%d", reqID)
	}()

	if err := c.SendRawMessage(request); err != nil {
		return nil, err
	}
	logger.Logger.Printf("Waiting for response: ID=%d", reqID)

	select {
	case reply := <-replyCh:
		logger.Logger.Printf("Received response: ID=%d", reqID)
		return reply, nil
	case <-time.After(30 * time.Second):
		logger.Logger.Printf("Response timeout: ID=%d", reqID)
		return nil, fmt.Errorf("Response timeout")
	case <-c.closeChan:
		logger.Logger.Printf("Connection closed, stopping response wait: ID=%d", reqID)
		return nil, fmt.Errorf("Connection closed")
	}
}
// readLoop reads messages from the connection that is current when the loop
// starts and dispatches them until a read fails.
//
// Text "pong" frames are logged and skipped. Messages with ID > 0 are handed
// to the matching pending request (non-blocking); messages with method
// "publish" go to handlePublishMessage. On a read error the loop marks the
// client as disconnected and starts reconnect, but only if its connection is
// still the client's current one; a loop left over from a replaced
// connection just exits. Panics are recovered and logged.
func (c *WSClient) readLoop() {
	logger.Logger.Printf("Starting WebSocket message reading loop")
	defer func() {
		if r := recover(); r != nil {
			logger.Logger.Printf("readLoop panic: %v", r)
		}
		// Important: do not close(closeChan) here; sendRequest and
		// heartbeatLoop select on it.
		logger.Logger.Printf("WebSocket read loop exited, connection marked as disconnected")
	}()

	// Bind this loop to one connection. c.Conn is replaced under c.mutex by
	// reconnect, so reading the field on every iteration would be racy and
	// could put two readers on the same connection.
	c.mutex.Lock()
	conn := c.Conn
	c.mutex.Unlock()

	for {
		_, messageData, err := conn.ReadMessage()
		if err != nil {
			logger.Logger.Printf("WebSocket message reading failed: %v", err)
			c.mutex.Lock()
			stillCurrent := c.Conn == conn
			if stillCurrent {
				c.connected = false
			}
			c.mutex.Unlock()
			if stillCurrent {
				go c.reconnect() // Trigger reconnection
			}
			return
		}

		if string(messageData) == "pong" {
			logger.Logger.Printf("Received pong response")
			continue
		}
		logger.Logger.Printf("Received WebSocket message: %s", string(messageData))

		var message WSMessage
		if err := json.Unmarshal(messageData, &message); err != nil {
			logger.Logger.Printf("WebSocket message parsing failed: %v", err)
			continue
		}

		if message.ID > 0 {
			c.mutex.Lock()
			if callback, exists := c.callbacks[message.ID]; exists {
				// Non-blocking send: the reply channel has one slot.
				select {
				case callback <- &message:
					logger.Logger.Printf("Successfully sent message to callback channel: ID=%d", message.ID)
				default:
					logger.Logger.Printf("Callback channel full, discarding message: ID=%d", message.ID)
				}
			}
			c.mutex.Unlock()
		}

		if message.Method == "publish" {
			c.handlePublishMessage(message)
		}
	}
}
// handlePublishMessage processes one "publish" message.
//
// It decodes the params, invokes the subscriber callback registered for the
// topic (if any) with the raw params, and, when a data payload is present,
// decodes it as DeviceData, saves it to the service config file, converts it
// with ProcessDeviceDataForInference, stores it via UpdateDeviceData and
// finally offers it to ConfigUpdateChan without blocking. Payloads with an
// empty DeviceUUID are ignored.
func (c *WSClient) handlePublishMessage(message WSMessage) {
	logger.Logger.Printf("Processing publish message")

	var params PublishParams
	if err := json.Unmarshal(message.Params, &params); err != nil {
		logger.Logger.Printf("Failed to parse publish message parameters: %v", err)
		return
	}
	logger.Logger.Printf("WebSocket 收到 publish 消息 → Topic: %s | 原始数据: %s",
		params.Topic, string(params.Data))

	if topic := params.Topic; topic != "" {
		c.mutex.Lock()
		handler, found := c.subscribers[topic]
		c.mutex.Unlock()
		if found && handler != nil {
			handler(message.Params)
		}
	}

	if params.Data == nil {
		return
	}

	var incoming DeviceData
	if err := json.Unmarshal(params.Data, &incoming); err != nil {
		logger.Logger.Printf("Failed to parse DeviceData: %v", err)
		return
	}
	// A device without a UUID cannot be stored or matched later.
	if incoming.DeviceUUID == "" {
		logger.Logger.Printf("Invalid DeviceData: DeviceUUID is empty, ignoring")
		return
	}
	logger.Logger.Printf("Received valid DeviceData: DeviceUUID=%s", incoming.DeviceUUID)

	// Step 1: persist the unmodified configuration; a failure is only logged.
	if err := SaveSingleServiceConfig(incoming); err != nil {
		logger.Logger.Printf("Failed to save service configuration: %v", err)
	}

	// Step 2: convert for inference and store the converted record.
	prepared := ProcessDeviceDataForInference(incoming)
	if err := UpdateDeviceData(prepared); err != nil {
		logger.Logger.Printf("Failed to update DeviceData: %v", err)
		return
	}

	// Step 3: notify listeners without blocking the read loop.
	select {
	case ConfigUpdateChan <- prepared:
		logger.Logger.Printf("New DeviceData sent to config update channel: DeviceUUID=%s", prepared.DeviceUUID)
	default:
		logger.Logger.Printf("Config update channel full, discarding message")
	}
}
// heartbeatLoop sends a WebSocket ping every 25 seconds on the connection
// that is current when the loop starts.
//
// The loop belongs to that one connection: it exits when the client is no
// longer connected, when the connection has been replaced, when a ping
// fails, or when closeChan is signalled. A failed ping marks the client as
// disconnected and starts reconnect, provided the connection is still the
// current one. Every successful dial in this file starts a fresh heartbeat
// loop, so exiting here keeps old loops from accumulating.
func (c *WSClient) heartbeatLoop() {
	const (
		pingInterval = 25 * time.Second
		pingTimeout  = 10 * time.Second
	)
	logger.Logger.Printf("Starting WebSocket heartbeat loop")

	c.mutex.Lock()
	own := c.Conn
	c.mutex.Unlock()

	ticker := time.NewTicker(pingInterval)
	defer ticker.Stop()

	for {
		select {
		case <-ticker.C:
			c.mutex.Lock()
			connected := c.connected
			current := c.Conn
			c.mutex.Unlock()
			if own == nil || current != own || !connected {
				logger.Logger.Printf("Heartbeat loop stopping: connection lost or replaced")
				return
			}

			// WriteControl may be called concurrently with other writers,
			// unlike WriteMessage, so the ping cannot interleave with
			// SendRawMessage.
			err := own.WriteControl(websocket.PingMessage, nil, time.Now().Add(pingTimeout))
			if err != nil {
				logger.Logger.Printf("Failed to send ping: %v", err)
				c.mutex.Lock()
				stillCurrent := c.Conn == own
				if stillCurrent {
					c.connected = false
				}
				c.mutex.Unlock()
				if stillCurrent {
					go c.reconnect()
				}
				return
			}
			logger.Logger.Printf("Ping sent successfully")
		case <-c.closeChan:
			logger.Logger.Printf("Heartbeat loop received close signal, exiting")
			return
		}
	}
}
// reRegisterAndResubscribe restores server-side state after a reconnect: it
// waits one second, registers the service again and then re-subscribes every
// topic currently stored in c.subscribers. If registration fails, nothing is
// re-subscribed. Failures are logged, not returned.
//
// NOTE(review): RegisterService and SubscribeTopic act on the package-level
// client, not on c — confirm they are always the same instance.
func (c *WSClient) reRegisterAndResubscribe() {
	// 1. Give the new connection a moment to settle.
	time.Sleep(1 * time.Second)

	// 2. Register again.
	containerName := "fireleave-container"
	serviceID, err := RegisterService(containerName)
	if err != nil {
		logger.Logger.Printf("Failed to re-register service after reconnect: %v", err)
		return
	}
	logger.Logger.Printf("Re-registration successful: ServiceID=%s", serviceID)

	// 3. Re-subscribe. Copy the map under the lock first: SubscribeTopic
	// writes to the subscriber map under that lock, possibly from other
	// goroutines, so ranging over the live map would race.
	c.mutex.Lock()
	pending := make(map[string]func([]byte), len(c.subscribers))
	for topic, callback := range c.subscribers {
		pending[topic] = callback
	}
	c.mutex.Unlock()

	for topic, callback := range pending {
		if err := SubscribeTopic(topic, callback); err != nil {
			logger.Logger.Printf("Failed to re-subscribe topic %s: %v", topic, err)
		} else {
			logger.Logger.Printf("Re-subscribed to topic: %s", topic)
		}
	}
}
// reconnect tries to re-establish the connection, making up to 12 attempts
// with a doubling delay (1s, 2s, 4s, ...) capped at 30 seconds.
//
// It does nothing when reconnectFlag is false. Because several goroutines can
// trigger it for the same outage, each attempt first checks whether the
// client is already connected again and, if so, stops without installing a
// second connection. On success it installs the new connection and starts
// re-registration, the read loop and the heartbeat loop. After 12 failures
// it clears reconnectFlag so later calls return immediately.
//
// NOTE(review): the previous connection object is not closed here.
func (c *WSClient) reconnect() {
	const (
		maxAttempts = 12
		maxDelay    = 30 * time.Second
	)

	c.mutex.Lock()
	enabled := c.reconnectFlag
	c.mutex.Unlock()
	if !enabled {
		return
	}

	for i := 0; i < maxAttempts; i++ {
		delay := time.Duration(1<<uint(i)) * time.Second
		if delay > maxDelay {
			delay = maxDelay
		}
		time.Sleep(delay)

		// Another goroutine may have restored the connection while we slept.
		if c.IsConnected() {
			logger.Logger.Printf("Reconnection skipped: connection already re-established")
			return
		}

		conn, _, err := websocket.DefaultDialer.Dial(c.url, nil)
		if err != nil {
			logger.Logger.Printf("Reconnection attempt %d/12 failed: %v", i+1, err)
			continue
		}

		c.mutex.Lock()
		if c.connected {
			// Lost the race during the dial; discard our connection.
			c.mutex.Unlock()
			_ = conn.Close() // best effort: this connection was never used
			return
		}
		c.Conn = conn
		c.connected = true
		c.mutex.Unlock()
		logger.Logger.Printf("WebSocket reconnection successful!")

		// Key step: the service must register again after reconnecting.
		go c.reRegisterAndResubscribe()
		// Restart the read and heartbeat loops for the new connection.
		go c.readLoop()
		go c.heartbeatLoop()
		return
	}

	logger.Logger.Printf("Reconnection failed after 12 attempts")
	c.mutex.Lock()
	c.reconnectFlag = false
	c.mutex.Unlock()
}
// ToInferenceParams builds the InferenceParams for this device. The detection
// area is converted to its string form with ProcessDetectAreaForInference and
// port is copied into the result unchanged.
func (d *DeviceData) ToInferenceParams(port int) InferenceParams {
	var out InferenceParams
	out.DeviceUUID = d.DeviceUUID
	out.CameraRTSP = d.CameraRTSP
	out.CameraChannel = d.CameraChannel
	out.Confidence = d.Confidence
	out.DetectArea = ProcessDetectAreaForInference(d.DetectArea)
	out.Port = port
	return out
}
// SaveServiceConfig writes deviceDataList as indented JSON to
// serviceConfigFile (mode 0644), replacing the previous contents. It returns
// the marshalling or write error, if any, after logging it.
func SaveServiceConfig(deviceDataList []DeviceData) error {
	payload, marshalErr := json.MarshalIndent(deviceDataList, "", " ")
	if marshalErr != nil {
		logger.Logger.Printf("Failed to serialize service configuration: %v", marshalErr)
		return marshalErr
	}

	writeErr := os.WriteFile(serviceConfigFile, payload, 0644)
	if writeErr != nil {
		logger.Logger.Printf("Failed to write service config file: %v", writeErr)
		return writeErr
	}

	count := len(deviceDataList)
	logger.Logger.Printf("Service configuration saved to: %s, %d devices", serviceConfigFile, count)
	return nil
}
// SaveSingleServiceConfig inserts or replaces one device in the service
// config file. The existing list is loaded, the entry with the same
// DeviceUUID is overwritten (or deviceData is appended when there is none),
// and the list is saved again.
//
// NOTE(review): when LoadServiceConfig returns any error — including a read
// or parse failure — the file is rewritten with only deviceData.
func SaveSingleServiceConfig(deviceData DeviceData) error {
	configs, err := LoadServiceConfig()
	if err != nil {
		// Nothing usable on disk: start a new list with just this device.
		return SaveServiceConfig([]DeviceData{deviceData})
	}

	replaced := false
	for idx := range configs {
		if configs[idx].DeviceUUID != deviceData.DeviceUUID {
			continue
		}
		configs[idx] = deviceData
		replaced = true
		break
	}
	if !replaced {
		configs = append(configs, deviceData)
	}
	return SaveServiceConfig(configs)
}
// LoadServiceConfig reads serviceConfigFile and returns the device entries it
// contains.
//
// Entries with an empty DeviceUUID are dropped, and entries with an empty
// UploadURL get a fixed default URL. An error is returned when the file does
// not exist, cannot be read, is empty, is not a JSON array of DeviceData, or
// contains no entry with a DeviceUUID.
func LoadServiceConfig() ([]DeviceData, error) {
	const defaultUploadURL = "http://192.168.80.168/api/v1/upload/video"

	if _, statErr := os.Stat(serviceConfigFile); os.IsNotExist(statErr) {
		logger.Logger.Printf("Service config file does not exist: %s", serviceConfigFile)
		return nil, fmt.Errorf("service config file does not exist")
	}

	raw, readErr := os.ReadFile(serviceConfigFile)
	if readErr != nil {
		logger.Logger.Printf("Failed to read service config file: %v", readErr)
		return nil, readErr
	}
	if len(raw) == 0 {
		logger.Logger.Printf("Service config file is empty")
		return nil, fmt.Errorf("service config file is empty")
	}

	var parsed []DeviceData
	if parseErr := json.Unmarshal(raw, &parsed); parseErr != nil {
		logger.Logger.Printf("Failed to parse service config file: %v", parseErr)
		return nil, parseErr
	}

	// Single pass: skip entries without a UUID, fill in a missing upload URL.
	var valid []DeviceData
	for _, entry := range parsed {
		if entry.DeviceUUID == "" {
			continue
		}
		if entry.UploadURL == "" {
			entry.UploadURL = defaultUploadURL
			logger.Logger.Printf("Auto-completed UploadURL: %s (DeviceUUID=%s)",
				defaultUploadURL, entry.DeviceUUID)
		}
		valid = append(valid, entry)
	}

	if len(valid) == 0 {
		logger.Logger.Printf("No valid device configurations found in service config file")
		return nil, fmt.Errorf("no valid device configurations")
	}
	logger.Logger.Printf("Loaded %d device configurations from service config file", len(valid))
	return valid, nil
}
// ProcessDetectAreaForInference flattens the first detection area into a
// comma-separated coordinate string of the form "x1,y1,x2,y2,...".
//
// Only areas[0] is used; any further areas are ignored. Each coordinate is
// converted with int(), which drops the fractional part (truncation toward
// zero, not rounding). An empty input or an area without points yields "".
func ProcessDetectAreaForInference(areas []Area) string {
	if len(areas) == 0 {
		return ""
	}

	pts := areas[0].Pts
	coords := make([]string, len(pts))
	for i := range pts {
		coords[i] = fmt.Sprintf("%d,%d", int(pts[i].X), int(pts[i].Y))
	}
	return strings.Join(coords, ",")
}
// ProcessDeviceDataForInference returns a copy of data whose DetectAreaStr is
// set to the string form of data.DetectArea. All other fields are carried
// over unchanged; the caller's value is not modified because data is passed
// by value.
func ProcessDeviceDataForInference(data DeviceData) DeviceData {
	data.DetectAreaStr = ProcessDetectAreaForInference(data.DetectArea)
	return data
}
// SendAsync sends message in the background and returns immediately. The
// outcome is only logged: success, failure, or a timeout notice when
// SendRawMessage has not returned within 8 seconds. The send itself is not
// cancelled on timeout; its result is simply no longer awaited.
func (c *WSClient) SendAsync(message interface{}) {
	deliver := func() {
		ctx, cancel := context.WithTimeout(context.Background(), 8*time.Second)
		defer cancel()

		// One buffered slot lets the sender finish even after a timeout.
		result := make(chan error, 1)
		go func() {
			result <- c.SendRawMessage(message)
		}()

		select {
		case <-ctx.Done():
			logger.Logger.Printf("Asynchronous reporting timeout (8 seconds no response), continuing business process")
		case sendErr := <-result:
			if sendErr == nil {
				logger.Logger.Printf("Asynchronous reporting successful")
				return
			}
			logger.Logger.Printf("Asynchronous reporting failed: %v", sendErr)
		}
	}
	go deliver()
}