486 lines
12 KiB
Go
486 lines
12 KiB
Go
package connect
|
|
|
|
import (
|
|
"Dynamic_environmental_detection/logger"
|
|
"encoding/json"
|
|
"fmt"
|
|
"github.com/gorilla/websocket"
|
|
"sync"
|
|
"time"
|
|
)
|
|
|
|
// Global variables
|
|
var (
|
|
wsClient *WSClient
|
|
wsMutex sync.RWMutex
|
|
isConnecting bool
|
|
wsURL = "ws://172.17.0.1:18080/ws"
|
|
)
|
|
|
|
// AlertMessage is a JSON-serializable alert envelope.
//
// NOTE(review): Content presumably carries a serialized AlertContent and
// Timeout's unit is not visible in this file — confirm against the publisher.
type AlertMessage struct {
	// UUID is encoded as "uuid".
	UUID string `json:"uuid"`
	// Topic is encoded as "topic".
	Topic string `json:"topic"`
	// QoS is encoded as "qos".
	QoS int `json:"qos"`
	// Content is encoded as "content".
	Content string `json:"content"`
	// Timeout is encoded as "timeout".
	Timeout int `json:"timeout"`
}
|
|
// AlertContent is the JSON body describing what kind of alert to raise.
type AlertContent struct {
	// Type selects the alert kind; 1 = buzzer alarm.
	Type int `json:"type"`
	// BuzzerDuration is how long the buzzer sounds, in seconds.
	BuzzerDuration int `json:"buzzer_duration"`
}
|
|
type WSMessage struct {
|
|
ID int64 `json:"id,omitempty"`
|
|
Method string `json:"method"`
|
|
Params json.RawMessage `json:"params,omitempty"`
|
|
Result json.RawMessage `json:"result,omitempty"`
|
|
Error *WSError `json:"error,omitempty"`
|
|
}
|
|
|
|
// WSError is the error object embedded in a WSMessage response.
type WSError struct {
	// Code is the numeric error code, encoded as "code".
	Code int `json:"code"`
	// Message is the error text, encoded as "message".
	Message string `json:"message"`
}
|
|
|
|
// RegisterParams is the parameter object sent with the "register" method.
type RegisterParams struct {
	// ServiceID identifies the registering service.
	ServiceID string `json:"service_id"`
	// MetaData is optional free-form metadata; omitted from JSON when empty.
	MetaData map[string]interface{} `json:"meta_data,omitempty"`
	// ContainerName is optional; omitted from JSON when empty.
	ContainerName string `json:"container_name,omitempty"`
}
|
|
|
|
// SubscribeParams is the parameter object sent with the "subscribe" method.
type SubscribeParams struct {
	// Topic is the topic to subscribe to, encoded as "topic".
	Topic string `json:"topic"`
}
|
|
|
|
// MetricDataParams is the parameter object sent with the "metric_data" method.
type MetricDataParams struct {
	// RouteKey is encoded as "route_key".
	RouteKey string `json:"route_key"`
	// Metric is the caller-supplied payload, encoded as "metric".
	Metric interface{} `json:"metric"`
}
|
|
|
|
// PublishParams is the parameter object of an incoming "publish" frame; it is
// what subscriber callbacks receive.
type PublishParams struct {
	// Topic selects which subscriber callback is invoked.
	Topic string `json:"topic"`
	// Data is the still-encoded "data" payload; omitted from JSON when empty.
	Data json.RawMessage `json:"data,omitempty"`
	// Content is the still-encoded "content" payload; omitted from JSON when empty.
	Content json.RawMessage `json:"content,omitempty"`
}
|
|
|
|
type WSClient struct {
|
|
Conn *websocket.Conn
|
|
url string
|
|
serviceID string
|
|
metaData map[string]interface{}
|
|
containerName string
|
|
subscribers map[string]func(*PublishParams)
|
|
connected bool
|
|
mutex sync.Mutex
|
|
messageID int64
|
|
callbacks map[int64]chan *WSMessage
|
|
closeChan chan struct{}
|
|
reconnectFlag bool
|
|
}
|
|
|
|
// InitWSChannel initializes WebSocket connection
|
|
func InitWSChannel() error {
|
|
wsMutex.Lock()
|
|
defer wsMutex.Unlock()
|
|
|
|
if wsClient != nil && wsClient.connected {
|
|
logger.Logger.Printf("WebSocket connection already exists")
|
|
return nil
|
|
}
|
|
|
|
if isConnecting {
|
|
return fmt.Errorf("WebSocket connection is being established")
|
|
}
|
|
|
|
isConnecting = true
|
|
defer func() { isConnecting = false }()
|
|
|
|
logger.Logger.Printf("Starting WebSocket connection initialization: %s", wsURL)
|
|
|
|
client, err := NewWSClient(wsURL)
|
|
if err != nil {
|
|
logger.Logger.Printf("Failed to create WebSocket client: %v", err)
|
|
return err
|
|
}
|
|
|
|
wsClient = client
|
|
logger.Logger.Printf("WebSocket connection initialized successfully")
|
|
return nil
|
|
}
|
|
|
|
// GenerateServiceID returns "fireleave-service-" followed by the current Unix
// time in nanoseconds.
func GenerateServiceID() string {
	nanos := time.Now().UnixNano()
	return fmt.Sprintf("fireleave-service-%d", nanos)
}
|
|
|
|
// RegisterService registers the service with the server
|
|
func RegisterService(serviceID string, metaData map[string]interface{}, containerName string) error {
|
|
if wsClient == nil {
|
|
return fmt.Errorf("WebSocket client not initialized")
|
|
}
|
|
|
|
// Store registration info for reconnection
|
|
wsClient.serviceID = serviceID
|
|
wsClient.metaData = metaData
|
|
wsClient.containerName = containerName
|
|
|
|
params := RegisterParams{
|
|
ServiceID: serviceID,
|
|
MetaData: metaData,
|
|
ContainerName: containerName,
|
|
}
|
|
|
|
logger.Logger.Printf("Starting service registration: ServiceID=%s", serviceID)
|
|
response, err := wsClient.sendRequest("register", params, true)
|
|
if err != nil {
|
|
return fmt.Errorf("service registration failed: %v", err)
|
|
}
|
|
|
|
if response.Error != nil {
|
|
return fmt.Errorf("service registration error: %s", response.Error.Message)
|
|
}
|
|
|
|
var resultStr string
|
|
if err := json.Unmarshal(response.Result, &resultStr); err != nil {
|
|
return fmt.Errorf("failed to parse registration response: %v", err)
|
|
}
|
|
|
|
if resultStr != "ok" {
|
|
return fmt.Errorf("service registration failed: server returned %s", resultStr)
|
|
}
|
|
|
|
logger.Logger.Printf("Service registration successful: ServiceID=%s", serviceID)
|
|
return nil
|
|
}
|
|
|
|
// SubscribeTopic subscribes to a topic
|
|
func SubscribeTopic(topic string, callback func(*PublishParams)) error {
|
|
if wsClient == nil {
|
|
return fmt.Errorf("WebSocket client not initialized")
|
|
}
|
|
|
|
params := SubscribeParams{Topic: topic}
|
|
|
|
logger.Logger.Printf("Starting topic subscription: Topic=%s", topic)
|
|
response, err := wsClient.sendRequest("subscribe", params, true)
|
|
if err != nil {
|
|
return fmt.Errorf("topic subscription failed: %v", err)
|
|
}
|
|
|
|
if response.Error != nil {
|
|
return fmt.Errorf("topic subscription error: %s", response.Error.Message)
|
|
}
|
|
|
|
wsClient.mutex.Lock()
|
|
wsClient.subscribers[topic] = callback
|
|
wsClient.mutex.Unlock()
|
|
|
|
logger.Logger.Printf("Topic subscription successful: Topic=%s", topic)
|
|
return nil
|
|
}
|
|
|
|
// ReportMetricData reports metric data to the server (no response expected)
|
|
func ReportMetricData(routeKey string, metric interface{}) error {
|
|
if wsClient == nil {
|
|
return fmt.Errorf("WebSocket client not initialized")
|
|
}
|
|
|
|
params := MetricDataParams{
|
|
RouteKey: routeKey,
|
|
Metric: metric,
|
|
}
|
|
|
|
// metric_data method doesn't expect a response
|
|
_, err := wsClient.sendRequest("metric_data", params, false)
|
|
if err != nil {
|
|
return fmt.Errorf("metric data reporting failed: %v", err)
|
|
}
|
|
|
|
logger.Logger.Printf("Metric data reported successfully: RouteKey=%s", routeKey)
|
|
return nil
|
|
}
|
|
|
|
// Close closes the WebSocket connection
|
|
func Close() {
|
|
if wsClient != nil {
|
|
wsClient.close()
|
|
}
|
|
}
|
|
|
|
// ==================== Internal Methods ====================
|
|
|
|
// NewWSClient creates a new WebSocket client
|
|
func NewWSClient(url string) (*WSClient, error) {
|
|
conn, _, err := websocket.DefaultDialer.Dial(url, nil)
|
|
if err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
client := &WSClient{
|
|
Conn: conn,
|
|
url: url,
|
|
connected: true,
|
|
messageID: 1,
|
|
callbacks: make(map[int64]chan *WSMessage),
|
|
subscribers: make(map[string]func(*PublishParams)),
|
|
closeChan: make(chan struct{}),
|
|
reconnectFlag: true,
|
|
}
|
|
|
|
go client.readLoop()
|
|
go client.heartbeatLoop()
|
|
return client, nil
|
|
}
|
|
|
|
func (c *WSClient) IsConnected() bool {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
return c.connected
|
|
}
|
|
|
|
func (c *WSClient) sendRequest(method string, params interface{}, waitResponse bool) (*WSMessage, error) {
|
|
c.mutex.Lock()
|
|
id := c.messageID
|
|
c.messageID++
|
|
c.mutex.Unlock()
|
|
|
|
message := WSMessage{
|
|
ID: id,
|
|
Method: method,
|
|
}
|
|
|
|
if params != nil {
|
|
paramsData, err := json.Marshal(params)
|
|
if err != nil {
|
|
return nil, fmt.Errorf("parameter serialization failed: %v", err)
|
|
}
|
|
message.Params = paramsData
|
|
}
|
|
|
|
if !waitResponse {
|
|
return nil, c.sendRawMessage(message)
|
|
}
|
|
|
|
callback := make(chan *WSMessage, 1)
|
|
c.mutex.Lock()
|
|
c.callbacks[id] = callback
|
|
c.mutex.Unlock()
|
|
|
|
defer func() {
|
|
c.mutex.Lock()
|
|
delete(c.callbacks, id)
|
|
c.mutex.Unlock()
|
|
}()
|
|
|
|
if err := c.sendRawMessage(message); err != nil {
|
|
return nil, err
|
|
}
|
|
|
|
select {
|
|
case response := <-callback:
|
|
return response, nil
|
|
case <-time.After(30 * time.Second):
|
|
return nil, fmt.Errorf("response timeout")
|
|
case <-c.closeChan:
|
|
return nil, fmt.Errorf("connection closed")
|
|
}
|
|
}
|
|
|
|
func (c *WSClient) sendRawMessage(message interface{}) error {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
|
|
if !c.connected {
|
|
return fmt.Errorf("WebSocket connection disconnected")
|
|
}
|
|
|
|
messageData, err := json.Marshal(message)
|
|
if err != nil {
|
|
return fmt.Errorf("message serialization failed: %v", err)
|
|
}
|
|
|
|
err = c.Conn.WriteMessage(websocket.TextMessage, messageData)
|
|
if err != nil {
|
|
logger.Logger.Printf("WebSocket message sending failed: %v", err)
|
|
c.connected = false
|
|
go c.reconnect()
|
|
return fmt.Errorf("message sending failed: %v", err)
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *WSClient) readLoop() {
|
|
defer func() {
|
|
if r := recover(); r != nil {
|
|
logger.Logger.Printf("readLoop panic: %v", r)
|
|
}
|
|
}()
|
|
|
|
for {
|
|
_, messageData, err := c.Conn.ReadMessage()
|
|
if err != nil {
|
|
logger.Logger.Printf("WebSocket message reading failed: %v", err)
|
|
c.mutex.Lock()
|
|
c.connected = false
|
|
c.mutex.Unlock()
|
|
go c.reconnect()
|
|
return
|
|
}
|
|
|
|
if string(messageData) == "pong" {
|
|
continue
|
|
}
|
|
|
|
var message WSMessage
|
|
if err := json.Unmarshal(messageData, &message); err != nil {
|
|
logger.Logger.Printf("WebSocket message parsing failed: %v", err)
|
|
continue
|
|
}
|
|
|
|
// Handle callbacks for request-response messages
|
|
if message.ID > 0 {
|
|
c.mutex.Lock()
|
|
if callback, exists := c.callbacks[message.ID]; exists {
|
|
select {
|
|
case callback <- &message:
|
|
// Message delivered to callback
|
|
default:
|
|
logger.Logger.Printf("Callback channel full, discarding message: ID=%d", message.ID)
|
|
}
|
|
}
|
|
c.mutex.Unlock()
|
|
}
|
|
|
|
// Handle publish messages (broadcasts)
|
|
if message.Method == "publish" {
|
|
c.handlePublishMessage(message)
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *WSClient) handlePublishMessage(message WSMessage) {
|
|
var publishParams PublishParams
|
|
if err := json.Unmarshal(message.Params, &publishParams); err != nil {
|
|
logger.Logger.Printf("Failed to parse publish message parameters: %v", err)
|
|
return
|
|
}
|
|
// 加这一行!万能抓包,永不漏!
|
|
logger.Logger.Printf("WebSocket 收到 publish 消息 → Topic: %s | 原始数据: %s",
|
|
publishParams.Topic, string(publishParams.Data))
|
|
c.mutex.Lock()
|
|
callback, exists := c.subscribers[publishParams.Topic]
|
|
c.mutex.Unlock()
|
|
|
|
if exists && callback != nil {
|
|
callback(&publishParams)
|
|
}
|
|
}
|
|
|
|
func (c *WSClient) heartbeatLoop() {
|
|
ticker := time.NewTicker(25 * time.Second)
|
|
defer ticker.Stop()
|
|
|
|
for {
|
|
select {
|
|
case <-ticker.C:
|
|
c.mutex.Lock()
|
|
connected := c.connected
|
|
conn := c.Conn
|
|
c.mutex.Unlock()
|
|
|
|
if connected && conn != nil {
|
|
err := conn.WriteMessage(websocket.PingMessage, []byte{})
|
|
if err != nil {
|
|
logger.Logger.Printf("Failed to send ping: %v", err)
|
|
c.mutex.Lock()
|
|
c.connected = false
|
|
c.mutex.Unlock()
|
|
go c.reconnect()
|
|
}
|
|
}
|
|
case <-c.closeChan:
|
|
return
|
|
}
|
|
}
|
|
}
|
|
|
|
func (c *WSClient) reconnect() {
|
|
if !c.reconnectFlag {
|
|
return
|
|
}
|
|
|
|
for i := 0; i < 12; i++ {
|
|
delay := time.Duration(1<<uint(i)) * time.Second
|
|
if delay > 30*time.Second {
|
|
delay = 30 * time.Second
|
|
}
|
|
time.Sleep(delay)
|
|
|
|
conn, _, err := websocket.DefaultDialer.Dial(c.url, nil)
|
|
if err != nil {
|
|
logger.Logger.Printf("Reconnection attempt %d/12 failed: %v", i+1, err)
|
|
continue
|
|
}
|
|
|
|
c.mutex.Lock()
|
|
c.Conn = conn
|
|
c.connected = true
|
|
c.mutex.Unlock()
|
|
|
|
logger.Logger.Printf("WebSocket reconnection successful!")
|
|
|
|
// Auto re-register and re-subscribe after reconnection
|
|
c.autoReRegister()
|
|
return
|
|
}
|
|
|
|
logger.Logger.Printf("Reconnection failed after 12 attempts")
|
|
c.reconnectFlag = false
|
|
}
|
|
|
|
func (c *WSClient) autoReRegister() {
|
|
// Re-register service if we have the registration info
|
|
if c.serviceID != "" {
|
|
time.Sleep(1 * time.Second) // Wait for connection to stabilize
|
|
|
|
params := RegisterParams{
|
|
ServiceID: c.serviceID,
|
|
MetaData: c.metaData,
|
|
ContainerName: c.containerName,
|
|
}
|
|
|
|
if _, err := c.sendRequest("register", params, true); err != nil {
|
|
logger.Logger.Printf("Auto re-registration failed: %v", err)
|
|
} else {
|
|
logger.Logger.Printf("Auto re-registration successful")
|
|
}
|
|
|
|
// Re-subscribe to all topics
|
|
c.mutex.Lock()
|
|
subscribers := make(map[string]func(*PublishParams))
|
|
for topic, callback := range c.subscribers {
|
|
subscribers[topic] = callback
|
|
}
|
|
c.mutex.Unlock()
|
|
|
|
//for topic, callback := range subscribers {
|
|
// if err := SubscribeTopic(topic, callback); err != nil {
|
|
// logger.Logger.Printf("Failed to re-subscribe topic %s: %v", topic, err)
|
|
// }
|
|
//}
|
|
}
|
|
}
|
|
|
|
func (c *WSClient) close() {
|
|
c.mutex.Lock()
|
|
defer c.mutex.Unlock()
|
|
|
|
c.reconnectFlag = false
|
|
c.connected = false
|
|
close(c.closeChan)
|
|
|
|
if c.Conn != nil {
|
|
c.Conn.Close()
|
|
}
|
|
}
|