873 lines
34 KiB
Python
873 lines
34 KiB
Python
import os
|
||
import re
|
||
import shutil
|
||
import threading
|
||
import time
|
||
import tkinter as tk
|
||
from tkinter import filedialog, messagebox, simpledialog
|
||
from tkinter.scrolledtext import ScrolledText
|
||
|
||
import cv2
|
||
from PIL import Image, ImageTk
|
||
|
||
|
||
class UniversalAnnotationTool:
|
||
def __init__(self, root):
|
||
self.root = root
|
||
self.root.title("通用标注与裁剪工具")
|
||
self.root.geometry("1600x920")
|
||
|
||
self.image = None
|
||
self.photo = None
|
||
self.file_path = None
|
||
self.folder_path = None
|
||
self.image_list = []
|
||
self.current_image_index = 0
|
||
self.coordinates = {}
|
||
self.current_shape = None
|
||
self.mode = "polygon"
|
||
|
||
self.points = []
|
||
self.lines = []
|
||
self.point_labels = []
|
||
|
||
self.box_preview = None
|
||
self.box_preview_text = None
|
||
self.box_start = None
|
||
self.box_coords = None
|
||
self.box_shape_id = None
|
||
self.box_text_id = None
|
||
|
||
self.restored_points = []
|
||
self.restored_lines = []
|
||
self.restored_text_labels = []
|
||
self.restored_rectangles = []
|
||
|
||
self.rtsp_url = ""
|
||
self.cap = None
|
||
self.rtsp_thread = None
|
||
|
||
self._build_layout()
|
||
self._bind_events()
|
||
self.enable_polygon_mode()
|
||
|
||
def _build_layout(self):
|
||
main_frame = tk.Frame(self.root)
|
||
main_frame.pack(fill="both", expand=True)
|
||
|
||
left_frame = tk.Frame(main_frame)
|
||
left_frame.pack(side="left", fill="both", expand=True, padx=10, pady=10)
|
||
|
||
canvas_frame = tk.Frame(left_frame)
|
||
canvas_frame.pack(fill="both", expand=True)
|
||
|
||
v_scrollbar = tk.Scrollbar(canvas_frame, orient="vertical")
|
||
v_scrollbar.pack(side="right", fill="y")
|
||
h_scrollbar = tk.Scrollbar(canvas_frame, orient="horizontal")
|
||
h_scrollbar.pack(side="bottom", fill="x")
|
||
|
||
self.canvas = tk.Canvas(
|
||
canvas_frame,
|
||
bg="white",
|
||
width=1100,
|
||
height=760,
|
||
yscrollcommand=v_scrollbar.set,
|
||
xscrollcommand=h_scrollbar.set,
|
||
)
|
||
self.canvas.pack(side="left", fill="both", expand=True)
|
||
v_scrollbar.config(command=self.canvas.yview)
|
||
h_scrollbar.config(command=self.canvas.xview)
|
||
|
||
btn_row1 = tk.Frame(left_frame)
|
||
btn_row1.pack(fill="x", pady=(10, 4))
|
||
btn_row2 = tk.Frame(left_frame)
|
||
btn_row2.pack(fill="x", pady=4)
|
||
btn_row3 = tk.Frame(left_frame)
|
||
btn_row3.pack(fill="x", pady=4)
|
||
|
||
tk.Button(btn_row1, text="上传文件", command=self.upload_file, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row1, text="选择文件夹", command=self.select_folder, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row1, text="上一张", command=self.prev_image, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row1, text="下一张", command=self.next_image, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row1, text="保存坐标", command=self.save_coordinates, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row1, text="重置", command=self.reset, width=12).pack(side="left", padx=3)
|
||
|
||
tk.Button(btn_row2, text="多边形模式", command=self.enable_polygon_mode, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row2, text="框选模式", command=self.enable_box_mode, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row2, text="撤回", command=self.undo_last_point, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row2, text="恢复输入", command=self.restore_coordinates, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row2, text="批量回显", command=self.batch_restore_coordinates, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row2, text="裁剪", command=self.crop_image, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row2, text="压缩当前", command=self.compress_image, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row2, text="批量压缩", command=self.compress_all_images, width=12).pack(side="left", padx=3)
|
||
|
||
tk.Button(btn_row3, text="连接 RTSP", command=self.connect_rtsp, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row3, text="截图 RTSP", command=self.capture_rtsp_frame, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row3, text="断开 RTSP", command=self.disconnect_rtsp, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row3, text="视频抽帧", command=self.extract_video_frames, width=12).pack(side="left", padx=3)
|
||
tk.Button(btn_row3, text="整理标注图", command=self.collect_labeled_images, width=12).pack(side="left", padx=3)
|
||
|
||
self.status_label = tk.Label(left_frame, text="状态: 就绪", anchor="w", fg="darkgreen")
|
||
self.status_label.pack(fill="x", pady=(6, 0))
|
||
|
||
right_frame = tk.Frame(main_frame)
|
||
right_frame.pack(side="right", fill="y", padx=10, pady=10)
|
||
|
||
tk.Label(right_frame, text="图片列表", font=("Arial", 12, "bold")).pack(anchor="w")
|
||
self.image_listbox = tk.Listbox(right_frame, width=34, height=24)
|
||
self.image_listbox.pack(fill="y", pady=(4, 8))
|
||
self.image_listbox.bind("<<ListboxSelect>>", self.on_image_select)
|
||
|
||
search_frame = tk.Frame(right_frame)
|
||
search_frame.pack(fill="x", pady=(0, 8))
|
||
tk.Label(search_frame, text="搜索").pack(side="left")
|
||
self.search_entry = tk.Entry(search_frame)
|
||
self.search_entry.pack(side="left", fill="x", expand=True, padx=(6, 0))
|
||
self.search_entry.bind("<Return>", self.search_image)
|
||
|
||
tk.Label(right_frame, text="坐标 / 标注输入").pack(anchor="w")
|
||
self.coords_text = ScrolledText(right_frame, width=38, height=10)
|
||
self.coords_text.pack(fill="both", pady=(4, 8))
|
||
|
||
crop_frame = tk.Frame(right_frame)
|
||
crop_frame.pack(fill="x", pady=(0, 8))
|
||
tk.Label(crop_frame, text="裁剪坐标").pack(side="left")
|
||
self.crop_entry = tk.Entry(crop_frame)
|
||
self.crop_entry.pack(side="left", fill="x", expand=True, padx=(6, 0))
|
||
|
||
tk.Label(right_frame, text="说明").pack(anchor="w")
|
||
self.help_text = tk.Label(
|
||
right_frame,
|
||
justify="left",
|
||
anchor="w",
|
||
text=(
|
||
"1. 多边形: 逐点点击,点回起点附近可闭合\n"
|
||
"2. 框选: 拖拽生成矩形,自动写入裁剪框\n"
|
||
"3. 输入支持:\n"
|
||
" rect:10,20,200,300\n"
|
||
" polygon:10,20,100,20,100,100\n"
|
||
" 纯 4 个数默认矩形,6 个及以上默认多边形\n"
|
||
"4. 批量回显支持 result 1: (x1 y1 x2 y2) 0.98, label"
|
||
),
|
||
)
|
||
self.help_text.pack(fill="x")
|
||
|
||
def _bind_events(self):
|
||
self.root.bind("<Control-z>", self.undo_last_point)
|
||
|
||
def set_status(self, message, color="darkgreen"):
|
||
self.status_label.config(text=f"状态: {message}", fg=color)
|
||
|
||
def update_canvas_size(self, width, height):
|
||
self.canvas.config(scrollregion=(0, 0, width, height))
|
||
self.canvas.config(width=min(width, 1100), height=min(height, 760))
|
||
|
||
def display_image(self):
|
||
self.canvas.delete("all")
|
||
self.clear_canvas_state()
|
||
if not self.image:
|
||
return
|
||
self.update_canvas_size(self.image.width, self.image.height)
|
||
self.photo = ImageTk.PhotoImage(self.image)
|
||
self.canvas.create_image(0, 0, image=self.photo, anchor="nw")
|
||
self.redraw_current_shape()
|
||
|
||
def clear_canvas_state(self):
|
||
self.lines = []
|
||
self.point_labels = []
|
||
self.box_shape_id = None
|
||
self.box_text_id = None
|
||
self.box_preview = None
|
||
self.box_preview_text = None
|
||
self.clear_restored_shapes()
|
||
|
||
def clear_restored_shapes(self):
|
||
for item in self.restored_points + self.restored_lines + self.restored_text_labels + self.restored_rectangles:
|
||
self.canvas.delete(item)
|
||
self.restored_points = []
|
||
self.restored_lines = []
|
||
self.restored_text_labels = []
|
||
self.restored_rectangles = []
|
||
|
||
def update_input_boxes(self):
|
||
self.coords_text.delete("1.0", tk.END)
|
||
if self.current_shape:
|
||
self.coords_text.insert("1.0", self.serialize_annotation(self.current_shape))
|
||
self.crop_entry.delete(0, tk.END)
|
||
if self.box_coords:
|
||
self.crop_entry.insert(0, f"{self.box_coords[0]},{self.box_coords[1]},{self.box_coords[2]},{self.box_coords[3]}")
|
||
|
||
def redraw_current_shape(self):
|
||
if not self.current_shape:
|
||
self.update_input_boxes()
|
||
return
|
||
if self.current_shape["type"] == "polygon":
|
||
self.points = [tuple(point) for point in self.current_shape["points"]]
|
||
self.draw_polygon(self.points, color="red", editable=True)
|
||
elif self.current_shape["type"] == "rect":
|
||
self.box_coords = tuple(self.current_shape["coords"])
|
||
self.draw_rect(self.box_coords, color="blue", editable=True)
|
||
self.update_input_boxes()
|
||
|
||
def draw_polygon(self, points, color="red", editable=False):
|
||
if not points:
|
||
return
|
||
for index, (x, y) in enumerate(points, start=1):
|
||
oval = self.canvas.create_oval(x - 5, y - 5, x + 5, y + 5, fill=color, outline=color)
|
||
label = self.canvas.create_text(
|
||
x + 10,
|
||
y,
|
||
text=str(index),
|
||
fill="green" if editable else color,
|
||
font=("Arial", 14, "bold"),
|
||
)
|
||
if editable:
|
||
self.point_labels.extend([oval, label])
|
||
else:
|
||
self.restored_points.append(oval)
|
||
self.restored_text_labels.append(label)
|
||
for start, end in zip(points, points[1:]):
|
||
line = self.canvas.create_line(start, end, fill=color, width=2)
|
||
if editable:
|
||
self.lines.append(line)
|
||
else:
|
||
self.restored_lines.append(line)
|
||
if len(points) > 2:
|
||
line = self.canvas.create_line(points[-1], points[0], fill=color, width=2)
|
||
if editable:
|
||
self.lines.append(line)
|
||
else:
|
||
self.restored_lines.append(line)
|
||
|
||
def draw_rect(self, coords, color="blue", editable=False, label=None):
|
||
x1, y1, x2, y2 = coords
|
||
rect = self.canvas.create_rectangle(x1, y1, x2, y2, outline=color, width=2)
|
||
width = abs(x2 - x1)
|
||
height = abs(y2 - y1)
|
||
text = self.canvas.create_text(
|
||
min(x1, x2) + 10,
|
||
min(y1, y2) + 12,
|
||
anchor="nw",
|
||
text=label or f"{width}x{height}",
|
||
fill=color,
|
||
font=("Arial", 12, "bold"),
|
||
)
|
||
if editable:
|
||
self.box_shape_id = rect
|
||
self.box_text_id = text
|
||
else:
|
||
self.restored_rectangles.append(rect)
|
||
self.restored_text_labels.append(text)
|
||
|
||
def enable_polygon_mode(self):
|
||
self.mode = "polygon"
|
||
self.canvas.unbind("<ButtonPress-1>")
|
||
self.canvas.unbind("<B1-Motion>")
|
||
self.canvas.unbind("<ButtonRelease-1>")
|
||
self.canvas.bind("<Button-1>", self.on_click)
|
||
self.set_status("已切换到多边形模式")
|
||
|
||
def enable_box_mode(self):
|
||
self.mode = "box"
|
||
self.canvas.unbind("<Button-1>")
|
||
self.canvas.bind("<ButtonPress-1>", self.on_box_press)
|
||
self.canvas.bind("<B1-Motion>", self.on_box_drag)
|
||
self.canvas.bind("<ButtonRelease-1>", self.on_box_release)
|
||
self.set_status("已切换到框选模式")
|
||
|
||
def on_click(self, event):
|
||
if self.mode != "polygon" or not self.image:
|
||
return
|
||
|
||
x, y = self.clamp_to_image(event.x, event.y)
|
||
if self.box_coords:
|
||
self.box_coords = None
|
||
|
||
if len(self.points) > 2:
|
||
first_x, first_y = self.points[0]
|
||
distance = ((x - first_x) ** 2 + (y - first_y) ** 2) ** 0.5
|
||
if distance < 20:
|
||
self.current_shape = {"type": "polygon", "points": self.points[:]}
|
||
self.display_image()
|
||
self.set_status("多边形已闭合")
|
||
return
|
||
|
||
self.points.append((x, y))
|
||
self.current_shape = {"type": "polygon", "points": self.points[:]}
|
||
self.display_image()
|
||
|
||
def on_box_press(self, event):
|
||
if self.mode != "box" or not self.image:
|
||
return
|
||
self.box_start = self.clamp_to_image(event.x, event.y)
|
||
self.box_coords = None
|
||
self.current_shape = None
|
||
self.display_image()
|
||
|
||
def on_box_drag(self, event):
|
||
if self.mode != "box" or not self.image or not self.box_start:
|
||
return
|
||
x0, y0 = self.box_start
|
||
x1, y1 = self.clamp_to_image(event.x, event.y)
|
||
if self.box_preview:
|
||
self.canvas.delete(self.box_preview)
|
||
if self.box_preview_text:
|
||
self.canvas.delete(self.box_preview_text)
|
||
self.box_preview = self.canvas.create_rectangle(x0, y0, x1, y1, outline="blue", width=2, dash=(4, 2))
|
||
self.box_preview_text = self.canvas.create_text(
|
||
min(x0, x1) + 10,
|
||
min(y0, y1) + 12,
|
||
anchor="nw",
|
||
text=f"{abs(x1 - x0)}x{abs(y1 - y0)}",
|
||
fill="blue",
|
||
font=("Arial", 12, "bold"),
|
||
)
|
||
|
||
def on_box_release(self, event):
|
||
if self.mode != "box" or not self.image or not self.box_start:
|
||
return
|
||
x0, y0 = self.box_start
|
||
x1, y1 = self.clamp_to_image(event.x, event.y)
|
||
self.box_start = None
|
||
self.box_coords = (min(x0, x1), min(y0, y1), max(x0, x1), max(y0, y1))
|
||
self.current_shape = {"type": "rect", "coords": self.box_coords}
|
||
self.display_image()
|
||
self.set_status("框选完成")
|
||
|
||
def clamp_to_image(self, x, y):
|
||
if not self.image:
|
||
return x, y
|
||
x = min(max(int(self.canvas.canvasx(x)), 0), max(self.image.width - 1, 0))
|
||
y = min(max(int(self.canvas.canvasy(y)), 0), max(self.image.height - 1, 0))
|
||
return x, y
|
||
|
||
def serialize_annotation(self, annotation):
|
||
if annotation["type"] == "rect":
|
||
x1, y1, x2, y2 = annotation["coords"]
|
||
return f"rect:{x1},{y1},{x2},{y2}"
|
||
points = annotation["points"]
|
||
flat = ",".join(f"{x},{y}" for x, y in points)
|
||
return f"polygon:{flat}"
|
||
|
||
def parse_annotation(self, text):
|
||
raw = text.strip()
|
||
if not raw:
|
||
return None
|
||
lowered = raw.lower()
|
||
if lowered.startswith("rect:"):
|
||
coords = self.parse_ints(raw.split(":", 1)[1])
|
||
if len(coords) != 4:
|
||
raise ValueError("矩形需要 4 个数字")
|
||
x1, y1, x2, y2 = coords
|
||
return {"type": "rect", "coords": (min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))}
|
||
if lowered.startswith("polygon:"):
|
||
coords = self.parse_ints(raw.split(":", 1)[1])
|
||
if len(coords) < 6 or len(coords) % 2 != 0:
|
||
raise ValueError("多边形至少需要 3 个点")
|
||
points = [(coords[i], coords[i + 1]) for i in range(0, len(coords), 2)]
|
||
return {"type": "polygon", "points": points}
|
||
|
||
coords = self.parse_ints(raw)
|
||
if len(coords) == 4:
|
||
x1, y1, x2, y2 = coords
|
||
return {"type": "rect", "coords": (min(x1, x2), min(y1, y2), max(x1, x2), max(y1, y2))}
|
||
if len(coords) >= 6 and len(coords) % 2 == 0:
|
||
points = [(coords[i], coords[i + 1]) for i in range(0, len(coords), 2)]
|
||
return {"type": "polygon", "points": points}
|
||
raise ValueError("无法识别坐标格式")
|
||
|
||
def parse_ints(self, text):
|
||
numbers = re.findall(r"-?\d+", text)
|
||
return [int(num) for num in numbers]
|
||
|
||
def load_annotation(self, annotation, from_input=False):
|
||
self.current_shape = annotation
|
||
if annotation["type"] == "polygon":
|
||
self.points = [tuple(point) for point in annotation["points"]]
|
||
self.box_coords = None
|
||
else:
|
||
self.points = []
|
||
self.box_coords = tuple(annotation["coords"])
|
||
self.display_image()
|
||
if from_input:
|
||
self.set_status("已根据输入恢复标注")
|
||
|
||
def save_coordinates(self):
|
||
if not self.current_shape:
|
||
self.set_status("请先绘制或恢复标注", "red")
|
||
return
|
||
|
||
coords_str = self.serialize_annotation(self.current_shape)
|
||
self.update_input_boxes()
|
||
print(coords_str)
|
||
|
||
if self.folder_path and self.image_list:
|
||
image_name = self.image_list[self.current_image_index]
|
||
self.coordinates[image_name] = coords_str
|
||
self.write_coordinates_file()
|
||
self.highlight_images_with_coordinates()
|
||
self.set_status(f"已保存 {image_name} 的标注")
|
||
if self.current_image_index < len(self.image_list) - 1:
|
||
self.next_image()
|
||
else:
|
||
self.set_status("已输出当前标注")
|
||
|
||
def write_coordinates_file(self):
|
||
if not self.folder_path:
|
||
return
|
||
data_file = os.path.join(self.folder_path, "data.txt")
|
||
with open(data_file, "w", encoding="utf-8") as file:
|
||
for image_name, coords in self.coordinates.items():
|
||
file.write(f"{image_name}: {coords}\n")
|
||
|
||
def reset(self):
|
||
self.points = []
|
||
self.current_shape = None
|
||
self.box_coords = None
|
||
self.box_start = None
|
||
self.display_image()
|
||
self.update_input_boxes()
|
||
self.set_status("已重置当前标注")
|
||
|
||
def undo_last_point(self, event=None):
|
||
if self.current_shape and self.current_shape["type"] == "rect":
|
||
self.current_shape = None
|
||
self.box_coords = None
|
||
self.display_image()
|
||
self.update_input_boxes()
|
||
self.set_status("已清除矩形框")
|
||
return
|
||
|
||
if self.points:
|
||
self.points.pop()
|
||
if self.points:
|
||
self.current_shape = {"type": "polygon", "points": self.points[:]}
|
||
else:
|
||
self.current_shape = None
|
||
self.display_image()
|
||
self.set_status("已撤回一个点")
|
||
|
||
def upload_file(self):
|
||
file_types = [("Image/Video files", "*.jpg *.jpeg *.png *.bmp *.mp4 *.avi *.mov")]
|
||
file_path = filedialog.askopenfilename(filetypes=file_types)
|
||
if not file_path:
|
||
return
|
||
|
||
self.file_path = file_path
|
||
if file_path.lower().endswith((".jpg", ".jpeg", ".png", ".bmp")):
|
||
self.image = Image.open(file_path).convert("RGB")
|
||
else:
|
||
cap = cv2.VideoCapture(file_path)
|
||
success, frame = cap.read()
|
||
cap.release()
|
||
if not success:
|
||
self.set_status("视频首帧读取失败", "red")
|
||
return
|
||
frame = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||
self.image = Image.fromarray(frame)
|
||
|
||
self.current_shape = None
|
||
self.points = []
|
||
self.box_coords = None
|
||
self.display_image()
|
||
self.set_status(f"已加载文件: {os.path.basename(file_path)}")
|
||
|
||
def select_folder(self):
|
||
folder_path = filedialog.askdirectory()
|
||
if not folder_path:
|
||
return
|
||
self.folder_path = folder_path
|
||
self.refresh_image_list()
|
||
self.load_existing_coordinates()
|
||
if self.image_list:
|
||
self.current_image_index = 0
|
||
self.load_image_from_folder()
|
||
self.set_status(f"已选择文件夹: {folder_path}")
|
||
|
||
def refresh_image_list(self):
|
||
if not self.folder_path:
|
||
return
|
||
self.image_list = sorted(
|
||
[
|
||
name
|
||
for name in os.listdir(self.folder_path)
|
||
if name.lower().endswith((".jpg", ".jpeg", ".png", ".bmp"))
|
||
]
|
||
)
|
||
self.image_listbox.delete(0, tk.END)
|
||
for image_name in self.image_list:
|
||
self.image_listbox.insert(tk.END, image_name)
|
||
self.highlight_images_with_coordinates()
|
||
|
||
def load_existing_coordinates(self):
|
||
self.coordinates = {}
|
||
if not self.folder_path:
|
||
return
|
||
data_file = os.path.join(self.folder_path, "data.txt")
|
||
if not os.path.exists(data_file):
|
||
return
|
||
with open(data_file, "r", encoding="utf-8") as file:
|
||
for raw_line in file:
|
||
line = raw_line.strip()
|
||
if not line or ": " not in line:
|
||
continue
|
||
image_name, coords_str = line.split(": ", 1)
|
||
self.coordinates[image_name] = coords_str
|
||
|
||
def load_image_from_folder(self):
|
||
if not self.folder_path or not self.image_list:
|
||
return
|
||
image_name = self.image_list[self.current_image_index]
|
||
image_path = os.path.join(self.folder_path, image_name)
|
||
self.image = Image.open(image_path).convert("RGB")
|
||
self.current_shape = None
|
||
self.points = []
|
||
self.box_coords = None
|
||
self.display_image()
|
||
self.highlight_current_image()
|
||
self.restore_coordinates_from_file()
|
||
self.set_status(f"已加载: {image_name}")
|
||
|
||
def on_image_select(self, event):
|
||
if not self.image_listbox.curselection():
|
||
return
|
||
self.current_image_index = self.image_listbox.curselection()[0]
|
||
self.load_image_from_folder()
|
||
|
||
def highlight_current_image(self):
|
||
for index, image_name in enumerate(self.image_list):
|
||
color = "blue" if image_name in self.coordinates else "black"
|
||
self.image_listbox.itemconfig(index, {"fg": color})
|
||
if self.image_list:
|
||
self.image_listbox.itemconfig(self.current_image_index, {"fg": "red"})
|
||
self.image_listbox.selection_clear(0, tk.END)
|
||
self.image_listbox.selection_set(self.current_image_index)
|
||
self.image_listbox.see(self.current_image_index)
|
||
|
||
def highlight_images_with_coordinates(self):
|
||
for index, image_name in enumerate(self.image_list):
|
||
color = "blue" if image_name in self.coordinates else "black"
|
||
self.image_listbox.itemconfig(index, {"fg": color})
|
||
|
||
def prev_image(self):
|
||
if not self.image_list:
|
||
return
|
||
self.current_image_index = max(0, self.current_image_index - 1)
|
||
self.load_image_from_folder()
|
||
|
||
def next_image(self):
|
||
if not self.image_list:
|
||
return
|
||
self.current_image_index = min(len(self.image_list) - 1, self.current_image_index + 1)
|
||
self.load_image_from_folder()
|
||
|
||
def search_image(self, event=None):
|
||
keyword = self.search_entry.get().strip().lower()
|
||
if not keyword:
|
||
return
|
||
for index, image_name in enumerate(self.image_list):
|
||
if keyword in image_name.lower():
|
||
self.current_image_index = index
|
||
self.load_image_from_folder()
|
||
return
|
||
self.set_status("未找到匹配图片", "red")
|
||
|
||
def restore_coordinates(self):
|
||
if not self.image:
|
||
self.set_status("请先加载图片", "red")
|
||
return
|
||
coords_str = self.coords_text.get("1.0", tk.END).strip()
|
||
if not coords_str:
|
||
self.set_status("请输入坐标内容", "red")
|
||
return
|
||
try:
|
||
annotation = self.parse_annotation(coords_str)
|
||
except ValueError as exc:
|
||
self.set_status(str(exc), "red")
|
||
return
|
||
self.load_annotation(annotation, from_input=True)
|
||
|
||
def restore_coordinates_from_file(self):
|
||
if not self.image_list:
|
||
return
|
||
image_name = self.image_list[self.current_image_index]
|
||
coords_str = self.coordinates.get(image_name)
|
||
if not coords_str:
|
||
self.coords_text.delete("1.0", tk.END)
|
||
self.crop_entry.delete(0, tk.END)
|
||
return
|
||
try:
|
||
annotation = self.parse_annotation(coords_str)
|
||
except ValueError:
|
||
self.coords_text.delete("1.0", tk.END)
|
||
self.coords_text.insert("1.0", coords_str)
|
||
self.set_status(f"{image_name} 的坐标格式无法自动恢复", "red")
|
||
return
|
||
self.load_annotation(annotation)
|
||
|
||
def batch_restore_coordinates(self):
|
||
if not self.image:
|
||
self.set_status("请先加载图片", "red")
|
||
return
|
||
input_text = self.coords_text.get("1.0", tk.END).strip()
|
||
if not input_text:
|
||
self.set_status("请输入批量框内容", "red")
|
||
return
|
||
|
||
self.clear_restored_shapes()
|
||
count = 0
|
||
pattern = re.compile(
|
||
r"result\s+(\d+):\s*\(\s*(\d+)\s+(\d+)\s+(\d+)\s+(\d+)\s*\)\s*[\d.]+\s*,\s*([A-Za-z0-9_\-]+)"
|
||
)
|
||
for line in input_text.splitlines():
|
||
match = pattern.search(line)
|
||
if not match:
|
||
continue
|
||
result_id, x1, y1, x2, y2, label = match.groups()
|
||
coords = (int(x1), int(y1), int(x2), int(y2))
|
||
self.draw_rect(coords, color="yellow", editable=False, label=f"result {result_id} {label}")
|
||
count += 1
|
||
|
||
if count == 0:
|
||
self.set_status("没有识别到批量框格式", "red")
|
||
else:
|
||
self.set_status(f"已回显 {count} 个检测框")
|
||
|
||
def get_crop_box(self):
|
||
if self.box_coords:
|
||
return self.box_coords
|
||
raw = self.crop_entry.get().strip()
|
||
if not raw:
|
||
raise ValueError("请先框选或输入裁剪坐标")
|
||
annotation = self.parse_annotation(raw)
|
||
if annotation["type"] == "rect":
|
||
return annotation["coords"]
|
||
xs = [point[0] for point in annotation["points"]]
|
||
ys = [point[1] for point in annotation["points"]]
|
||
return min(xs), min(ys), max(xs), max(ys)
|
||
|
||
def crop_image(self):
|
||
if not self.image:
|
||
self.set_status("请先加载图片", "red")
|
||
return
|
||
try:
|
||
x1, y1, x2, y2 = self.get_crop_box()
|
||
except ValueError as exc:
|
||
self.set_status(str(exc), "red")
|
||
return
|
||
|
||
x1 = max(0, min(x1, self.image.width))
|
||
y1 = max(0, min(y1, self.image.height))
|
||
x2 = max(0, min(x2, self.image.width))
|
||
y2 = max(0, min(y2, self.image.height))
|
||
if x2 <= x1 or y2 <= y1:
|
||
self.set_status("裁剪区域无效", "red")
|
||
return
|
||
|
||
cropped = self.image.crop((x1, y1, x2, y2))
|
||
target_size = 640
|
||
ratio = min(target_size / cropped.width, target_size / cropped.height)
|
||
new_width = max(1, int(cropped.width * ratio))
|
||
new_height = max(1, int(cropped.height * ratio))
|
||
resized = cropped.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
||
background = Image.new("RGB", (target_size, target_size), (0, 0, 0))
|
||
offset = ((target_size - new_width) // 2, (target_size - new_height) // 2)
|
||
background.paste(resized, offset)
|
||
|
||
save_path = filedialog.asksaveasfilename(
|
||
title="保存裁剪结果",
|
||
defaultextension=".jpg",
|
||
filetypes=[("JPEG files", "*.jpg"), ("PNG files", "*.png")],
|
||
)
|
||
if save_path:
|
||
background.save(save_path)
|
||
|
||
self.image = background
|
||
self.current_shape = None
|
||
self.points = []
|
||
self.box_coords = None
|
||
self.display_image()
|
||
self.set_status(f"裁剪完成: ({x1},{y1})-({x2},{y2})")
|
||
|
||
def compress_image(self):
|
||
if not self.image:
|
||
self.set_status("请先加载图片", "red")
|
||
return
|
||
self.image = self.make_640_image(self.image)
|
||
self.current_shape = None
|
||
self.points = []
|
||
self.box_coords = None
|
||
self.display_image()
|
||
self.set_status("当前图片已压缩到 640x640")
|
||
|
||
def compress_all_images(self):
|
||
if not self.folder_path or not self.image_list:
|
||
self.set_status("请先选择工作文件夹", "red")
|
||
return
|
||
for image_name in self.image_list:
|
||
image_path = os.path.join(self.folder_path, image_name)
|
||
image = Image.open(image_path).convert("RGB")
|
||
compressed = self.make_640_image(image)
|
||
compressed.save(image_path)
|
||
self.load_image_from_folder()
|
||
self.set_status("批量压缩完成")
|
||
|
||
def make_640_image(self, image):
|
||
target_size = 640
|
||
ratio = min(target_size / image.width, target_size / image.height)
|
||
new_width = max(1, int(image.width * ratio))
|
||
new_height = max(1, int(image.height * ratio))
|
||
resized = image.resize((new_width, new_height), Image.Resampling.LANCZOS)
|
||
background = Image.new("RGB", (target_size, target_size), (0, 0, 0))
|
||
offset = ((target_size - new_width) // 2, (target_size - new_height) // 2)
|
||
background.paste(resized, offset)
|
||
return background
|
||
|
||
def connect_rtsp(self):
|
||
rtsp_url = simpledialog.askstring("RTSP 连接", "请输入 RTSP 地址:")
|
||
if not rtsp_url:
|
||
return
|
||
self.rtsp_url = rtsp_url
|
||
self.set_status("正在连接 RTSP...", "blue")
|
||
self.rtsp_thread = threading.Thread(target=self._connect_rtsp_thread, daemon=True)
|
||
self.rtsp_thread.start()
|
||
|
||
def _connect_rtsp_thread(self):
|
||
try:
|
||
cap = cv2.VideoCapture(self.rtsp_url)
|
||
cap.set(cv2.CAP_PROP_OPEN_TIMEOUT_MSEC, 5000)
|
||
if not cap.isOpened():
|
||
self.root.after(0, lambda: self.set_status("RTSP 连接失败", "red"))
|
||
return
|
||
success, _ = cap.read()
|
||
if not success:
|
||
cap.release()
|
||
self.root.after(0, lambda: self.set_status("RTSP 读取失败", "red"))
|
||
return
|
||
self.cap = cap
|
||
self.root.after(0, lambda: self.set_status("RTSP 已连接"))
|
||
except Exception as exc:
|
||
self.root.after(0, lambda: self.set_status(f"RTSP 错误: {exc}", "red"))
|
||
|
||
def capture_rtsp_frame(self):
|
||
if not self.cap or not self.cap.isOpened():
|
||
self.set_status("RTSP 尚未连接", "red")
|
||
return
|
||
success, frame = self.cap.read()
|
||
if not success:
|
||
self.set_status("RTSP 截图失败", "red")
|
||
return
|
||
frame_rgb = cv2.cvtColor(frame, cv2.COLOR_BGR2RGB)
|
||
self.image = Image.fromarray(frame_rgb)
|
||
self.current_shape = None
|
||
self.points = []
|
||
self.box_coords = None
|
||
self.display_image()
|
||
|
||
save_path = filedialog.asksaveasfilename(
|
||
title="保存 RTSP 截图",
|
||
initialfile=f"rtsp_capture_{time.strftime('%Y%m%d_%H%M%S')}.jpg",
|
||
defaultextension=".jpg",
|
||
filetypes=[("JPEG files", "*.jpg"), ("PNG files", "*.png")],
|
||
)
|
||
if save_path:
|
||
cv2.imwrite(save_path, frame)
|
||
if self.folder_path:
|
||
copied_path = os.path.join(self.folder_path, os.path.basename(save_path))
|
||
shutil.copy2(save_path, copied_path)
|
||
self.refresh_image_list()
|
||
self.set_status("RTSP 截图完成")
|
||
|
||
def disconnect_rtsp(self):
|
||
if self.cap:
|
||
self.cap.release()
|
||
self.cap = None
|
||
self.set_status("RTSP 已断开")
|
||
|
||
def extract_video_frames(self):
|
||
video_path = filedialog.askopenfilename(
|
||
title="选择视频文件",
|
||
filetypes=[("Video files", "*.mp4 *.avi *.mov *.mkv")],
|
||
)
|
||
if not video_path:
|
||
return
|
||
output_folder = filedialog.askdirectory(title="选择输出文件夹")
|
||
if not output_folder:
|
||
return
|
||
target_fps = simpledialog.askinteger("抽帧", "请输入目标帧率(FPS):", initialvalue=5, minvalue=1, maxvalue=60)
|
||
if not target_fps:
|
||
return
|
||
|
||
video = cv2.VideoCapture(video_path)
|
||
original_fps = video.get(cv2.CAP_PROP_FPS)
|
||
if not original_fps:
|
||
video.release()
|
||
self.set_status("无法读取视频帧率", "red")
|
||
return
|
||
|
||
interval = max(1, int(original_fps / target_fps))
|
||
frame_index = 0
|
||
saved_count = 0
|
||
while True:
|
||
success, frame = video.read()
|
||
if not success:
|
||
break
|
||
if frame_index % interval == 0:
|
||
save_path = os.path.join(output_folder, f"frame_{saved_count:05d}.jpg")
|
||
cv2.imwrite(save_path, frame)
|
||
saved_count += 1
|
||
frame_index += 1
|
||
video.release()
|
||
self.set_status(f"抽帧完成,保存 {saved_count} 张")
|
||
|
||
def collect_labeled_images(self):
|
||
label_folder = filedialog.askdirectory(title="选择标签文件夹")
|
||
if not label_folder:
|
||
return
|
||
image_source_folder = filedialog.askdirectory(title="选择图片源文件夹")
|
||
if not image_source_folder:
|
||
return
|
||
output_folder = filedialog.askdirectory(title="选择输出文件夹")
|
||
if not output_folder:
|
||
return
|
||
|
||
images_out = os.path.join(output_folder, "images")
|
||
labels_out = os.path.join(output_folder, "labels")
|
||
os.makedirs(images_out, exist_ok=True)
|
||
os.makedirs(labels_out, exist_ok=True)
|
||
|
||
count = 0
|
||
for filename in os.listdir(label_folder):
|
||
if not filename.endswith(".txt") or filename == "classes.txt":
|
||
continue
|
||
base_name = os.path.splitext(filename)[0]
|
||
src_label = os.path.join(label_folder, filename)
|
||
found_image = None
|
||
for suffix in (".jpg", ".jpeg", ".png", ".bmp"):
|
||
candidate = os.path.join(image_source_folder, base_name + suffix)
|
||
if os.path.exists(candidate):
|
||
found_image = candidate
|
||
break
|
||
if not found_image:
|
||
continue
|
||
shutil.copy2(found_image, os.path.join(images_out, os.path.basename(found_image)))
|
||
shutil.copy2(src_label, os.path.join(labels_out, filename))
|
||
count += 1
|
||
|
||
classes_path = os.path.join(label_folder, "classes.txt")
|
||
if os.path.exists(classes_path):
|
||
shutil.copy2(classes_path, os.path.join(output_folder, "classes.txt"))
|
||
|
||
self.set_status(f"整理完成,共复制 {count} 组图像和标签")
|
||
messagebox.showinfo("完成", f"已整理 {count} 组标注数据")
|
||
|
||
|
||
def main():
|
||
root = tk.Tk()
|
||
UniversalAnnotationTool(root)
|
||
root.mainloop()
|
||
|
||
|
||
if __name__ == "__main__":
|
||
main()
|