package httpreader import ( "FireLeave_tool/connect" "bytes" "context" "encoding/json" "fmt" "io" "net/http" "os" "sync" "time" "FireLeave_tool/logger" ) // InferenceResult 推理结果 type InferenceResult struct { Serial string `json:"serial"` At int64 `json:"at"` Type int `json:"type"` Params []struct { ClassIdx int `json:"class_idx"` Name string `json:"name"` Number int `json:"number"` } `json:"params"` } // InferenceServer 推理数据接收服务器 type InferenceServer struct { port int deviceUUID string server *http.Server dataChan chan InferenceResult mu sync.RWMutex running bool } // NewInferenceServer 创建推理数据接收服务器 func NewInferenceServer(deviceUUID string, port int) *InferenceServer { return &InferenceServer{ port: port, deviceUUID: deviceUUID, dataChan: make(chan InferenceResult, 100), // 缓冲通道,避免阻塞 running: false, } } // Start 启动HTTP服务器 func (is *InferenceServer) Start() error { is.mu.Lock() defer is.mu.Unlock() if is.running { return fmt.Errorf("服务器已在运行") } mux := http.NewServeMux() mux.HandleFunc("/video/post", is.handleVideoPost) mux.HandleFunc("/health", is.handleHealthCheck) is.server = &http.Server{ Addr: fmt.Sprintf(":%d", is.port), Handler: mux, ReadTimeout: 10 * time.Second, WriteTimeout: 10 * time.Second, } go func() { logger.Logger.Printf("Start the inference data receiving server (DeviceUUID=%s, Port=%d)", is.deviceUUID, is.port) if err := is.server.ListenAndServe(); err != nil && err != http.ErrServerClosed { logger.Logger.Printf("The inference data receiving server failed to start (DeviceUUID=%s): %v", is.deviceUUID, err) } }() // 等待服务器启动 time.Sleep(100 * time.Millisecond) is.running = true logger.Logger.Printf("The inference data receiving server has been successfully started (DeviceUUID=%s, Port=%d)", is.deviceUUID, is.port) return nil } // handleVideoPost 处理视频数据POST请求 func (is *InferenceServer) handleVideoPost(w http.ResponseWriter, r *http.Request) { startTime := time.Now() if r.Method != "POST" { http.Error(w, "Method not allowed", http.StatusMethodNotAllowed) return } // 读取原始 body bodyBytes, err := io.ReadAll(r.Body) if err != nil { logger.Logger.Printf("Read request body failed: %v", err) http.Error(w, "Bad request", http.StatusBadRequest) return } // 重新设置 body 用于解析 r.Body = io.NopCloser(bytes.NewBuffer(bodyBytes)) var result InferenceResult if err := json.NewDecoder(r.Body).Decode(&result); err != nil { logger.Logger.Printf("Parse JSON failed: %v", err) http.Error(w, `{"error":"Invalid JSON"}`, http.StatusBadRequest) return } // 验证必要字段 if result.Serial == "" { logger.Logger.Printf("Data validation failed: Serial is empty") http.Error(w, `{"error":"Missing serial field"}`, http.StatusBadRequest) return } now := time.Now().Format("2006/01/02 15:04:05.000") logger.Logger.Printf("♥ 推理心跳收到 | 时间: %s | UUID: %s | Serial: %s | 人数量: %d", now, is.deviceUUID, result.Serial, getPersonCount(result)) // 这一行就是你缺失的救命稻草! connect.UpdateInferenceHeartbeat(is.deviceUUID) // 额外写一份永不丢失的独立心跳日志 if f, err := os.OpenFile("/data/heartbeat.log", os.O_CREATE|os.O_WRONLY|os.O_APPEND, 0644); err == nil { defer f.Close() fmt.Fprintf(f, "[%s] HEARTBEAT OK | %s | person=%d\n", now, is.deviceUUID, getPersonCount(result)) } // 发送到处理通道 select { case is.dataChan <- result: w.Header().Set("Content-Type", "application/json") w.WriteHeader(http.StatusOK) w.Write([]byte(`{"status":"ok", "message":"data received"}`)) // 使用 startTime 记录处理时间 logger.Logger.Printf("Inference data processed in %v", time.Since(startTime)) default: logger.Logger.Printf("Data channel full, discard data") http.Error(w, `{"error":"Channel full"}`, http.StatusServiceUnavailable) } } func (is *InferenceServer) handleHealthCheck(w http.ResponseWriter, r *http.Request) { w.Header().Set("Content-Type", "application/json") is.mu.RLock() running := is.running is.mu.RUnlock() if running { w.WriteHeader(http.StatusOK) w.Write([]byte(`{ "status": "healthy", "device_uuid": "` + is.deviceUUID + `", "port": ` + fmt.Sprintf("%d", is.port) + `, "timestamp": "` + time.Now().Format(time.RFC3339) + `" }`)) } else { w.WriteHeader(http.StatusServiceUnavailable) w.Write([]byte(`{"status": "unavailable"}`)) } } func (is *InferenceServer) GetDataChan() <-chan InferenceResult { return is.dataChan } // StopWithContext 带上下文的停止方法 func (s *InferenceServer) StopWithContext(ctx context.Context) { if s.server != nil { // 使用带超时的关闭 shutdownCtx, cancel := context.WithTimeout(ctx, 5*time.Second) defer cancel() s.server.Shutdown(shutdownCtx) logger.Logger.Printf("The HTTP server has been shut down (DeviceUUID=%s)", s.deviceUUID) } } func getPersonCount(result InferenceResult) int { for _, p := range result.Params { if p.ClassIdx == 1 { return p.Number } } return 0 }