commit b1a036936210bcec86b9a32f4602bd328b66d7e0 Author: gqc Date: Fri Apr 17 16:23:12 2026 +0800 初始提交 diff --git a/.idea/.gitignore b/.idea/.gitignore new file mode 100644 index 0000000..35410ca --- /dev/null +++ b/.idea/.gitignore @@ -0,0 +1,8 @@ +# 默认忽略的文件 +/shelf/ +/workspace.xml +# 基于编辑器的 HTTP 客户端请求 +/httpRequests/ +# Datasource local storage ignored files +/dataSources/ +/dataSources.local.xml diff --git a/.idea/PTset.iml b/.idea/PTset.iml new file mode 100644 index 0000000..e814c35 --- /dev/null +++ b/.idea/PTset.iml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/Project_Default.xml b/.idea/inspectionProfiles/Project_Default.xml new file mode 100644 index 0000000..d8cad39 --- /dev/null +++ b/.idea/inspectionProfiles/Project_Default.xml @@ -0,0 +1,21 @@ + + + + \ No newline at end of file diff --git a/.idea/inspectionProfiles/profiles_settings.xml b/.idea/inspectionProfiles/profiles_settings.xml new file mode 100644 index 0000000..105ce2d --- /dev/null +++ b/.idea/inspectionProfiles/profiles_settings.xml @@ -0,0 +1,6 @@ + + + + \ No newline at end of file diff --git a/.idea/misc.xml b/.idea/misc.xml new file mode 100644 index 0000000..e57194d --- /dev/null +++ b/.idea/misc.xml @@ -0,0 +1,7 @@ + + + + + + \ No newline at end of file diff --git a/.idea/modules.xml b/.idea/modules.xml new file mode 100644 index 0000000..61250a4 --- /dev/null +++ b/.idea/modules.xml @@ -0,0 +1,8 @@ + + + + + + + + \ No newline at end of file diff --git a/__pycache__/latest_universal.cpython-311.pyc.2160079884656 b/__pycache__/latest_universal.cpython-311.pyc.2160079884656 new file mode 100644 index 0000000..6d57d29 Binary files /dev/null and b/__pycache__/latest_universal.cpython-311.pyc.2160079884656 differ diff --git a/latest_universal.py b/latest_universal.py new file mode 100644 index 0000000..063e76b --- /dev/null +++ b/latest_universal.py @@ -0,0 +1,872 @@ +import os +import re +import shutil +import threading +import time +import tkinter as tk +from tkinter import filedialog, messagebox, simpledialog +from tkinter.scrolledtext import ScrolledText + +import cv2 +from PIL import Image, ImageTk + + +class UniversalAnnotationTool: + def __init__(self, root): + self.root = root + self.root.title("通用标注与裁剪工具") + self.root.geometry("1600x920") + + self.image = None + self.photo = None + self.file_path = None + self.folder_path = None + self.image_list = [] + self.current_image_index = 0 + self.coordinates = {} + self.current_shape = None + self.mode = "polygon" + + self.points = [] + self.lines = [] + self.point_labels = [] + + self.box_preview = None + self.box_preview_text = None + self.box_start = None + self.box_coords = None + self.box_shape_id = None + self.box_text_id = None + + self.restored_points = [] + self.restored_lines = [] + self.restored_text_labels = [] + self.restored_rectangles = [] + + self.rtsp_url = "" + self.cap = None + self.rtsp_thread = None + + self._build_layout() + self._bind_events() + self.enable_polygon_mode() + + def _build_layout(self): + main_frame = tk.Frame(self.root) + main_frame.pack(fill="both", expand=True) + + left_frame = tk.Frame(main_frame) + left_frame.pack(side="left", fill="both", expand=True, padx=10, pady=10) + + canvas_frame = tk.Frame(left_frame) + canvas_frame.pack(fill="both", expand=True) + + v_scrollbar = tk.Scrollbar(canvas_frame, orient="vertical") + v_scrollbar.pack(side="right", fill="y") + h_scrollbar = tk.Scrollbar(canvas_frame, orient="horizontal") + h_scrollbar.pack(side="bottom", fill="x") + + self.canvas = tk.Canvas( + canvas_frame, + bg="white", + width=1100, + height=760, + yscrollcommand=v_scrollbar.set, + xscrollcommand=h_scrollbar.set, + ) + self.canvas.pack(side="left", fill="both", expand=True) + v_scrollbar.config(command=self.canvas.yview) + h_scrollbar.config(command=self.canvas.xview) + + btn_row1 = tk.Frame(left_frame) + btn_row1.pack(fill="x", pady=(10, 4)) + btn_row2 = tk.Frame(left_frame) + btn_row2.pack(fill="x", pady=4) + btn_row3 = tk.Frame(left_frame) + btn_row3.pack(fill="x", pady=4) + + tk.Button(btn_row1, text="上传文件", command=self.upload_file, width=12).pack(side="left", padx=3) + tk.Button(btn_row1, text="选择文件夹", command=self.select_folder, width=12).pack(side="left", padx=3) + tk.Button(btn_row1, text="上一张", command=self.prev_image, width=12).pack(side="left", padx=3) + tk.Button(btn_row1, text="下一张", command=self.next_image, width=12).pack(side="left", padx=3) + tk.Button(btn_row1, text="保存坐标", command=self.save_coordinates, width=12).pack(side="left", padx=3) + tk.Button(btn_row1, text="重置", command=self.reset, width=12).pack(side="left", padx=3) + + tk.Button(btn_row2, text="多边形模式", command=self.enable_polygon_mode, width=12).pack(side="left", padx=3) + tk.Button(btn_row2, text="框选模式", command=self.enable_box_mode, width=12).pack(side="left", padx=3) + tk.Button(btn_row2, text="撤回", command=self.undo_last_point, width=12).pack(side="left", padx=3) + tk.Button(btn_row2, text="恢复输入", command=self.restore_coordinates, width=12).pack(side="left", padx=3) + tk.Button(btn_row2, text="批量回显", command=self.batch_restore_coordinates, width=12).pack(side="left", padx=3) + tk.Button(btn_row2, text="裁剪", command=self.crop_image, width=12).pack(side="left", padx=3) + tk.Button(btn_row2, text="压缩当前", command=self.compress_image, width=12).pack(side="left", padx=3) + tk.Button(btn_row2, text="批量压缩", command=self.compress_all_images, width=12).pack(side="left", padx=3) + + tk.Button(btn_row3, text="连接 RTSP", command=self.connect_rtsp, width=12).pack(side="left", padx=3) + tk.Button(btn_row3, text="截图 RTSP", command=self.capture_rtsp_frame, width=12).pack(side="left", padx=3) + tk.Button(btn_row3, text="断开 RTSP", command=self.disconnect_rtsp, width=12).pack(side="left", padx=3) + tk.Button(btn_row3, text="视频抽帧", command=self.extract_video_frames, width=12).pack(side="left", padx=3) + tk.Button(btn_row3, text="整理标注图", command=self.collect_labeled_images, width=12).pack(side="left", padx=3) + + self.status_label = tk.Label(left_frame, text="状态: 就绪", anchor="w", fg="darkgreen") + self.status_label.pack(fill="x", pady=(6, 0)) + + right_frame = tk.Frame(main_frame) + right_frame.pack(side="right", fill="y", padx=10, pady=10) + + tk.Label(right_frame, text="图片列表", font=("Arial", 12, "bold")).pack(anchor="w") + self.image_listbox = tk.Listbox(right_frame, width=34, height=24) + self.image_listbox.pack(fill="y", pady=(4, 8)) + self.image_listbox.bind("<>", self.on_image_select) + + search_frame = tk.Frame(right_frame) + search_frame.pack(fill="x", pady=(0, 8)) + tk.Label(search_frame, text="搜索").pack(side="left") + self.search_entry = tk.Entry(search_frame) + self.search_entry.pack(side="left", fill="x", expand=True, padx=(6, 0)) + self.search_entry.bind("", self.search_image) + + tk.Label(right_frame, text="坐标 / 标注输入").pack(anchor="w") + self.coords_text = ScrolledText(right_frame, width=38, height=10) + self.coords_text.pack(fill="both", pady=(4, 8)) + + crop_frame = tk.Frame(right_frame) + crop_frame.pack(fill="x", pady=(0, 8)) + tk.Label(crop_frame, text="裁剪坐标").pack(side="left") + self.crop_entry = tk.Entry(crop_frame) + self.crop_entry.pack(side="left", fill="x", expand=True, padx=(6, 0)) + + tk.Label(right_frame, text="说明").pack(anchor="w") + self.help_text = tk.Label( + right_frame, + justify="left", + anchor="w", + text=( + "1. 多边形: 逐点点击,点回起点附近可闭合\n" + "2. 框选: 拖拽生成矩形,自动写入裁剪框\n" + "3. 输入支持:\n" + " rect:10,20,200,300\n" + " polygon:10,20,100,20,100,100\n" + " 纯 4 个数默认矩形,6 个及以上默认多边形\n" + "4. 批量回显支持 result 1: (x1 y1 x2 y2) 0.98, label" + ), + ) + self.help_text.pack(fill="x") + + def _bind_events(self): + self.root.bind("", self.undo_last_point) + + def set_status(self, message, color="darkgreen"): + self.status_label.config(text=f"状态: {message}", fg=color) + + def update_canvas_size(self, width, height): + self.canvas.config(scrollregion=(0, 0, width, height)) + self.canvas.config(width=min(width, 1100), height=min(height, 760)) + + def display_image(self): + self.canvas.delete("all") + self.clear_canvas_state() + if not self.image: + return + self.update_canvas_size(self.image.width, self.image.height) + self.photo = ImageTk.PhotoImage(self.image) + self.canvas.create_image(0, 0, image=self.photo, anchor="nw") + self.redraw_current_shape() + + def clear_canvas_state(self): + self.lines = [] + self.point_labels = [] + self.box_shape_id = None + self.box_text_id = None + self.box_preview = None + self.box_preview_text = None + self.clear_restored_shapes() + + def clear_restored_shapes(self): + for item in self.restored_points + self.restored_lines + self.restored_text_labels + self.restored_rectangles: + self.canvas.delete(item) + self.restored_points = [] + self.restored_lines = [] + self.restored_text_labels = [] + self.restored_rectangles = [] + + def update_input_boxes(self): + self.coords_text.delete("1.0", tk.END) + if self.current_shape: + self.coords_text.insert("1.0", self.serialize_annotation(self.current_shape)) + self.crop_entry.delete(0, tk.END) + if self.box_coords: + self.crop_entry.insert(0, f"{self.box_coords[0]},{self.box_coords[1]},{self.box_coords[2]},{self.box_coords[3]}") + + def redraw_current_shape(self): + if not self.current_shape: + self.update_input_boxes() + return + if self.current_shape["type"] == "polygon": + self.points = [tuple(point) for point in self.current_shape["points"]] + self.draw_polygon(self.points, color="red", editable=True) + elif self.current_shape["type"] == "rect": + self.box_coords = tuple(self.current_shape["coords"]) + self.draw_rect(self.box_coords, color="blue", editable=True) + self.update_input_boxes() + + def draw_polygon(self, points, color="red", editable=False): + if not points: + return + for index, (x, y) in enumerate(points, start=1): + oval = self.canvas.create_oval(x - 5, y - 5, x + 5, y + 5, fill=color, outline=color) + label = self.canvas.create_text( + x + 10, + y, + text=str(index), + fill="green" if editable else color, + font=("Arial", 14, "bold"), + ) + if editable: + self.point_labels.extend([oval, label]) + else: + self.restored_points.append(oval) + self.restored_text_labels.append(label) + for start, end in zip(points, points[1:]): + line = self.canvas.create_line(start, end, fill=color, width=2) + if editable: + self.lines.append(line) + else: + self.restored_lines.append(line) + if len(points) > 2: + line = self.canvas.create_line(points[-1], points[0], fill=color, width=2) + if editable: + self.lines.append(line) + else: + self.restored_lines.append(line) + + def draw_rect(self, coords, color="blue", editable=False, label=None): + x1, y1, x2, y2 = coords + rect = self.canvas.create_rectangle(x1, y1, x2, y2, outline=color, width=2) + width = abs(x2 - x1) + height = abs(y2 - y1) + text = self.canvas.create_text( + min(x1, x2) + 10, + min(y1, y2) + 12, + anchor="nw", + text=label or f"{width}x{height}", + fill=color, + font=("Arial", 12, "bold"), + ) + if editable: + self.box_shape_id = rect + self.box_text_id = text + else: + self.restored_rectangles.append(rect) + self.restored_text_labels.append(text) + + def enable_polygon_mode(self): + self.mode = "polygon" + self.canvas.unbind("") + self.canvas.unbind("") + self.canvas.unbind("") + self.canvas.bind("", self.on_click) + self.set_status("已切换到多边形模式") + + def enable_box_mode(self): + self.mode = "box" + self.canvas.unbind("") + self.canvas.bind("", self.on_box_press) + self.canvas.bind("", self.on_box_drag) + self.canvas.bind("", self.on_box_release) + self.set_status("已切换到框选模式") + + def on_click(self, event): + if self.mode != "polygon" or not self.image: + return + + x, y = self.clamp_to_image(event.x, event.y) + if self.box_coords: + self.box_coords = None + + if len(self.points) > 2: + first_x, first_y = self.points[0] + distance = ((x - first_x) ** 2 + (y - first_y) ** 2) ** 0.5 + if distance < 20: + self.current_shape = {"type": "polygon", "points": self.points[:]} + self.display_image() + self.set_status("多边形已闭合") + return + + self.points.append((x, y)) + self.current_shape = {"type": "polygon", "points": self.points[:]} + self.display_image() + + def on_box_press(self, event): + if self.mode != "box" or not self.image: + return + self.box_start = self.clamp_to_image(event.x, event.y) + self.box_coords = None + self.current_shape = None + self.display_image() + + def on_box_drag(self, event): + if self.mode != "box" or not self.image or not self.box_start: + return + x0, y0 = self.box_start + x1, y1 = self.clamp_to_image(event.x, event.y) + if self.box_preview: + self.canvas.delete(self.box_preview) + if self.box_preview_text: + self.canvas.delete(self.box_preview_text) + self.box_preview = self.canvas.create_rectangle(x0, y0, x1, y1, outline="blue", width=2, dash=(4, 2)) + self.box_preview_text = self.canvas.create_text( + min(x0, x1) + 10, + min(y0, y1) + 12, + anchor="nw", + text=f"{abs(x1 - x0)}x{abs(y1 - y0)}", + fill="blue", + font=("Arial", 12, "bold"), + ) + + def on_box_release(self, event): + if self.mode != "box" or not self.image or not self.box_start: + return + x0, y0 = self.box_start + x1, y1 = self.clamp_to_image(event.x, event.y) + self.box_start = None + self.box_coords = (min(x0, x1), min(y0, y1), max(x0, x1), max(y0, y1)) + self.current_shape = {"type": "rect", "coords": self.box_coords} + self.display_image() + self.set_status("框选完成") + + def clamp_to_image(self, x, y): + if not self.image: + return x, y + x = min(max(int(self.canvas.canvasx(x)), 0), max(self.image.width - 1, 0)) + y = min(max(int(self.canvas.canvasy(y)), 0), max(self.image.height - 1, 0)) + return x, y + + def serialize_annotation(self, annotation): + if annotation["type"] == "rect": + x1, y1, x2, y2 = annotation["coords"] + return f"rect:{x1},{y1},{x2},{y2}" + points = annotation["points"] + flat = ",".join(f"{x},{y}" for x, y in points) + return f"polygon:{flat}" + + def parse_annotation(self, text): + raw = text.strip() + if not raw: + return None + lowered = raw.lower() + if lowered.startswith("rect:"): + coords = self.parse_ints(raw.split(":", 1)[1]) + if len(coords) != 4: + raise ValueError("矩形需要 4 个数字") + x1, y1, x2, y2 = coords + return {"type": "rect", "coords": (min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))} + if lowered.startswith("polygon:"): + coords = self.parse_ints(raw.split(":", 1)[1]) + if len(coords) < 6 or len(coords) % 2 != 0: + raise ValueError("多边形至少需要 3 个点") + points = [(coords[i], coords[i + 1]) for i in range(0, len(coords), 2)] + return {"type": "polygon", "points": points} + + coords = self.parse_ints(raw) + if len(coords) == 4: + x1, y1, x2, y2 = coords + return {"type": "rect", "coords": (min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))} + if len(coords) >= 6 and len(coords) % 2 == 0: + points = [(coords[i], coords[i + 1]) for i in range(0, len(coords), 2)] + return {"type": "polygon", "points": points} + raise ValueError("无法识别坐标格式") + + def parse_ints(self, text): + numbers = re.findall(r"-?\d+", text) + return [int(num) for num in numbers] + + def load_annotation(self, annotation, from_input=False): + self.current_shape = annotation + if annotation["type"] == "polygon": + self.points = [tuple(point) for point in annotation["points"]] + self.box_coords = None + else: + self.points = [] + self.box_coords = tuple(annotation["coords"]) + self.display_image() + if from_input: + self.set_status("已根据输入恢复标注") + + def save_coordinates(self): + if not self.current_shape: + self.set_status("请先绘制或恢复标注", "red") + return + + coords_str = self.serialize_annotation(self.current_shape) + self.update_input_boxes() + print(coords_str) + + if self.folder_path and self.image_list: + image_name = self.image_list[self.current_image_index] + self.coordinates[image_name] = coords_str + self.write_coordinates_file() + self.highlight_images_with_coordinates() + self.set_status(f"已保存 {image_name} 的标注") + if self.current_image_index < len(self.image_list) - 1: + self.next_image() + else: + self.set_status("已输出当前标注") + + def write_coordinates_file(self): + if not self.folder_path: + return + data_file = os.path.join(self.folder_path, "data.txt") + with open(data_file, "w", encoding="utf-8") as file: + for image_name, coords in self.coordinates.items(): + file.write(f"{image_name}: {coords}\n") + + def reset(self): + self.points = [] + self.current_shape = None + self.box_coords = None + self.box_start = None + self.display_image() + self.update_input_boxes() + self.set_status("已重置当前标注") + + def undo_last_point(self, event=None): + if self.current_shape and self.current_shape["type"] == "rect": + self.current_shape = None + self.box_coords = None + self.display_image() + self.update_input_boxes() + self.set_status("已清除矩形框") + return + + if self.points: + self.points.pop() + if self.points: + self.current_shape = {"type": "polygon", "points": self.points[:]} + else: + self.current_shape = None + self.display_image() + self.set_status("已撤回一个点") + + def upload_file(self): + file_types = [("Image/Video files", "*.jpg *.jpeg *.png *.bmp *.mp4 *.avi *.mov")] + file_path = filedialog.askopenfilename(filetypes=file_types) + if not file_path: + return + + self.file_path = file_path + if file_path.lower().endswith((".jpg", ".jpeg", ".png", ".bmp")): + self.image = Image.open(file_path).convert("RGB") + else: + cap = cv2.VideoCapture(file_path) + success, frame = cap.read() + cap.release() + if not success: + self.set_status("视频首帧读取失败", "red") + return + frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + self.image = Image.fromarray(frame) + + self.current_shape = None + self.points = [] + self.box_coords = None + self.display_image() + self.set_status(f"已加载文件: {os.path.basename(file_path)}") + + def select_folder(self): + folder_path = filedialog.askdirectory() + if not folder_path: + return + self.folder_path = folder_path + self.refresh_image_list() + self.load_existing_coordinates() + if self.image_list: + self.current_image_index = 0 + self.load_image_from_folder() + self.set_status(f"已选择文件夹: {folder_path}") + + def refresh_image_list(self): + if not self.folder_path: + return + self.image_list = sorted( + [ + name + for name in os.listdir(self.folder_path) + if name.lower().endswith((".jpg", ".jpeg", ".png", ".bmp")) + ] + ) + self.image_listbox.delete(0, tk.END) + for image_name in self.image_list: + self.image_listbox.insert(tk.END, image_name) + self.highlight_images_with_coordinates() + + def load_existing_coordinates(self): + self.coordinates = {} + if not self.folder_path: + return + data_file = os.path.join(self.folder_path, "data.txt") + if not os.path.exists(data_file): + return + with open(data_file, "r", encoding="utf-8") as file: + for raw_line in file: + line = raw_line.strip() + if not line or ": " not in line: + continue + image_name, coords_str = line.split(": ", 1) + self.coordinates[image_name] = coords_str + + def load_image_from_folder(self): + if not self.folder_path or not self.image_list: + return + image_name = self.image_list[self.current_image_index] + image_path = os.path.join(self.folder_path, image_name) + self.image = Image.open(image_path).convert("RGB") + self.current_shape = None + self.points = [] + self.box_coords = None + self.display_image() + self.highlight_current_image() + self.restore_coordinates_from_file() + self.set_status(f"已加载: {image_name}") + + def on_image_select(self, event): + if not self.image_listbox.curselection(): + return + self.current_image_index = self.image_listbox.curselection()[0] + self.load_image_from_folder() + + def highlight_current_image(self): + for index, image_name in enumerate(self.image_list): + color = "blue" if image_name in self.coordinates else "black" + self.image_listbox.itemconfig(index, {"fg": color}) + if self.image_list: + self.image_listbox.itemconfig(self.current_image_index, {"fg": "red"}) + self.image_listbox.selection_clear(0, tk.END) + self.image_listbox.selection_set(self.current_image_index) + self.image_listbox.see(self.current_image_index) + + def highlight_images_with_coordinates(self): + for index, image_name in enumerate(self.image_list): + color = "blue" if image_name in self.coordinates else "black" + self.image_listbox.itemconfig(index, {"fg": color}) + + def prev_image(self): + if not self.image_list: + return + self.current_image_index = max(0, self.current_image_index - 1) + self.load_image_from_folder() + + def next_image(self): + if not self.image_list: + return + self.current_image_index = min(len(self.image_list) - 1, self.current_image_index + 1) + self.load_image_from_folder() + + def search_image(self, event=None): + keyword = self.search_entry.get().strip().lower() + if not keyword: + return + for index, image_name in enumerate(self.image_list): + if keyword in image_name.lower(): + self.current_image_index = index + self.load_image_from_folder() + return + self.set_status("未找到匹配图片", "red") + + def restore_coordinates(self): + if not self.image: + self.set_status("请先加载图片", "red") + return + coords_str = self.coords_text.get("1.0", tk.END).strip() + if not coords_str: + self.set_status("请输入坐标内容", "red") + return + try: + annotation = self.parse_annotation(coords_str) + except ValueError as exc: + self.set_status(str(exc), "red") + return + self.load_annotation(annotation, from_input=True) + + def restore_coordinates_from_file(self): + if not self.image_list: + return + image_name = self.image_list[self.current_image_index] + coords_str = self.coordinates.get(image_name) + if not coords_str: + self.coords_text.delete("1.0", tk.END) + self.crop_entry.delete(0, tk.END) + return + try: + annotation = self.parse_annotation(coords_str) + except ValueError: + self.coords_text.delete("1.0", tk.END) + self.coords_text.insert("1.0", coords_str) + self.set_status(f"{image_name} 的坐标格式无法自动恢复", "red") + return + self.load_annotation(annotation) + + def batch_restore_coordinates(self): + if not self.image: + self.set_status("请先加载图片", "red") + return + input_text = self.coords_text.get("1.0", tk.END).strip() + if not input_text: + self.set_status("请输入批量框内容", "red") + return + + self.clear_restored_shapes() + count = 0 + pattern = re.compile( + r"result\s+(\d+):\s*\(\s*(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s*\)\s*[\d.]+\s*,\s*([A-Za-z0-9_\-]+)" + ) + for line in input_text.splitlines(): + match = pattern.search(line) + if not match: + continue + result_id, x1, y1, x2, y2, label = match.groups() + coords = (int(x1), int(y1), int(x2), int(y2)) + self.draw_rect(coords, color="yellow", editable=False, label=f"result {result_id} {label}") + count += 1 + + if count == 0: + self.set_status("没有识别到批量框格式", "red") + else: + self.set_status(f"已回显 {count} 个检测框") + + def get_crop_box(self): + if self.box_coords: + return self.box_coords + raw = self.crop_entry.get().strip() + if not raw: + raise ValueError("请先框选或输入裁剪坐标") + annotation = self.parse_annotation(raw) + if annotation["type"] == "rect": + return annotation["coords"] + xs = [point[0] for point in annotation["points"]] + ys = [point[1] for point in annotation["points"]] + return min(xs), min(ys), max(xs), max(ys) + + def crop_image(self): + if not self.image: + self.set_status("请先加载图片", "red") + return + try: + x1, y1, x2, y2 = self.get_crop_box() + except ValueError as exc: + self.set_status(str(exc), "red") + return + + x1 = max(0, min(x1, self.image.width)) + y1 = max(0, min(y1, self.image.height)) + x2 = max(0, min(x2, self.image.width)) + y2 = max(0, min(y2, self.image.height)) + if x2 <= x1 or y2 <= y1: + self.set_status("裁剪区域无效", "red") + return + + cropped = self.image.crop((x1, y1, x2, y2)) + target_size = 640 + ratio = min(target_size / cropped.width, target_size / cropped.height) + new_width = max(1, int(cropped.width * ratio)) + new_height = max(1, int(cropped.height * ratio)) + resized = cropped.resize((new_width, new_height), Image.Resampling.LANCZOS) + background = Image.new("RGB", (target_size, target_size), (0, 0, 0)) + offset = ((target_size - new_width) // 2, (target_size - new_height) // 2) + background.paste(resized, offset) + + save_path = filedialog.asksaveasfilename( + title="保存裁剪结果", + defaultextension=".jpg", + filetypes=[("JPEG files", "*.jpg"), ("PNG files", "*.png")], + ) + if save_path: + background.save(save_path) + + self.image = background + self.current_shape = None + self.points = [] + self.box_coords = None + self.display_image() + self.set_status(f"裁剪完成: ({x1},{y1})-({x2},{y2})") + + def compress_image(self): + if not self.image: + self.set_status("请先加载图片", "red") + return + self.image = self.make_640_image(self.image) + self.current_shape = None + self.points = [] + self.box_coords = None + self.display_image() + self.set_status("当前图片已压缩到 640x640") + + def compress_all_images(self): + if not self.folder_path or not self.image_list: + self.set_status("请先选择工作文件夹", "red") + return + for image_name in self.image_list: + image_path = os.path.join(self.folder_path, image_name) + image = Image.open(image_path).convert("RGB") + compressed = self.make_640_image(image) + compressed.save(image_path) + self.load_image_from_folder() + self.set_status("批量压缩完成") + + def make_640_image(self, image): + target_size = 640 + ratio = min(target_size / image.width, target_size / image.height) + new_width = max(1, int(image.width * ratio)) + new_height = max(1, int(image.height * ratio)) + resized = image.resize((new_width, new_height), Image.Resampling.LANCZOS) + background = Image.new("RGB", (target_size, target_size), (0, 0, 0)) + offset = ((target_size - new_width) // 2, (target_size - new_height) // 2) + background.paste(resized, offset) + return background + + def connect_rtsp(self): + rtsp_url = simpledialog.askstring("RTSP 连接", "请输入 RTSP 地址:") + if not rtsp_url: + return + self.rtsp_url = rtsp_url + self.set_status("正在连接 RTSP...", "blue") + self.rtsp_thread = threading.Thread(target=self._connect_rtsp_thread, daemon=True) + self.rtsp_thread.start() + + def _connect_rtsp_thread(self): + try: + cap = cv2.VideoCapture(self.rtsp_url) + cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000) + if not cap.isOpened(): + self.root.after(0, lambda: self.set_status("RTSP 连接失败", "red")) + return + success, _ = cap.read() + if not success: + cap.release() + self.root.after(0, lambda: self.set_status("RTSP 读取失败", "red")) + return + self.cap = cap + self.root.after(0, lambda: self.set_status("RTSP 已连接")) + except Exception as exc: + self.root.after(0, lambda: self.set_status(f"RTSP 错误: {exc}", "red")) + + def capture_rtsp_frame(self): + if not self.cap or not self.cap.isOpened(): + self.set_status("RTSP 尚未连接", "red") + return + success, frame = self.cap.read() + if not success: + self.set_status("RTSP 截图失败", "red") + return + frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB) + self.image = Image.fromarray(frame_rgb) + self.current_shape = None + self.points = [] + self.box_coords = None + self.display_image() + + save_path = filedialog.asksaveasfilename( + title="保存 RTSP 截图", + initialfile=f"rtsp_capture_{time.strftime('%Y%m%d_%H%M%S')}.jpg", + defaultextension=".jpg", + filetypes=[("JPEG files", "*.jpg"), ("PNG files", "*.png")], + ) + if save_path: + cv2.imwrite(save_path, frame) + if self.folder_path: + copied_path = os.path.join(self.folder_path, os.path.basename(save_path)) + shutil.copy2(save_path, copied_path) + self.refresh_image_list() + self.set_status("RTSP 截图完成") + + def disconnect_rtsp(self): + if self.cap: + self.cap.release() + self.cap = None + self.set_status("RTSP 已断开") + + def extract_video_frames(self): + video_path = filedialog.askopenfilename( + title="选择视频文件", + filetypes=[("Video files", "*.mp4 *.avi *.mov *.mkv")], + ) + if not video_path: + return + output_folder = filedialog.askdirectory(title="选择输出文件夹") + if not output_folder: + return + target_fps = simpledialog.askinteger("抽帧", "请输入目标帧率(FPS):", initialvalue=5, minvalue=1, maxvalue=60) + if not target_fps: + return + + video = cv2.VideoCapture(video_path) + original_fps = video.get(cv2.CAP_PROP_FPS) + if not original_fps: + video.release() + self.set_status("无法读取视频帧率", "red") + return + + interval = max(1, int(original_fps / target_fps)) + frame_index = 0 + saved_count = 0 + while True: + success, frame = video.read() + if not success: + break + if frame_index % interval == 0: + save_path = os.path.join(output_folder, f"frame_{saved_count:05d}.jpg") + cv2.imwrite(save_path, frame) + saved_count += 1 + frame_index += 1 + video.release() + self.set_status(f"抽帧完成,保存 {saved_count} 张") + + def collect_labeled_images(self): + label_folder = filedialog.askdirectory(title="选择标签文件夹") + if not label_folder: + return + image_source_folder = filedialog.askdirectory(title="选择图片源文件夹") + if not image_source_folder: + return + output_folder = filedialog.askdirectory(title="选择输出文件夹") + if not output_folder: + return + + images_out = os.path.join(output_folder, "images") + labels_out = os.path.join(output_folder, "labels") + os.makedirs(images_out, exist_ok=True) + os.makedirs(labels_out, exist_ok=True) + + count = 0 + for filename in os.listdir(label_folder): + if not filename.endswith(".txt") or filename == "classes.txt": + continue + base_name = os.path.splitext(filename)[0] + src_label = os.path.join(label_folder, filename) + found_image = None + for suffix in (".jpg", ".jpeg", ".png", ".bmp"): + candidate = os.path.join(image_source_folder, base_name + suffix) + if os.path.exists(candidate): + found_image = candidate + break + if not found_image: + continue + shutil.copy2(found_image, os.path.join(images_out, os.path.basename(found_image))) + shutil.copy2(src_label, os.path.join(labels_out, filename)) + count += 1 + + classes_path = os.path.join(label_folder, "classes.txt") + if os.path.exists(classes_path): + shutil.copy2(classes_path, os.path.join(output_folder, "classes.txt")) + + self.set_status(f"整理完成,共复制 {count} 组图像和标签") + messagebox.showinfo("完成", f"已整理 {count} 组标注数据") + + +def main(): + root = tk.Tk() + UniversalAnnotationTool(root) + root.mainloop() + + +if __name__ == "__main__": + main() diff --git a/video_frame_extractor_gui.py b/video_frame_extractor_gui.py new file mode 100644 index 0000000..695c615 --- /dev/null +++ b/video_frame_extractor_gui.py @@ -0,0 +1,239 @@ +import math +import os +import tkinter as tk +from tkinter import filedialog, messagebox + +import cv2 + + +class VideoFrameExtractorApp: + def __init__(self, root): + self.root = root + self.root.title("视频拆分图片工具") + self.root.geometry("760x520") + + self.video_path = "" + self.output_dir = "" + self.video_info = {} + + self._build_ui() + + def _build_ui(self): + main = tk.Frame(self.root, padx=14, pady=14) + main.pack(fill="both", expand=True) + + source_frame = tk.LabelFrame(main, text="视频源", padx=10, pady=10) + source_frame.pack(fill="x", pady=(0, 10)) + + self.video_path_var = tk.StringVar() + tk.Entry(source_frame, textvariable=self.video_path_var).pack(side="left", fill="x", expand=True) + tk.Button(source_frame, text="添加视频", width=12, command=self.choose_video).pack(side="left", padx=(8, 0)) + + info_frame = tk.LabelFrame(main, text="视频信息", padx=10, pady=10) + info_frame.pack(fill="x", pady=(0, 10)) + + self.info_labels = {} + fields = [ + ("file_name", "文件名"), + ("resolution", "分辨率"), + ("fps", "视频帧率"), + ("frame_count", "总帧数"), + ("duration", "总时长"), + ("codec", "编码格式"), + ("size", "文件大小"), + ] + for row, (key, title) in enumerate(fields): + tk.Label(info_frame, text=f"{title}:", width=12, anchor="w").grid(row=row, column=0, sticky="w", pady=2) + label = tk.Label(info_frame, text="未加载", anchor="w") + label.grid(row=row, column=1, sticky="w", pady=2) + self.info_labels[key] = label + + settings_frame = tk.LabelFrame(main, text="抽帧设置", padx=10, pady=10) + settings_frame.pack(fill="x", pady=(0, 10)) + + tk.Label(settings_frame, text="每秒抽取图片数:", width=14, anchor="w").grid(row=0, column=0, sticky="w") + self.target_fps_var = tk.StringVar(value="5") + tk.Entry(settings_frame, textvariable=self.target_fps_var, width=12).grid(row=0, column=1, sticky="w") + + tk.Label(settings_frame, text="输出文件夹:", width=14, anchor="w").grid(row=1, column=0, sticky="w", pady=(10, 0)) + self.output_dir_var = tk.StringVar() + tk.Entry(settings_frame, textvariable=self.output_dir_var).grid(row=1, column=1, sticky="ew", pady=(10, 0)) + tk.Button(settings_frame, text="选择输出", width=12, command=self.choose_output_dir).grid(row=1, column=2, padx=(8, 0), pady=(10, 0)) + settings_frame.grid_columnconfigure(1, weight=1) + + action_frame = tk.Frame(main) + action_frame.pack(fill="x", pady=(0, 10)) + tk.Button(action_frame, text="开始拆分", width=14, command=self.extract_frames).pack(side="left") + + self.progress_var = tk.StringVar(value="等待开始") + tk.Label(main, textvariable=self.progress_var, anchor="w", justify="left", fg="darkgreen").pack(fill="x") + + def choose_video(self): + video_path = filedialog.askopenfilename( + title="选择视频文件", + filetypes=[("Video files", "*.mp4 *.avi *.mov *.mkv *.flv *.wmv"), ("All files", "*.*")], + ) + if not video_path: + return + + self.video_path = video_path + self.video_path_var.set(video_path) + self.load_video_info(video_path) + + if not self.output_dir_var.get().strip(): + default_name = os.path.splitext(os.path.basename(video_path))[0] + "_frames" + self.output_dir = os.path.join(os.path.dirname(video_path), default_name) + self.output_dir_var.set(self.output_dir) + + def choose_output_dir(self): + output_dir = filedialog.askdirectory(title="选择输出文件夹") + if not output_dir: + return + self.output_dir = output_dir + self.output_dir_var.set(output_dir) + + def load_video_info(self, video_path): + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + messagebox.showerror("错误", "无法打开该视频文件。") + return + + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH) or 0) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT) or 0) + fps = float(cap.get(cv2.CAP_PROP_FPS) or 0.0) + frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0) + duration_seconds = frame_count / fps if fps > 0 else 0 + fourcc_value = int(cap.get(cv2.CAP_PROP_FOURCC) or 0) + cap.release() + + self.video_info = { + "file_name": os.path.basename(video_path), + "resolution": f"{width} x {height}", + "fps": f"{fps:.3f}" if fps else "未知", + "frame_count": str(frame_count) if frame_count else "未知", + "duration": self.format_duration(duration_seconds), + "codec": self.decode_fourcc(fourcc_value), + "size": self.format_size(os.path.getsize(video_path)), + } + + for key, value in self.video_info.items(): + self.info_labels[key].config(text=value) + + self.progress_var.set("视频信息已读取,可以设置抽帧参数。") + + def decode_fourcc(self, value): + if value <= 0: + return "未知" + chars = [chr((value >> 8 * i) & 0xFF) for i in range(4)] + codec = "".join(chars).strip() + return codec if codec else "未知" + + def format_duration(self, seconds): + seconds = max(0, int(round(seconds))) + hours = seconds // 3600 + minutes = (seconds % 3600) // 60 + secs = seconds % 60 + return f"{hours:02d}:{minutes:02d}:{secs:02d}" + + def format_size(self, size_bytes): + units = ["B", "KB", "MB", "GB", "TB"] + size = float(size_bytes) + index = 0 + while size >= 1024 and index < len(units) - 1: + size /= 1024 + index += 1 + return f"{size:.2f} {units[index]}" + + def extract_frames(self): + video_path = self.video_path_var.get().strip() + output_dir = self.output_dir_var.get().strip() + target_fps_text = self.target_fps_var.get().strip() + + if not video_path: + messagebox.showwarning("提示", "请先添加视频文件。") + return + if not os.path.exists(video_path): + messagebox.showwarning("提示", "视频文件不存在,请重新选择。") + return + if not output_dir: + messagebox.showwarning("提示", "请选择输出文件夹。") + return + + try: + target_fps = float(target_fps_text) + except ValueError: + messagebox.showwarning("提示", "每秒抽取图片数必须是数字。") + return + + if target_fps <= 0: + messagebox.showwarning("提示", "每秒抽取图片数必须大于 0。") + return + + os.makedirs(output_dir, exist_ok=True) + + cap = cv2.VideoCapture(video_path) + if not cap.isOpened(): + messagebox.showerror("错误", "无法打开视频文件。") + return + + original_fps = float(cap.get(cv2.CAP_PROP_FPS) or 0.0) + frame_count = int(cap.get(cv2.CAP_PROP_FRAME_COUNT) or 0) + if original_fps <= 0: + cap.release() + messagebox.showerror("错误", "无法读取视频帧率。") + return + + if target_fps >= original_fps: + interval = 1 + actual_fps = original_fps + else: + interval = max(1, int(round(original_fps / target_fps))) + actual_fps = original_fps / interval + + frame_index = 0 + saved_count = 0 + base_name = os.path.splitext(os.path.basename(video_path))[0] + + self.progress_var.set( + f"开始拆分:原始 FPS={original_fps:.3f},目标={target_fps:.3f},实际约={actual_fps:.3f} 张/秒" + ) + self.root.update_idletasks() + + while True: + success, frame = cap.read() + if not success: + break + + if frame_index % interval == 0: + timestamp = frame_index / original_fps + output_name = f"{base_name}_{saved_count:06d}_{timestamp:09.3f}s.jpg" + output_path = os.path.join(output_dir, output_name) + cv2.imwrite(output_path, frame) + saved_count += 1 + + frame_index += 1 + + if frame_count and frame_index % 100 == 0: + percent = frame_index / frame_count * 100 + self.progress_var.set(f"处理中:{frame_index}/{frame_count} 帧,约 {percent:.1f}%") + self.root.update_idletasks() + + cap.release() + + self.progress_var.set( + f"拆分完成:共导出 {saved_count} 张图片,输出目录:{output_dir}" + ) + messagebox.showinfo( + "完成", + f"视频拆分完成。\n\n导出图片:{saved_count} 张\n输出目录:{output_dir}\n实际抽取频率:约 {actual_fps:.3f} 张/秒", + ) + + +def main(): + root = tk.Tk() + VideoFrameExtractorApp(root) + root.mainloop() + + +if __name__ == "__main__": + main()