189 lines
5.1 KiB
Go
189 lines
5.1 KiB
Go
package httpreader
|
|
|
|
import (
	"bytes"
	"context"
	"encoding/json"
	"fmt"
	"io"
	"net"
	"net/http"
	"os"
	"sync"
	"time"

	"FireLeave_tool/connect"
	"FireLeave_tool/logger"
)
|
|
|
|
// InferenceResult is the JSON document posted by an inference client to the
// /video/post endpoint.
type InferenceResult struct {
	// Serial identifies the sender; the handler rejects payloads where it is empty.
	Serial string `json:"serial"`
	// At is the client-supplied timestamp (unit is not established in this file).
	At int64 `json:"at"`
	// Type is a client-defined result type code.
	Type int `json:"type"`
	// Params holds one entry per detected class. getPersonCount reads the
	// entry whose ClassIdx is 1 as the person count.
	Params []struct {
		ClassIdx int    `json:"class_idx"`
		Name     string `json:"name"`
		Number   int    `json:"number"`
	} `json:"params"`
}
|
|
|
|
// InferenceServer 推理数据接收服务器
|
|
type InferenceServer struct {
|
|
port int
|
|
deviceUUID string
|
|
server *http.Server
|
|
dataChan chan InferenceResult
|
|
mu sync.RWMutex
|
|
running bool
|
|
}
|
|
|
|
// NewInferenceServer 创建推理数据接收服务器
|
|
func NewInferenceServer(deviceUUID string, port int) *InferenceServer {
|
|
return &InferenceServer{
|
|
port: port,
|
|
deviceUUID: deviceUUID,
|
|
dataChan: make(chan InferenceResult, 100), // 缓冲通道,避免阻塞
|
|
running: false,
|
|
}
|
|
}
|
|
|
|
// Start 启动HTTP服务器
|
|
func (is *InferenceServer) Start() error {
|
|
is.mu.Lock()
|
|
defer is.mu.Unlock()
|
|
|
|
if is.running {
|
|
return fmt.Errorf("服务器已在运行")
|
|
}
|
|
|
|
mux := http.NewServeMux()
|
|
mux.HandleFunc("/video/post", is.handleVideoPost)
|
|
mux.HandleFunc("/health", is.handleHealthCheck)
|
|
|
|
is.server = &http.Server{
|
|
Addr: fmt.Sprintf(":%d", is.port),
|
|
Handler: mux,
|
|
ReadTimeout: 10 * time.Second,
|
|
WriteTimeout: 10 * time.Second,
|
|
}
|
|
|
|
go func() {
|
|
logger.Logger.Printf("Start the inference data receiving server (DeviceUUID=%s, Port=%d)", is.deviceUUID, is.port)
|
|
if err := is.server.ListenAndServe(); err != nil && err != http.ErrServerClosed {
|
|
logger.Logger.Printf("The inference data receiving server failed to start (DeviceUUID=%s): %v", is.deviceUUID, err)
|
|
}
|
|
}()
|
|
|
|
// 等待服务器启动
|
|
time.Sleep(100 * time.Millisecond)
|
|
is.running = true
|
|
|
|
logger.Logger.Printf("The inference data receiving server has been successfully started (DeviceUUID=%s, Port=%d)", is.deviceUUID, is.port)
|
|
return nil
|
|
}
|
|
|
|
// handleVideoPost 处理视频数据POST请求
|
|
func (is *InferenceServer) handleVideoPost(w http.ResponseWriter, r *http.Request) {
|
|
startTime := time.Now()
|
|
|
|
if r.Method != "POST" {
|
|
http.Error(w, "Method not allowed", http.StatusMethodNotAllowed)
|
|
return
|
|
}
|
|
|
|
// 读取原始 body
|
|
bodyBytes, err := io.ReadAll(r.Body)
|
|
if err != nil {
|
|
logger.Logger.Printf("Read request body failed: %v", err)
|
|
http.Error(w, "Bad request", http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// 重新设置 body 用于解析
|
|
r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes))
|
|
|
|
var result InferenceResult
|
|
if err := json.NewDecoder(r.Body).Decode(&result); err != nil {
|
|
logger.Logger.Printf("Parse JSON failed: %v", err)
|
|
http.Error(w, `{"error":"Invalid JSON"}`, http.StatusBadRequest)
|
|
return
|
|
}
|
|
|
|
// 验证必要字段
|
|
if result.Serial == "" {
|
|
logger.Logger.Printf("Data validation failed: Serial is empty")
|
|
http.Error(w, `{"error":"Missing serial field"}`, http.StatusBadRequest)
|
|
return
|
|
}
|
|
now := time.Now().Format("2006/01/02 15:04:05.000")
|
|
logger.Logger.Printf("♥ 推理心跳收到 | 时间: %s | UUID: %s | Serial: %s | 人数量: %d",
|
|
now, is.deviceUUID, result.Serial, getPersonCount(result))
|
|
|
|
// 这一行就是你缺失的救命稻草!
|
|
connect.UpdateInferenceHeartbeat(is.deviceUUID)
|
|
|
|
// 额外写一份永不丢失的独立心跳日志
|
|
if f, err := os.OpenFile("/data/heartbeat.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644); err == nil {
|
|
defer f.Close()
|
|
fmt.Fprintf(f, "[%s] HEARTBEAT OK | %s | person=%d\n", now, is.deviceUUID, getPersonCount(result))
|
|
}
|
|
|
|
// 发送到处理通道
|
|
select {
|
|
case is.dataChan <- result:
|
|
w.Header().Set("Content-Type", "application/json")
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write([]byte(`{"status":"ok", "message":"data received"}`))
|
|
// 使用 startTime 记录处理时间
|
|
logger.Logger.Printf("Inference data processed in %v", time.Since(startTime))
|
|
default:
|
|
logger.Logger.Printf("Data channel full, discard data")
|
|
http.Error(w, `{"error":"Channel full"}`, http.StatusServiceUnavailable)
|
|
}
|
|
}
|
|
|
|
func (is *InferenceServer) handleHealthCheck(w http.ResponseWriter, r *http.Request) {
|
|
w.Header().Set("Content-Type", "application/json")
|
|
|
|
is.mu.RLock()
|
|
running := is.running
|
|
is.mu.RUnlock()
|
|
|
|
if running {
|
|
w.WriteHeader(http.StatusOK)
|
|
w.Write([]byte(`{
|
|
"status": "healthy",
|
|
"device_uuid": "` + is.deviceUUID + `",
|
|
"port": ` + fmt.Sprintf("%d", is.port) + `,
|
|
"timestamp": "` + time.Now().Format(time.RFC3339) + `"
|
|
}`))
|
|
} else {
|
|
w.WriteHeader(http.StatusServiceUnavailable)
|
|
w.Write([]byte(`{"status": "unavailable"}`))
|
|
}
|
|
}
|
|
|
|
func (is *InferenceServer) GetDataChan() <-chan InferenceResult {
|
|
return is.dataChan
|
|
}
|
|
|
|
// StopWithContext 带上下文的停止方法
|
|
func (s *InferenceServer) StopWithContext(ctx context.Context) {
|
|
if s.server != nil {
|
|
// 使用带超时的关闭
|
|
shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second)
|
|
defer cancel()
|
|
|
|
s.server.Shutdown(shutdownCtx)
|
|
logger.Logger.Printf("The HTTP server has been shut down (DeviceUUID=%s)", s.deviceUUID)
|
|
}
|
|
}
|
|
func getPersonCount(result InferenceResult) int {
|
|
for _, p := range result.Params {
|
|
if p.ClassIdx == 1 {
|
|
return p.Number
|
|
}
|
|
}
|
|
return 0
|
|
}
|