"""FastAPI server for the web-based barrier-opening picker. Browse to http://:8000/ and you'll see a video player loaded with the next un-annotated video from the queue. Use the arrow keys to scrub (←/→ ±5 s, Shift+←/→ ±1 s, Ctrl+←/→ ±0.1 s), space to pause/play, or click the seekbar. When the barrier opens, click one of: [All barriers open] every ROI is usable post-opening [Upper barriers open] only ROIs 1,3,5 are usable [Lower barriers open] only ROIs 2,4,6 are usable The current playhead time is recorded as the barrier-opening moment; ROI inclusion is set accordingly. There is also a Skip and a Mark unusable button. The queue is built from the merged TSV plus the inventory: every unique (machine, date, time) that has both a tracking DB and an mp4 on disk and is not yet in barrier_opening.csv. Submissions persist to barrier_opening.csv after every click — refresh-safe. Configuration (environment variables): CUPIDO_DATA_VOLUME /mnt/data/projects/cupido (data volume) CUPIDO_INVENTORY_CSV /cupido/data/metadata/video_inventory.csv CUPIDO_OUTPUT_CSV /cupido/data/metadata/barrier_opening.csv """ from __future__ import annotations import os import re from dataclasses import dataclass from pathlib import Path import pandas as pd from fastapi import FastAPI, HTTPException, Request from fastapi.responses import FileResponse, HTMLResponse, JSONResponse, Response from fastapi.staticfiles import StaticFiles from pydantic import BaseModel # ─── Config ────────────────────────────────────────────────────────────── DATA_VOLUME = Path(os.environ.get("CUPIDO_DATA_VOLUME", "/mnt/data/projects/cupido")) INVENTORY_CSV = Path(os.environ.get( "CUPIDO_INVENTORY_CSV", "/cupido/data/metadata/video_inventory.csv" )) OUTPUT_CSV = Path(os.environ.get( "CUPIDO_OUTPUT_CSV", "/cupido/data/metadata/barrier_opening.csv" )) TSV_PATH = DATA_VOLUME / "all_video_info_merged.tsv" # Reason: the (date, time, machine_uuid) prefix encoded in every tracking # DB filename and every inventory mp4 filename. DB_NAME_RE = re.compile( r"^(\d{4}-\d{2}-\d{2})_(\d{2}-\d{2}-\d{2})_([0-9a-f]{32})__" ) OUT_COLS = ["machine_name", "session_date", "session_time", "opening_s", "trim_first_s", "bad_rois", "notes"] # ROI numbering in the HD mating arena (verified via tracking_geometry): # upper row = ROIs 1, 3, 5 (y ≈ 0.125) # lower row = ROIs 2, 4, 6 (y ≈ 0.795) ROIS_UPPER = "1,3,5" ROIS_LOWER = "2,4,6" @dataclass(frozen=True) class QueueItem: idx: int machine_name: str session_date: str session_time: str mp4_path: str duration_s: float | None done: bool # ─── Queue building ───────────────────────────────────────────────────── def _build_queue() -> list[QueueItem]: """Build the ordered queue of pickable videos.""" if not TSV_PATH.exists(): raise RuntimeError(f"merged TSV not found at {TSV_PATH}") if not INVENTORY_CSV.exists(): raise RuntimeError(f"inventory not found at {INVENTORY_CSV}") tsv = pd.read_csv(TSV_PATH, sep="\t") inv = pd.read_csv(INVENTORY_CSV) inv_by_key: dict[tuple[str, str, str], dict] = {} for r in inv.itertuples(index=False): inv_by_key[(r.machine_name, r.session_date, r.session_time)] = { "mp4_path": r.mp4_path, "duration_s": float(r.duration_s) if pd.notna(r.duration_s) else None, } if OUTPUT_CSV.exists(): out = pd.read_csv(OUTPUT_CSV) done_keys = set(zip(out["machine_name"], out["session_date"], out["session_time"])) else: done_keys = set() seen: set[tuple[str, str, str]] = set() items: list[QueueItem] = [] for col in ("training_db_path", "testing_db_path"): for row in tsv.itertuples(index=False): db = getattr(row, col) if not isinstance(db, str) or not db: continue db_path = Path(db) if not db_path.exists(): continue m = DB_NAME_RE.match(db_path.name) if not m: continue session_date, session_time = m.group(1), m.group(2) key = (row.machine_name, session_date, session_time) if key in seen: continue seen.add(key) inv_row = inv_by_key.get(key) if inv_row is None or not Path(inv_row["mp4_path"]).exists(): continue items.append(QueueItem( idx=len(items), machine_name=row.machine_name, session_date=session_date, session_time=session_time, mp4_path=inv_row["mp4_path"], duration_s=inv_row["duration_s"], done=key in done_keys, )) return items # ─── App ─────────────────────────────────────────────────────────────── app = FastAPI(title="Cupido barrier-opening picker") STATIC_DIR = Path(__file__).parent / "static" app.mount("/static", StaticFiles(directory=STATIC_DIR), name="static") @app.get("/") async def index() -> FileResponse: return FileResponse(STATIC_DIR / "index.html") @app.get("/api/queue") async def get_queue() -> JSONResponse: queue = _build_queue() return JSONResponse([ { "idx": q.idx, "machine_name": q.machine_name, "session_date": q.session_date, "session_time": q.session_time, "duration_s": q.duration_s, "done": q.done, } for q in queue ]) def _stream_video(file_path: Path, request: Request) -> Response: """HTTP Range-aware video streaming.""" file_size = file_path.stat().st_size range_header = request.headers.get("range") if range_header is None: return FileResponse(file_path, media_type="video/mp4", headers={"Accept-Ranges": "bytes"}) # Parse "bytes=START-END" (END optional) m = re.match(r"bytes=(\d+)-(\d*)", range_header) if not m: raise HTTPException(status_code=416, detail="bad Range header") start = int(m.group(1)) end = int(m.group(2)) if m.group(2) else file_size - 1 end = min(end, file_size - 1) if start > end: raise HTTPException(status_code=416, detail="range not satisfiable") chunk_size = end - start + 1 def iterfile(): with open(file_path, "rb") as f: f.seek(start) remaining = chunk_size while remaining > 0: buf = f.read(min(64 * 1024, remaining)) if not buf: break yield buf remaining -= len(buf) return Response( content=b"".join(iterfile()), status_code=206, media_type="video/mp4", headers={ "Content-Range": f"bytes {start}-{end}/{file_size}", "Accept-Ranges": "bytes", "Content-Length": str(chunk_size), }, ) @app.get("/api/video/{idx}") async def get_video(idx: int, request: Request) -> Response: queue = _build_queue() if not 0 <= idx < len(queue): raise HTTPException(status_code=404, detail="idx out of range") return _stream_video(Path(queue[idx].mp4_path), request) class Submission(BaseModel): idx: int time_s: float | None # None when marking unusable mode: str # "all" | "upper" | "lower" | "unusable" | "skip" notes: str = "" @app.post("/api/submit") async def submit(payload: Submission) -> dict: queue = _build_queue() if not 0 <= payload.idx < len(queue): raise HTTPException(status_code=404, detail="idx out of range") item = queue[payload.idx] if payload.mode == "skip": return {"status": "skipped"} if payload.mode == "unusable": row = { "machine_name": item.machine_name, "session_date": item.session_date, "session_time": item.session_time, "opening_s": float("nan"), "trim_first_s": 0, "bad_rois": "", "notes": payload.notes or "unusable", } else: if payload.time_s is None: raise HTTPException(status_code=400, detail="time_s required") bad_rois = { "all": "", "upper": ROIS_LOWER, # upper-only opens → lower row is bad "lower": ROIS_UPPER, # lower-only opens → upper row is bad }.get(payload.mode) if bad_rois is None: raise HTTPException(status_code=400, detail=f"unknown mode: {payload.mode}") row = { "machine_name": item.machine_name, "session_date": item.session_date, "session_time": item.session_time, "opening_s": round(payload.time_s, 1), "trim_first_s": 0, "bad_rois": bad_rois, "notes": payload.notes, } OUTPUT_CSV.parent.mkdir(parents=True, exist_ok=True) if OUTPUT_CSV.exists(): out = pd.read_csv(OUTPUT_CSV) else: out = pd.DataFrame(columns=OUT_COLS) for col in OUT_COLS: if col not in out.columns: out[col] = "" # Replace any existing row for this key. mask = ~((out["machine_name"] == row["machine_name"]) & (out["session_date"] == row["session_date"]) & (out["session_time"] == row["session_time"])) out = pd.concat([out[mask], pd.DataFrame([row])], ignore_index=True) out[OUT_COLS].to_csv(OUTPUT_CSV, index=False) return {"status": "saved", "row": row} if __name__ == "__main__": import uvicorn host = os.environ.get("HOST", "0.0.0.0") port = int(os.environ.get("PORT", "8000")) uvicorn.run("app:app", host=host, port=port, reload=False)