2025-11-24 18:06:00 +08:00

481 lines
11 KiB
Go

package connect
import (
"Dynamic_environmental_detection/logger"
"encoding/json"
"fmt"
"github.com/gorilla/websocket"
"sync"
"time"
)
// Global variables
var (
wsClient *WSClient
wsMutex sync.RWMutex
isConnecting bool
wsURL = "ws://172.17.0.1:18080/ws"
)
type WSMessage struct {
ID int64 `json:"id,omitempty"`
Method string `json:"method"`
Params json.RawMessage `json:"params,omitempty"`
Result json.RawMessage `json:"result,omitempty"`
Error *WSError `json:"error,omitempty"`
}
type WSError struct {
Code int `json:"code"`
Message string `json:"message"`
}
type RegisterParams struct {
ServiceID string `json:"service_id"`
MetaData map[string]interface{} `json:"meta_data,omitempty"`
ContainerName string `json:"container_name,omitempty"`
}
type SubscribeParams struct {
Topic string `json:"topic"`
}
type MetricDataParams struct {
RouteKey string `json:"route_key"`
Metric interface{} `json:"metric"`
}
type PublishParams struct {
Topic string `json:"topic"`
Data json.RawMessage `json:"data,omitempty"`
}
type WSClient struct {
Conn *websocket.Conn
url string
serviceID string
metaData map[string]interface{}
containerName string
subscribers map[string]func(*PublishParams)
connected bool
mutex sync.Mutex
messageID int64
callbacks map[int64]chan *WSMessage
closeChan chan struct{}
reconnectFlag bool
}
// ==================== Public API ====================
//// GetWSClient returns the global WebSocket client instance
//func GetWSClient() *WSClient {
// wsMutex.RLock()
// defer wsMutex.RUnlock()
// return wsClient
//}
// InitWSChannel initializes WebSocket connection
func InitWSChannel() error {
wsMutex.Lock()
defer wsMutex.Unlock()
if wsClient != nil && wsClient.connected {
logger.Logger.Printf("WebSocket connection already exists")
return nil
}
if isConnecting {
return fmt.Errorf("WebSocket connection is being established")
}
isConnecting = true
defer func() { isConnecting = false }()
logger.Logger.Printf("Starting WebSocket connection initialization: %s", wsURL)
client, err := NewWSClient(wsURL)
if err != nil {
logger.Logger.Printf("Failed to create WebSocket client: %v", err)
return err
}
wsClient = client
logger.Logger.Printf("WebSocket connection initialized successfully")
return nil
}
// GenerateServiceID generates a unique service ID
func GenerateServiceID() string {
return fmt.Sprintf("fireleave-service-%d", time.Now().UnixNano())
}
// RegisterService registers the service with the server
func RegisterService(serviceID string, metaData map[string]interface{}, containerName string) error {
if wsClient == nil {
return fmt.Errorf("WebSocket client not initialized")
}
// Store registration info for reconnection
wsClient.serviceID = serviceID
wsClient.metaData = metaData
wsClient.containerName = containerName
params := RegisterParams{
ServiceID: serviceID,
MetaData: metaData,
ContainerName: containerName,
}
logger.Logger.Printf("Starting service registration: ServiceID=%s", serviceID)
response, err := wsClient.sendRequest("register", params, true)
if err != nil {
return fmt.Errorf("service registration failed: %v", err)
}
if response.Error != nil {
return fmt.Errorf("service registration error: %s", response.Error.Message)
}
var resultStr string
if err := json.Unmarshal(response.Result, &resultStr); err != nil {
return fmt.Errorf("failed to parse registration response: %v", err)
}
if resultStr != "ok" {
return fmt.Errorf("service registration failed: server returned %s", resultStr)
}
logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID)
return nil
}
// SubscribeTopic subscribes to a topic
func SubscribeTopic(topic string, callback func(*PublishParams)) error {
if wsClient == nil {
return fmt.Errorf("WebSocket client not initialized")
}
params := SubscribeParams{Topic: topic}
logger.Logger.Printf("Starting topic subscription: Topic=%s", topic)
response, err := wsClient.sendRequest("subscribe", params, true)
if err != nil {
return fmt.Errorf("topic subscription failed: %v", err)
}
if response.Error != nil {
return fmt.Errorf("topic subscription error: %s", response.Error.Message)
}
wsClient.mutex.Lock()
wsClient.subscribers[topic] = callback
wsClient.mutex.Unlock()
logger.Logger.Printf("Topic subscription successful: Topic=%s", topic)
return nil
}
// ReportMetricData reports metric data to the server (no response expected)
func ReportMetricData(routeKey string, metric interface{}) error {
if wsClient == nil {
return fmt.Errorf("WebSocket client not initialized")
}
params := MetricDataParams{
RouteKey: routeKey,
Metric: metric,
}
// metric_data method doesn't expect a response
_, err := wsClient.sendRequest("metric_data", params, false)
if err != nil {
return fmt.Errorf("metric data reporting failed: %v", err)
}
logger.Logger.Printf("Metric data reported successfully: RouteKey=%s", routeKey)
return nil
}
// Close closes the WebSocket connection
func Close() {
if wsClient != nil {
wsClient.close()
}
}
// ==================== Internal Methods ====================
// NewWSClient creates a new WebSocket client
func NewWSClient(url string) (*WSClient, error) {
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
if err != nil {
return nil, err
}
client := &WSClient{
Conn: conn,
url: url,
connected: true,
messageID: 1,
callbacks: make(map[int64]chan *WSMessage),
subscribers: make(map[string]func(*PublishParams)),
closeChan: make(chan struct{}),
reconnectFlag: true,
}
go client.readLoop()
go client.heartbeatLoop()
return client, nil
}
func (c *WSClient) IsConnected() bool {
c.mutex.Lock()
defer c.mutex.Unlock()
return c.connected
}
func (c *WSClient) sendRequest(method string, params interface{}, waitResponse bool) (*WSMessage, error) {
c.mutex.Lock()
id := c.messageID
c.messageID++
c.mutex.Unlock()
message := WSMessage{
ID: id,
Method: method,
}
if params != nil {
paramsData, err := json.Marshal(params)
if err != nil {
return nil, fmt.Errorf("parameter serialization failed: %v", err)
}
message.Params = paramsData
}
if !waitResponse {
return nil, c.sendRawMessage(message)
}
callback := make(chan *WSMessage, 1)
c.mutex.Lock()
c.callbacks[id] = callback
c.mutex.Unlock()
defer func() {
c.mutex.Lock()
delete(c.callbacks, id)
c.mutex.Unlock()
}()
if err := c.sendRawMessage(message); err != nil {
return nil, err
}
select {
case response := <-callback:
return response, nil
case <-time.After(30 * time.Second):
return nil, fmt.Errorf("response timeout")
case <-c.closeChan:
return nil, fmt.Errorf("connection closed")
}
}
func (c *WSClient) sendRawMessage(message interface{}) error {
c.mutex.Lock()
defer c.mutex.Unlock()
if !c.connected {
return fmt.Errorf("WebSocket connection disconnected")
}
messageData, err := json.Marshal(message)
if err != nil {
return fmt.Errorf("message serialization failed: %v", err)
}
err = c.Conn.WriteMessage(websocket.TextMessage, messageData)
if err != nil {
logger.Logger.Printf("WebSocket message sending failed: %v", err)
c.connected = false
go c.reconnect()
return fmt.Errorf("message sending failed: %v", err)
}
return nil
}
func (c *WSClient) readLoop() {
defer func() {
if r := recover(); r != nil {
logger.Logger.Printf("readLoop panic: %v", r)
}
}()
for {
_, messageData, err := c.Conn.ReadMessage()
if err != nil {
logger.Logger.Printf("WebSocket message reading failed: %v", err)
c.mutex.Lock()
c.connected = false
c.mutex.Unlock()
go c.reconnect()
return
}
if string(messageData) == "pong" {
continue
}
var message WSMessage
if err := json.Unmarshal(messageData, &message); err != nil {
logger.Logger.Printf("WebSocket message parsing failed: %v", err)
continue
}
// Handle callbacks for request-response messages
if message.ID > 0 {
c.mutex.Lock()
if callback, exists := c.callbacks[message.ID]; exists {
select {
case callback <- &message:
// Message delivered to callback
default:
logger.Logger.Printf("Callback channel full, discarding message: ID=%d", message.ID)
}
}
c.mutex.Unlock()
}
// Handle publish messages (broadcasts)
if message.Method == "publish" {
c.handlePublishMessage(message)
}
}
}
func (c *WSClient) handlePublishMessage(message WSMessage) {
var publishParams PublishParams
if err := json.Unmarshal(message.Params, &publishParams); err != nil {
logger.Logger.Printf("Failed to parse publish message parameters: %v", err)
return
}
c.mutex.Lock()
callback, exists := c.subscribers[publishParams.Topic]
c.mutex.Unlock()
if exists && callback != nil {
callback(&publishParams)
}
}
func (c *WSClient) heartbeatLoop() {
ticker := time.NewTicker(25 * time.Second)
defer ticker.Stop()
for {
select {
case <-ticker.C:
c.mutex.Lock()
connected := c.connected
conn := c.Conn
c.mutex.Unlock()
if connected && conn != nil {
err := conn.WriteMessage(websocket.PingMessage, []byte{})
if err != nil {
logger.Logger.Printf("Failed to send ping: %v", err)
c.mutex.Lock()
c.connected = false
c.mutex.Unlock()
go c.reconnect()
}
}
case <-c.closeChan:
return
}
}
}
func (c *WSClient) reconnect() {
if !c.reconnectFlag {
return
}
for i := 0; i < 12; i++ {
delay := time.Duration(1<<uint(i)) * time.Second
if delay > 30*time.Second {
delay = 30 * time.Second
}
time.Sleep(delay)
conn, _, err := websocket.DefaultDialer.Dial(c.url, nil)
if err != nil {
logger.Logger.Printf("Reconnection attempt %d/12 failed: %v", i+1, err)
continue
}
c.mutex.Lock()
c.Conn = conn
c.connected = true
c.mutex.Unlock()
logger.Logger.Printf("WebSocket reconnection successful!")
// Auto re-register and re-subscribe after reconnection
c.autoReRegister()
return
}
logger.Logger.Printf("Reconnection failed after 12 attempts")
c.reconnectFlag = false
}
func (c *WSClient) autoReRegister() {
// Re-register service if we have the registration info
if c.serviceID != "" {
time.Sleep(1 * time.Second) // Wait for connection to stabilize
params := RegisterParams{
ServiceID: c.serviceID,
MetaData: c.metaData,
ContainerName: c.containerName,
}
if _, err := c.sendRequest("register", params, true); err != nil {
logger.Logger.Printf("Auto re-registration failed: %v", err)
} else {
logger.Logger.Printf("Auto re-registration successful")
}
// Re-subscribe to all topics
c.mutex.Lock()
subscribers := make(map[string]func(*PublishParams))
for topic, callback := range c.subscribers {
subscribers[topic] = callback
}
c.mutex.Unlock()
for topic, callback := range subscribers {
if err := SubscribeTopic(topic, callback); err != nil {
logger.Logger.Printf("Failed to re-subscribe topic %s: %v", topic, err)
}
}
}
}
func (c *WSClient) close() {
c.mutex.Lock()
defer c.mutex.Unlock()
c.reconnectFlag = false
c.connected = false
close(c.closeChan)
if c.Conn != nil {
c.Conn.Close()
}
}