From d2028a642bf2d8710ba2f8f5647809a2676f2916 Mon Sep 17 00:00:00 2001 From: "Simonson, Andrew" Date: Wed, 18 Feb 2026 15:30:52 -0600 Subject: [PATCH] Add async pipeline with progress monitoring, resumability, and result transparency Pipeline engine rewritten with combo-first loop: each combination is processed through all requested passes before moving to the next, with incremental DB saves after every step (crash-safe). Blocked combos now get result rows so they appear in the results page with constraint violation reasons. New pipeline_runs table tracks run lifecycle (pending/running/completed/failed/ cancelled). Web route launches pipeline in a background thread with its own DB connection. HTMX polling partial shows live progress with per-pass breakdown. Also: status guard prevents reviewed->scored downgrade, save_combination loads existing status on dedup for correct resume, per-metric scores show domain bounds + units + position bars, ensure_metric backfills units on existing rows. Co-Authored-By: Claude Opus 4.6 --- CLAUDE.md | 65 +++ src/physcom/db/repository.py | 156 ++++++- src/physcom/db/schema.py | 18 + src/physcom/engine/pipeline.py | 387 ++++++++++++------ src/physcom/models/domain.py | 1 + src/physcom/seed/transport_example.py | 20 +- src/physcom_web/routes/entities.py | 30 +- src/physcom_web/routes/pipeline.py | 128 +++++- src/physcom_web/routes/results.py | 4 +- src/physcom_web/static/style.css | 74 ++++ src/physcom_web/templates/domains/list.html | 4 +- .../templates/entities/detail.html | 32 +- .../templates/pipeline/_run_status.html | 78 ++++ src/physcom_web/templates/pipeline/run.html | 112 ++++- src/physcom_web/templates/results/detail.html | 44 +- src/physcom_web/templates/results/list.html | 22 +- tests/test_pipeline_async.py | 305 ++++++++++++++ 17 files changed, 1263 insertions(+), 217 deletions(-) create mode 100644 CLAUDE.md create mode 100644 src/physcom_web/templates/pipeline/_run_status.html create mode 100644 tests/test_pipeline_async.py diff --git a/CLAUDE.md b/CLAUDE.md new file mode 100644 index 0000000..3f905d4 --- /dev/null +++ b/CLAUDE.md @@ -0,0 +1,65 @@ +# PhysCom — Physical Combinatorics + +Innovation discovery engine: generate entity combinations, filter by physical constraints, score against domain-specific metrics, rank results. + +## Commands + +- **Tests**: `python -m pytest tests/ -q` (48 tests, ~3s). Run after every change. +- **Web dev server**: `python -m physcom_web` +- **CLI**: `python -m physcom` +- **Seed data**: loaded automatically on first DB init (SQLite, `physcom.db` or `$PHYSCOM_DB`) + +## Architecture + +``` +src/physcom/ # Core library (no web dependency) + models/ # Dataclasses: Entity, Dependency, Combination, Domain, MetricBound + db/schema.py # DDL (all CREATE TABLE statements) + db/repository.py # All DB access — single Repository class, sqlite3 row_factory=Row + engine/combinator.py # Cartesian product of entities across dimensions + engine/constraint_resolver.py # Pass 1: requires/excludes/mutex/range/force checks + engine/scorer.py # Pass 3: log-normalize raw→0-1, weighted geometric mean composite + engine/pipeline.py # Orchestrator: combo-first loop, incremental saves, resume, cancel + llm/base.py # LLMProvider ABC (estimate_physics, review_plausibility) + llm/providers/mock.py # MockLLMProvider for tests + seed/transport_example.py # 9 platforms + 9 power sources, 2 domains + +src/physcom_web/ # Flask web UI + app.py # App factory, get_repo(), DB path resolution + routes/pipeline.py # Background thread pipeline execution, HTMX status/cancel endpoints + routes/results.py # Results browse, detail view, human review submission + routes/entities.py # Entity CRUD + routes/domains.py # Domain listing + templates/ # Jinja2, extends base.html, uses HTMX for polling + static/style.css # Single stylesheet + +tests/ # pytest, uses seeded_repo fixture from conftest.py +``` + +## Key patterns + +- **Repository is the only DB interface.** No raw SQL outside `repository.py`. +- **Pipeline is combo-first**: each combo goes through all requested passes before the next combo starts. Progress is persisted per-combo (crash-safe, resumable). +- **`pipeline_runs` table** tracks run lifecycle: pending → running → completed/failed/cancelled. The web route creates the record, then starts a background thread with its own `sqlite3.Connection`. +- **`combination_results`** has rows for ALL combos including blocked ones (pass_reached=1, composite_score=0.0). Scored combos get pass_reached=3+. +- **Status guard**: `update_combination_status` refuses to downgrade `reviewed` → `scored`. +- **`save_combination`** loads existing status/block_reason on dedup (important for resume). +- **`ensure_metric`** backfills unit if the row already exists with an empty unit. +- **MetricBound** carries `unit` — flows through seed → ensure_metric → metrics table → get_combination_scores → template display. +- **HTMX polling**: `_run_status.html` partial polls every 2s while run is pending/running; stops polling when terminal. + +## Data flow (pipeline passes) + +1. **Pass 1 — Constraints**: `ConstraintResolver.resolve()` → blocked/conditional/valid. Blocked combos get a result row and `continue`. +2. **Pass 2 — Estimation**: LLM or `_stub_estimate()` → raw metric values. Saved immediately via `save_raw_estimates()` (normalized_score=NULL). +3. **Pass 3 — Scoring**: `Scorer.score_combination()` → log-normalized scores + weighted geometric mean composite. Saves via `save_scores()` + `save_result()`. +4. **Pass 4 — LLM Review**: Only for above-threshold combos with an LLM provider. +5. **Pass 5 — Human Review**: Manual via web UI results page. + +## Conventions + +- Python 3.11+, `from __future__ import annotations` everywhere. +- Dataclasses for models, no ORM. +- Tests use `seeded_repo` fixture (in-memory SQLite with transport seed data). +- Don't use `cd` in Bash commands — run from the working directory so pre-approved permission patterns match. +- Don't add docstrings/comments/type annotations to code you didn't change. diff --git a/src/physcom/db/repository.py b/src/physcom/db/repository.py index 9bc80af..6a2ac9e 100644 --- a/src/physcom/db/repository.py +++ b/src/physcom/db/repository.py @@ -3,7 +3,9 @@ from __future__ import annotations import hashlib +import json import sqlite3 +from datetime import datetime, timezone from typing import Sequence from physcom.models.entity import Dependency, Entity @@ -170,6 +172,11 @@ class Repository: "INSERT OR IGNORE INTO metrics (name, unit, description) VALUES (?, ?, ?)", (name, unit, description), ) + if unit: + self.conn.execute( + "UPDATE metrics SET unit = ? WHERE name = ? AND (unit IS NULL OR unit = '')", + (unit, name), + ) row = self.conn.execute("SELECT id FROM metrics WHERE name = ?", (name,)).fetchone() self.conn.commit() return row["id"] @@ -181,7 +188,7 @@ class Repository: ) domain.id = cur.lastrowid for mb in domain.metric_bounds: - metric_id = self.ensure_metric(mb.metric_name) + metric_id = self.ensure_metric(mb.metric_name, unit=mb.unit) mb.metric_id = metric_id self.conn.execute( """INSERT INTO domain_metric_weights @@ -233,10 +240,13 @@ class Repository: combination.hash = self.compute_hash(entity_ids) existing = self.conn.execute( - "SELECT id FROM combinations WHERE hash = ?", (combination.hash,) + "SELECT id, status, block_reason FROM combinations WHERE hash = ?", + (combination.hash,), ).fetchone() if existing: combination.id = existing["id"] + combination.status = existing["status"] + combination.block_reason = existing["block_reason"] return combination cur = self.conn.execute( @@ -255,6 +265,13 @@ class Repository: def update_combination_status( self, combo_id: int, status: str, block_reason: str | None = None ) -> None: + # Don't downgrade 'reviewed' to 'scored' — preserve human review state + if status == "scored": + row = self.conn.execute( + "SELECT status FROM combinations WHERE id = ?", (combo_id,) + ).fetchone() + if row and row["status"] == "reviewed": + return self.conn.execute( "UPDATE combinations SET status = ?, block_reason = ? WHERE id = ?", (status, block_reason, combo_id), @@ -327,7 +344,7 @@ class Repository: def get_combination_scores(self, combo_id: int, domain_id: int) -> list[dict]: """Return per-metric scores for a combination in a domain.""" rows = self.conn.execute( - """SELECT cs.*, m.name as metric_name + """SELECT cs.*, m.name as metric_name, m.unit as metric_unit FROM combination_scores cs JOIN metrics m ON cs.metric_id = m.id WHERE cs.combination_id = ? AND cs.domain_id = ?""", @@ -335,12 +352,52 @@ class Repository: ).fetchall() return [dict(r) for r in rows] - def count_combinations_by_status(self) -> dict[str, int]: - rows = self.conn.execute( - "SELECT status, COUNT(*) as cnt FROM combinations GROUP BY status" - ).fetchall() + def count_combinations_by_status(self, domain_name: str | None = None) -> dict[str, int]: + """Count combos by status. If domain_name given, only combos with results in that domain.""" + if domain_name: + rows = self.conn.execute( + """SELECT c.status, COUNT(*) as cnt + FROM combination_results cr + JOIN combinations c ON cr.combination_id = c.id + JOIN domains d ON cr.domain_id = d.id + WHERE d.name = ? + GROUP BY c.status""", + (domain_name,), + ).fetchall() + else: + rows = self.conn.execute( + "SELECT status, COUNT(*) as cnt FROM combinations GROUP BY status" + ).fetchall() return {r["status"]: r["cnt"] for r in rows} + def get_pipeline_summary(self, domain_name: str) -> dict | None: + """Return a summary of results for a domain, or None if no results.""" + row = self.conn.execute( + """SELECT COUNT(*) as total, + AVG(cr.composite_score) as avg_score, + MAX(cr.composite_score) as max_score, + MIN(cr.composite_score) as min_score, + MAX(cr.pass_reached) as last_pass + FROM combination_results cr + JOIN domains d ON cr.domain_id = d.id + WHERE d.name = ?""", + (domain_name,), + ).fetchone() + if not row or row["total"] == 0: + return None + # Also count blocked combos (they have no results but exist) + blocked = self.conn.execute( + "SELECT COUNT(*) as cnt FROM combinations WHERE status = 'blocked'" + ).fetchone() + return { + "total_results": row["total"], + "avg_score": row["avg_score"], + "max_score": row["max_score"], + "min_score": row["min_score"], + "last_pass": row["last_pass"], + "blocked": blocked["cnt"] if blocked else 0, + } + def get_result(self, combo_id: int, domain_id: int) -> dict | None: """Return a single combination_result row.""" row = self.conn.execute( @@ -412,3 +469,88 @@ class Repository: "pass_reached": r["pass_reached"], }) return results + + # ── Pipeline Runs ──────────────────────────────────────── + + def create_pipeline_run(self, domain_id: int, config: dict) -> int: + """Create a new pipeline_run record. Returns the run id.""" + cur = self.conn.execute( + """INSERT INTO pipeline_runs (domain_id, status, config, created_at) + VALUES (?, 'pending', ?, ?)""", + (domain_id, json.dumps(config), datetime.now(timezone.utc).isoformat()), + ) + self.conn.commit() + return cur.lastrowid + + def update_pipeline_run(self, run_id: int, **fields) -> None: + """Update arbitrary fields on a pipeline_run.""" + if not fields: + return + set_clause = ", ".join(f"{k} = ?" for k in fields) + values = list(fields.values()) + values.append(run_id) + self.conn.execute( + f"UPDATE pipeline_runs SET {set_clause} WHERE id = ?", values + ) + self.conn.commit() + + def get_pipeline_run(self, run_id: int) -> dict | None: + row = self.conn.execute( + "SELECT * FROM pipeline_runs WHERE id = ?", (run_id,) + ).fetchone() + return dict(row) if row else None + + def list_pipeline_runs(self, domain_id: int | None = None) -> list[dict]: + if domain_id is not None: + rows = self.conn.execute( + """SELECT pr.*, d.name as domain_name + FROM pipeline_runs pr + JOIN domains d ON pr.domain_id = d.id + WHERE pr.domain_id = ? + ORDER BY pr.created_at DESC""", + (domain_id,), + ).fetchall() + else: + rows = self.conn.execute( + """SELECT pr.*, d.name as domain_name + FROM pipeline_runs pr + JOIN domains d ON pr.domain_id = d.id + ORDER BY pr.created_at DESC""" + ).fetchall() + return [dict(r) for r in rows] + + def get_combo_pass_reached(self, combo_id: int, domain_id: int) -> int | None: + """Return the pass_reached for a combo in a domain, or None if no result.""" + row = self.conn.execute( + """SELECT pass_reached FROM combination_results + WHERE combination_id = ? AND domain_id = ?""", + (combo_id, domain_id), + ).fetchone() + return row["pass_reached"] if row else None + + def save_raw_estimates( + self, combo_id: int, domain_id: int, estimates: list[dict] + ) -> None: + """Save raw metric estimates (pass 2) with normalized_score=NULL. + + Each dict: metric_id, raw_value, estimation_method, confidence. + """ + for e in estimates: + self.conn.execute( + """INSERT OR REPLACE INTO combination_scores + (combination_id, domain_id, metric_id, raw_value, normalized_score, + estimation_method, confidence) + VALUES (?, ?, ?, ?, NULL, ?, ?)""", + (combo_id, domain_id, e["metric_id"], e["raw_value"], + e["estimation_method"], e["confidence"]), + ) + self.conn.commit() + + def get_existing_result(self, combo_id: int, domain_id: int) -> dict | None: + """Return the full combination_results row for resume logic.""" + row = self.conn.execute( + """SELECT * FROM combination_results + WHERE combination_id = ? AND domain_id = ?""", + (combo_id, domain_id), + ).fetchone() + return dict(row) if row else None diff --git a/src/physcom/db/schema.py b/src/physcom/db/schema.py index fd39128..5e637bb 100644 --- a/src/physcom/db/schema.py +++ b/src/physcom/db/schema.py @@ -91,11 +91,29 @@ CREATE TABLE IF NOT EXISTS combination_results ( UNIQUE(combination_id, domain_id) ); +CREATE TABLE IF NOT EXISTS pipeline_runs ( + id INTEGER PRIMARY KEY AUTOINCREMENT, + domain_id INTEGER NOT NULL REFERENCES domains(id), + status TEXT NOT NULL DEFAULT 'pending', + config TEXT, + total_combos INTEGER DEFAULT 0, + combos_pass1 INTEGER DEFAULT 0, + combos_pass2 INTEGER DEFAULT 0, + combos_pass3 INTEGER DEFAULT 0, + combos_pass4 INTEGER DEFAULT 0, + current_pass INTEGER, + error_message TEXT, + started_at TIMESTAMP, + completed_at TIMESTAMP, + created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP +); + CREATE INDEX IF NOT EXISTS idx_deps_entity ON dependencies(entity_id); CREATE INDEX IF NOT EXISTS idx_deps_category_key ON dependencies(category, key); CREATE INDEX IF NOT EXISTS idx_combo_status ON combinations(status); CREATE INDEX IF NOT EXISTS idx_scores_combo_domain ON combination_scores(combination_id, domain_id); CREATE INDEX IF NOT EXISTS idx_results_domain_score ON combination_results(domain_id, composite_score DESC); +CREATE INDEX IF NOT EXISTS idx_pipeline_runs_domain ON pipeline_runs(domain_id); """ diff --git a/src/physcom/engine/pipeline.py b/src/physcom/engine/pipeline.py index f231345..e69b7cb 100644 --- a/src/physcom/engine/pipeline.py +++ b/src/physcom/engine/pipeline.py @@ -1,15 +1,15 @@ -"""Multi-pass pipeline orchestrator.""" +"""Multi-pass pipeline orchestrator with incremental saves and resumability.""" from __future__ import annotations from dataclasses import dataclass, field +from datetime import datetime, timezone from physcom.db.repository import Repository from physcom.engine.combinator import generate_combinations from physcom.engine.constraint_resolver import ConstraintResolver, ConstraintResult from physcom.engine.scorer import Scorer from physcom.llm.base import LLMProvider -from physcom.llm.prompts import PHYSICS_ESTIMATION_PROMPT, PLAUSIBILITY_REVIEW_PROMPT from physcom.models.combination import Combination, ScoredResult from physcom.models.domain import Domain @@ -23,12 +23,17 @@ class PipelineResult: pass1_blocked: int = 0 pass1_conditional: int = 0 pass2_estimated: int = 0 + pass3_scored: int = 0 pass3_above_threshold: int = 0 pass4_reviewed: int = 0 pass5_human_reviewed: int = 0 top_results: list[dict] = field(default_factory=list) +class CancelledError(Exception): + """Raised when a pipeline run is cancelled.""" + + def _describe_combination(combo: Combination) -> str: """Build a natural-language description of a combination.""" parts = [f"{e.dimension}: {e.name}" for e in combo.entities] @@ -53,158 +58,281 @@ class Pipeline: self.scorer = scorer self.llm = llm + def _check_cancelled(self, run_id: int | None) -> None: + """Raise CancelledError if the run has been cancelled.""" + if run_id is None: + return + run = self.repo.get_pipeline_run(run_id) + if run and run["status"] == "cancelled": + raise CancelledError("Pipeline run cancelled") + + def _update_run_counters( + self, run_id: int | None, result: PipelineResult, current_pass: int + ) -> None: + """Update pipeline_run progress counters in the DB.""" + if run_id is None: + return + self.repo.update_pipeline_run( + run_id, + combos_pass1=result.pass1_valid + + result.pass1_conditional + + result.pass1_blocked, + combos_pass2=result.pass2_estimated, + combos_pass3=result.pass3_scored, + combos_pass4=result.pass4_reviewed, + current_pass=current_pass, + ) + def run( self, domain: Domain, dimensions: list[str], score_threshold: float = 0.1, passes: list[int] | None = None, + run_id: int | None = None, ) -> PipelineResult: if passes is None: passes = [1, 2, 3, 4, 5] result = PipelineResult() + # Mark run as running (unless already cancelled) + if run_id is not None: + run_record = self.repo.get_pipeline_run(run_id) + if run_record and run_record["status"] == "cancelled": + result.top_results = self.repo.get_top_results(domain.name, limit=20) + return result + self.repo.update_pipeline_run( + run_id, + status="running", + started_at=datetime.now(timezone.utc).isoformat(), + ) + # Generate all combinations combos = generate_combinations(self.repo, dimensions) result.total_generated = len(combos) - # Save all combinations to DB + # Save all combinations to DB (also loads status for existing combos) for combo in combos: self.repo.save_combination(combo) - # ── Pass 1: Constraint Resolution ─────────────────────── - valid_combos: list[Combination] = [] - if 1 in passes: - valid_combos = self._pass1_constraints(combos, result) - else: - valid_combos = combos + if run_id is not None: + self.repo.update_pipeline_run(run_id, total_combos=len(combos)) - # ── Pass 2: Physics Estimation ────────────────────────── - estimated: list[tuple[Combination, dict[str, float]]] = [] - if 2 in passes: - estimated = self._pass2_estimation(valid_combos, domain, result) - else: - # Skip estimation, use zeros - estimated = [(c, {}) for c in valid_combos] + # Prepare metric lookup + metric_names = [mb.metric_name for mb in domain.metric_bounds] + bounds_by_name = {mb.metric_name: mb for mb in domain.metric_bounds} - # ── Pass 3: Scoring & Ranking ─────────────────────────── - scored: list[tuple[Combination, ScoredResult]] = [] - if 3 in passes: - scored = self._pass3_scoring(estimated, domain, score_threshold, result) + # ── Combo-first loop ───────────────────────────────────── + try: + for combo in combos: + self._check_cancelled(run_id) - # ── Pass 4: LLM Review ────────────────────────────────── - if 4 in passes and self.llm: - self._pass4_llm_review(scored, domain, result) + # Check existing progress for this combo in this domain + existing_pass = self.repo.get_combo_pass_reached( + combo.id, domain.id + ) or 0 - # ── Save results after scoring ───────────────────────── - if 3 in passes: - max_pass = max(p for p in passes if p <= 5) - for combo, sr in scored: - self.repo.save_result( - combo.id, domain.id, sr.composite_score, - pass_reached=max_pass, - novelty_flag=sr.novelty_flag, - llm_review=sr.llm_review, + # Load existing result to preserve human review data + existing_result = self.repo.get_existing_result( + combo.id, domain.id ) - self.repo.update_combination_status(combo.id, "scored") - # Collect top results + # ── Pass 1: Constraint Resolution ──────────────── + if 1 in passes and existing_pass < 1: + cr: ConstraintResult = self.resolver.resolve(combo) + if cr.status == "blocked": + combo.status = "blocked" + combo.block_reason = "; ".join(cr.violations) + self.repo.update_combination_status( + combo.id, "blocked", combo.block_reason + ) + # Save a result row so blocked combos appear in results + self.repo.save_result( + combo.id, + domain.id, + composite_score=0.0, + pass_reached=1, + ) + result.pass1_blocked += 1 + self._update_run_counters(run_id, result, current_pass=1) + continue # blocked — skip remaining passes + elif cr.status == "conditional": + combo.status = "valid" + self.repo.update_combination_status(combo.id, "valid") + result.pass1_conditional += 1 + else: + combo.status = "valid" + self.repo.update_combination_status(combo.id, "valid") + result.pass1_valid += 1 + + self._update_run_counters(run_id, result, current_pass=1) + elif 1 in passes: + # Already pass1'd — check if it was blocked + if combo.status == "blocked": + result.pass1_blocked += 1 + continue + else: + result.pass1_valid += 1 + else: + # Pass 1 not requested; check if blocked from a prior run + if combo.status == "blocked": + result.pass1_blocked += 1 + continue + + # ── Pass 2: Physics Estimation ─────────────────── + raw_metrics: dict[str, float] = {} + if 2 in passes and existing_pass < 2: + description = _describe_combination(combo) + if self.llm: + raw_metrics = self.llm.estimate_physics( + description, metric_names + ) + else: + raw_metrics = self._stub_estimate(combo, metric_names) + + # Save raw estimates immediately (crash-safe) + estimate_dicts = [] + for mname, rval in raw_metrics.items(): + mb = bounds_by_name.get(mname) + if mb and mb.metric_id: + estimate_dicts.append({ + "metric_id": mb.metric_id, + "raw_value": rval, + "estimation_method": "llm" if self.llm else "stub", + "confidence": 1.0, + }) + if estimate_dicts: + self.repo.save_raw_estimates( + combo.id, domain.id, estimate_dicts + ) + + result.pass2_estimated += 1 + self._update_run_counters(run_id, result, current_pass=2) + elif 2 in passes: + # Already estimated — reload raw values from DB + existing_scores = self.repo.get_combination_scores( + combo.id, domain.id + ) + raw_metrics = { + s["metric_name"]: s["raw_value"] for s in existing_scores + } + result.pass2_estimated += 1 + else: + # Pass 2 not requested, use empty metrics + raw_metrics = {} + + # ── Pass 3: Scoring & Ranking ──────────────────── + if 3 in passes and existing_pass < 3: + sr = self.scorer.score_combination(combo, raw_metrics) + + # Persist per-metric scores with normalized values + score_dicts = [] + for s in sr.scores: + mb = bounds_by_name.get(s.metric_name) + if mb and mb.metric_id: + score_dicts.append({ + "metric_id": mb.metric_id, + "raw_value": s.raw_value, + "normalized_score": s.normalized_score, + "estimation_method": s.estimation_method, + "confidence": s.confidence, + }) + if score_dicts: + self.repo.save_scores(combo.id, domain.id, score_dicts) + + # Preserve existing human data + novelty_flag = ( + existing_result["novelty_flag"] if existing_result else None + ) + human_notes = ( + existing_result["human_notes"] if existing_result else None + ) + + self.repo.save_result( + combo.id, + domain.id, + sr.composite_score, + pass_reached=3, + novelty_flag=novelty_flag, + human_notes=human_notes, + ) + self.repo.update_combination_status(combo.id, "scored") + + result.pass3_scored += 1 + if sr.composite_score >= score_threshold: + result.pass3_above_threshold += 1 + + self._update_run_counters(run_id, result, current_pass=3) + elif 3 in passes and existing_pass >= 3: + # Already scored — count it + result.pass3_scored += 1 + if existing_result and existing_result["composite_score"] is not None: + if existing_result["composite_score"] >= score_threshold: + result.pass3_above_threshold += 1 + + # ── Pass 4: LLM Review ─────────────────────────── + if 4 in passes and self.llm: + cur_pass = self.repo.get_combo_pass_reached( + combo.id, domain.id + ) or 0 + if cur_pass < 4: + cur_result = self.repo.get_existing_result( + combo.id, domain.id + ) + if ( + cur_result + and cur_result["composite_score"] is not None + and cur_result["composite_score"] >= score_threshold + ): + description = _describe_combination(combo) + db_scores = self.repo.get_combination_scores( + combo.id, domain.id + ) + score_dict = { + s["metric_name"]: s["normalized_score"] + for s in db_scores + if s["normalized_score"] is not None + } + review = self.llm.review_plausibility( + description, score_dict + ) + + self.repo.save_result( + combo.id, + domain.id, + cur_result["composite_score"], + pass_reached=4, + novelty_flag=cur_result.get("novelty_flag"), + llm_review=review, + human_notes=cur_result.get("human_notes"), + ) + result.pass4_reviewed += 1 + self._update_run_counters( + run_id, result, current_pass=4 + ) + + except CancelledError: + if run_id is not None: + self.repo.update_pipeline_run( + run_id, + status="cancelled", + completed_at=datetime.now(timezone.utc).isoformat(), + ) + result.top_results = self.repo.get_top_results(domain.name, limit=20) + return result + + # Mark run as completed + if run_id is not None: + self.repo.update_pipeline_run( + run_id, + status="completed", + completed_at=datetime.now(timezone.utc).isoformat(), + ) + result.top_results = self.repo.get_top_results(domain.name, limit=20) return result - def _pass1_constraints( - self, combos: list[Combination], result: PipelineResult - ) -> list[Combination]: - valid = [] - for combo in combos: - cr: ConstraintResult = self.resolver.resolve(combo) - if cr.status == "blocked": - combo.status = "blocked" - combo.block_reason = "; ".join(cr.violations) - self.repo.update_combination_status( - combo.id, "blocked", combo.block_reason - ) - result.pass1_blocked += 1 - elif cr.status == "conditional": - combo.status = "valid" - self.repo.update_combination_status(combo.id, "valid") - valid.append(combo) - result.pass1_conditional += 1 - else: - combo.status = "valid" - self.repo.update_combination_status(combo.id, "valid") - valid.append(combo) - result.pass1_valid += 1 - return valid - - def _pass2_estimation( - self, - combos: list[Combination], - domain: Domain, - result: PipelineResult, - ) -> list[tuple[Combination, dict[str, float]]]: - metric_names = [mb.metric_name for mb in domain.metric_bounds] - estimated = [] - - for combo in combos: - description = _describe_combination(combo) - if self.llm: - raw_metrics = self.llm.estimate_physics(description, metric_names) - else: - # Stub estimation: derive from dependencies where possible - raw_metrics = self._stub_estimate(combo, metric_names) - estimated.append((combo, raw_metrics)) - result.pass2_estimated += 1 - - return estimated - - def _pass3_scoring( - self, - estimated: list[tuple[Combination, dict[str, float]]], - domain: Domain, - threshold: float, - result: PipelineResult, - ) -> list[tuple[Combination, ScoredResult]]: - scored = [] - for combo, raw_metrics in estimated: - sr = self.scorer.score_combination(combo, raw_metrics) - if sr.composite_score >= threshold: - scored.append((combo, sr)) - result.pass3_above_threshold += 1 - # Persist per-metric scores - score_dicts = [] - bounds_by_name = {mb.metric_name: mb for mb in domain.metric_bounds} - for s in sr.scores: - mb = bounds_by_name.get(s.metric_name) - if mb and mb.metric_id: - score_dicts.append({ - "metric_id": mb.metric_id, - "raw_value": s.raw_value, - "normalized_score": s.normalized_score, - "estimation_method": s.estimation_method, - "confidence": s.confidence, - }) - if score_dicts: - self.repo.save_scores(combo.id, domain.id, score_dicts) - - # Sort by composite score descending - scored.sort(key=lambda x: x[1].composite_score, reverse=True) - return scored - - def _pass4_llm_review( - self, - scored: list[tuple[Combination, ScoredResult]], - domain: Domain, - result: PipelineResult, - ) -> None: - for combo, sr in scored: - description = _describe_combination(combo) - score_dict = {s.metric_name: s.normalized_score for s in sr.scores} - review = self.llm.review_plausibility(description, score_dict) - sr.llm_review = review - result.pass4_reviewed += 1 - def _stub_estimate( self, combo: Combination, metric_names: list[str] ) -> dict[str, float]: @@ -223,24 +351,21 @@ class Pipeline: # Rough speed estimate: F=ma -> v proportional to power/mass if "speed" in raw and mass_kg > 0: - # Very rough: speed ~ power / (mass * drag_coeff) raw["speed"] = min(force_watts / mass_kg * 0.5, 300000) if "cost_efficiency" in raw: - # Lower force = cheaper per km (roughly) raw["cost_efficiency"] = max(0.01, 2.0 - force_watts / 100000) if "safety" in raw: - raw["safety"] = 0.5 # default mid-range + raw["safety"] = 0.5 if "availability" in raw: raw["availability"] = 0.5 if "range_fuel" in raw: - # More power = more range (very rough) raw["range_fuel"] = min(force_watts * 0.01, 1e10) if "range_degradation" in raw: - raw["range_degradation"] = 365 # 1 year default + raw["range_degradation"] = 365 return raw diff --git a/src/physcom/models/domain.py b/src/physcom/models/domain.py index 9946684..347fc79 100644 --- a/src/physcom/models/domain.py +++ b/src/physcom/models/domain.py @@ -13,6 +13,7 @@ class MetricBound: weight: float # 0.0–1.0 norm_min: float # Below this → score 0 norm_max: float # Above this → score 1 + unit: str = "" metric_id: int | None = None diff --git a/src/physcom/seed/transport_example.py b/src/physcom/seed/transport_example.py index 7fe397f..c8181ce 100644 --- a/src/physcom/seed/transport_example.py +++ b/src/physcom/seed/transport_example.py @@ -243,11 +243,11 @@ URBAN_COMMUTING = Domain( name="urban_commuting", description="Daily travel within a city, 1-50km range", metric_bounds=[ - MetricBound("speed", weight=0.25, norm_min=5, norm_max=120), - MetricBound("cost_efficiency", weight=0.25, norm_min=0.01, norm_max=2.0), - MetricBound("safety", weight=0.25, norm_min=0.0, norm_max=1.0), - MetricBound("availability", weight=0.15, norm_min=0.0, norm_max=1.0), - MetricBound("range_fuel", weight=0.10, norm_min=5, norm_max=500), + MetricBound("speed", weight=0.25, norm_min=5, norm_max=120, unit="km/h"), + MetricBound("cost_efficiency", weight=0.25, norm_min=0.01, norm_max=2.0, unit="$/km"), + MetricBound("safety", weight=0.25, norm_min=0.0, norm_max=1.0, unit="0-1"), + MetricBound("availability", weight=0.15, norm_min=0.0, norm_max=1.0, unit="0-1"), + MetricBound("range_fuel", weight=0.10, norm_min=5, norm_max=500, unit="km"), ], ) @@ -255,11 +255,11 @@ INTERPLANETARY = Domain( name="interplanetary_travel", description="Travel between planets within a solar system", metric_bounds=[ - MetricBound("speed", weight=0.30, norm_min=1000, norm_max=300000), - MetricBound("range_fuel", weight=0.30, norm_min=1e6, norm_max=1e10), - MetricBound("safety", weight=0.20, norm_min=0.0, norm_max=1.0), - MetricBound("cost_efficiency", weight=0.10, norm_min=1e3, norm_max=1e9), - MetricBound("range_degradation", weight=0.10, norm_min=100, norm_max=36500), + MetricBound("speed", weight=0.30, norm_min=1000, norm_max=300000, unit="km/s"), + MetricBound("range_fuel", weight=0.30, norm_min=1e6, norm_max=1e10, unit="km"), + MetricBound("safety", weight=0.20, norm_min=0.0, norm_max=1.0, unit="0-1"), + MetricBound("cost_efficiency", weight=0.10, norm_min=1e3, norm_max=1e9, unit="$/km"), + MetricBound("range_degradation", weight=0.10, norm_min=100, norm_max=36500, unit="days"), ], ) diff --git a/src/physcom_web/routes/entities.py b/src/physcom_web/routes/entities.py index 1151720..75d24cb 100644 --- a/src/physcom_web/routes/entities.py +++ b/src/physcom_web/routes/entities.py @@ -40,18 +40,8 @@ def entity_new(): dimensions=repo.list_dimensions()) -@bp.route("/") +@bp.route("/", methods=["GET", "POST"]) def entity_detail(entity_id: int): - repo = get_repo() - entity = repo.get_entity(entity_id) - if not entity: - flash("Entity not found.", "error") - return redirect(url_for("entities.entity_list")) - return render_template("entities/detail.html", entity=entity) - - -@bp.route("//edit", methods=["GET", "POST"]) -def entity_edit(entity_id: int): repo = get_repo() entity = repo.get_entity(entity_id) if not entity: @@ -62,13 +52,17 @@ def entity_edit(entity_id: int): description = request.form.get("description", "").strip() if not name: flash("Name is required.", "error") - return render_template("entities/form.html", entity=entity, - dimensions=repo.list_dimensions()) - repo.update_entity(entity_id, name, description) - flash(f"Entity '{name}' updated.", "success") - return redirect(url_for("entities.entity_detail", entity_id=entity_id)) - return render_template("entities/form.html", entity=entity, - dimensions=repo.list_dimensions()) + else: + repo.update_entity(entity_id, name, description) + flash(f"Entity '{name}' updated.", "success") + entity = repo.get_entity(entity_id) + return render_template("entities/detail.html", entity=entity) + + +@bp.route("//edit") +def entity_edit(entity_id: int): + """Legacy route — redirect to detail page.""" + return redirect(url_for("entities.entity_detail", entity_id=entity_id)) @bp.route("//delete", methods=["POST"]) diff --git a/src/physcom_web/routes/pipeline.py b/src/physcom_web/routes/pipeline.py index 303b68d..031383f 100644 --- a/src/physcom_web/routes/pipeline.py +++ b/src/physcom_web/routes/pipeline.py @@ -1,7 +1,12 @@ -"""Pipeline run routes.""" +"""Pipeline run routes with background execution and progress monitoring.""" from __future__ import annotations +import json +import os +import threading +from pathlib import Path + from flask import Blueprint, flash, redirect, render_template, request, url_for from physcom_web.app import get_repo @@ -9,12 +14,77 @@ from physcom_web.app import get_repo bp = Blueprint("pipeline", __name__, url_prefix="/pipeline") +def _run_pipeline_in_background( + db_path: str, + domain_name: str, + dim_list: list[str], + passes: list[int], + threshold: float, + run_id: int, +) -> None: + """Run the pipeline in a background thread with its own DB connection.""" + from physcom.db.schema import init_db + from physcom.db.repository import Repository + from physcom.engine.constraint_resolver import ConstraintResolver + from physcom.engine.scorer import Scorer + from physcom.engine.pipeline import Pipeline + + try: + conn = init_db(db_path) + repo = Repository(conn) + + domain = repo.get_domain(domain_name) + if not domain: + repo.update_pipeline_run( + run_id, status="failed", + error_message=f"Domain '{domain_name}' not found", + ) + conn.close() + return + + resolver = ConstraintResolver() + scorer = Scorer(domain) + pipeline = Pipeline(repo, resolver, scorer, llm=None) + + pipeline.run( + domain, dim_list, + score_threshold=threshold, + passes=passes, + run_id=run_id, + ) + except Exception as exc: + try: + repo.update_pipeline_run( + run_id, status="failed", + error_message=str(exc)[:500], + ) + except Exception: + pass + finally: + try: + conn.close() + except Exception: + pass + + @bp.route("/") def pipeline_form(): repo = get_repo() domains = repo.list_domains() dimensions = repo.list_dimensions() - return render_template("pipeline/run.html", domains=domains, dimensions=dimensions) + # Build per-domain summaries + summaries = {} + for d in domains: + summaries[d.name] = repo.get_pipeline_summary(d.name) + # Get recent pipeline runs + runs = repo.list_pipeline_runs() + return render_template( + "pipeline/run.html", + domains=domains, + dimensions=dimensions, + summaries=summaries, + runs=runs, + ) @bp.route("/run", methods=["POST"]) @@ -37,20 +107,50 @@ def pipeline_run(): flash("Select at least one dimension.", "error") return redirect(url_for("pipeline.pipeline_form")) - from physcom.engine.constraint_resolver import ConstraintResolver - from physcom.engine.scorer import Scorer - from physcom.engine.pipeline import Pipeline + # Create pipeline_run record + config = { + "passes": passes, + "threshold": threshold, + "dimensions": dim_list, + } + run_id = repo.create_pipeline_run(domain.id, config) - resolver = ConstraintResolver() - scorer = Scorer(domain) - pipeline = Pipeline(repo, resolver, scorer, llm=None) + # Resolve DB path for the background thread + from physcom_web.app import DEFAULT_DB + db_path = str(Path(os.environ.get("PHYSCOM_DB", str(DEFAULT_DB)))) - result = pipeline.run(domain, dim_list, score_threshold=threshold, passes=passes) + # Start background thread + t = threading.Thread( + target=_run_pipeline_in_background, + args=(db_path, domain_name, dim_list, passes, threshold, run_id), + daemon=True, + ) + t.start() flash( - f"Pipeline complete: {result.total_generated} combos generated, " - f"{result.pass1_valid} valid, {result.pass1_blocked} blocked, " - f"{result.pass3_above_threshold} above threshold.", - "success", + f"Pipeline run #{run_id} started for {domain_name} " + f"(passes {passes}, threshold {threshold}).", + "info", ) - return redirect(url_for("results.results_domain", domain_name=domain_name)) + return redirect(url_for("pipeline.pipeline_form")) + + +@bp.route("/runs//status") +def run_status(run_id: int): + """HTMX partial: returns live progress for a single pipeline run.""" + repo = get_repo() + run = repo.get_pipeline_run(run_id) + if not run: + return "

Run not found.

", 404 + return render_template("pipeline/_run_status.html", run=run) + + +@bp.route("/runs//cancel", methods=["POST"]) +def run_cancel(run_id: int): + """Set a running pipeline to cancelled. The pipeline checks this flag.""" + repo = get_repo() + run = repo.get_pipeline_run(run_id) + if run and run["status"] == "running": + repo.update_pipeline_run(run_id, status="cancelled") + flash(f"Run #{run_id} cancellation requested.", "info") + return redirect(url_for("pipeline.pipeline_form")) diff --git a/src/physcom_web/routes/results.py b/src/physcom_web/routes/results.py index 11dee21..e87629d 100644 --- a/src/physcom_web/routes/results.py +++ b/src/physcom_web/routes/results.py @@ -26,7 +26,8 @@ def results_domain(domain_name: str): status_filter = request.args.get("status") results = repo.get_all_results(domain_name, status=status_filter) - statuses = repo.count_combinations_by_status() + # Domain-scoped status counts (only combos that have results in this domain) + statuses = repo.count_combinations_by_status(domain_name=domain_name) return render_template( "results/list.html", @@ -35,6 +36,7 @@ def results_domain(domain_name: str): results=results, status_filter=status_filter, statuses=statuses, + total_results=sum(statuses.values()), ) diff --git a/src/physcom_web/static/style.css b/src/physcom_web/static/style.css index b3d2a85..d83c473 100644 --- a/src/physcom_web/static/style.css +++ b/src/physcom_web/static/style.css @@ -154,3 +154,77 @@ dd { font-size: 0.9rem; } /* ── Dep add form ────────────────────────────────────────── */ .dep-add-form { margin-top: 0.75rem; } + +/* ── Form hints ──────────────────────────────────────────── */ +.form-hint { color: #666; font-size: 0.8rem; margin-bottom: 0.25rem; font-weight: 400; } + +/* ── Vertical checkbox list ──────────────────────────────── */ +.checkbox-col { display: flex; flex-direction: column; gap: 0.5rem; } +.checkbox-col label { display: flex; align-items: baseline; gap: 0.4rem; font-size: 0.9rem; } +.checkbox-col label .form-hint { display: block; margin-left: 1.3rem; } + +/* ── Summary DL (pipeline) ───────────────────────────────── */ +.summary-dl { display: grid; grid-template-columns: auto 1fr; gap: 0.15rem 1rem; } + +/* ── Pipeline run status ────────────────────────────────── */ +.badge-running { background: #dbeafe; color: #1e40af; } +.badge-completed { background: #dcfce7; color: #166534; } +.badge-failed { background: #fee2e2; color: #991b1b; } +.badge-cancelled { background: #fef3c7; color: #92400e; } + +.run-status { padding: 0.25rem 0; } +.run-status-header { display: flex; align-items: center; gap: 0.5rem; margin-bottom: 0.5rem; } +.run-status-label { font-weight: 600; font-size: 0.9rem; } + +.progress-bar-container { + background: #e5e7eb; + border-radius: 4px; + height: 8px; + overflow: hidden; + margin-bottom: 0.35rem; +} +.progress-bar { + background: #2563eb; + height: 100%; + border-radius: 4px; + transition: width 0.3s ease; +} + +.run-status-counters { + display: flex; + gap: 1rem; + font-size: 0.8rem; + color: #555; + margin-bottom: 0.35rem; +} + +.run-status-actions { margin-top: 0.35rem; } + +/* ── Block reason ───────────────────────────────────────── */ +.block-reason-cell { + font-size: 0.8rem; + color: #666; + max-width: 350px; + word-break: break-word; +} + +/* ── Metric position bar ────────────────────────────────── */ +.metric-bar-container { + display: inline-block; + width: 60px; + height: 6px; + background: #e5e7eb; + border-radius: 3px; + overflow: hidden; + vertical-align: middle; +} +.metric-bar { + height: 100%; + background: #2563eb; + border-radius: 3px; +} +.metric-bar-label { + font-size: 0.75rem; + color: #666; + margin-left: 0.3rem; +} diff --git a/src/physcom_web/templates/domains/list.html b/src/physcom_web/templates/domains/list.html index e70edfd..5b965b3 100644 --- a/src/physcom_web/templates/domains/list.html +++ b/src/physcom_web/templates/domains/list.html @@ -7,14 +7,13 @@ {% if not domains %}

No domains found. Seed data via CLI first.

{% else %} -
{% for d in domains %}

{{ d.name }}

{{ d.description }}

- + {% for mb in d.metric_bounds %} @@ -29,6 +28,5 @@
MetricWeightMinMax
MetricWeightNorm MinNorm Max
{% endfor %} -
{% endif %} {% endblock %} diff --git a/src/physcom_web/templates/entities/detail.html b/src/physcom_web/templates/entities/detail.html index cc0a80d..5cca0e0 100644 --- a/src/physcom_web/templates/entities/detail.html +++ b/src/physcom_web/templates/entities/detail.html @@ -3,21 +3,29 @@ {% block content %}
-
-
Dimension
{{ entity.dimension }}
-
Description
{{ entity.description or '—' }}
-
+
+
+
+ + +
+
+ + +
+
+ +
+
+

Dependencies

diff --git a/src/physcom_web/templates/pipeline/_run_status.html b/src/physcom_web/templates/pipeline/_run_status.html new file mode 100644 index 0000000..08fb13f --- /dev/null +++ b/src/physcom_web/templates/pipeline/_run_status.html @@ -0,0 +1,78 @@ +{# HTMX partial: live status for a single pipeline run #} +
+ +
+ {{ run.status }} + Run #{{ run.id }} + {% if run.current_pass %} + Processing pass {{ run.current_pass }} + {% endif %} +
+ + {% if run.total_combos and run.total_combos > 0 %} + {% set done = run.combos_pass1 or 0 %} + {% set pct = (done / run.total_combos * 100) | int %} +
+
+
+
+ {{ done }} / {{ run.total_combos }} combos processed +
+ + + + + + {% if (run.combos_pass1 or 0) > 0 %} + {% set valid = (run.combos_pass1 or 0) - (run.total_combos - (run.combos_pass2 or 0)) if (run.combos_pass2 or 0) > 0 else (run.combos_pass1 or 0) %} + + + + + {% endif %} + {% if (run.combos_pass2 or 0) > 0 %} + + + + + {% endif %} + {% if (run.combos_pass3 or 0) > 0 %} + + + + + {% endif %} + {% if (run.combos_pass4 or 0) > 0 %} + + + + + {% endif %} + +
PassResult
1 — Constraints{{ run.combos_pass1 or 0 }} checked + {%- if (run.combos_pass2 or 0) > 0 and (run.combos_pass1 or 0) > (run.combos_pass2 or 0) %}, + {{ (run.combos_pass1 or 0) - (run.combos_pass2 or 0) }} blocked + {%- endif -%} +
2 — Estimation{{ run.combos_pass2 or 0 }} estimated
3 — Scoring{{ run.combos_pass3 or 0 }} scored
4 — LLM Review{{ run.combos_pass4 or 0 }} reviewed
+ {% endif %} + + {% if run.error_message %} +
{{ run.error_message }}
+ {% endif %} + +
+ {% if run.status == 'running' %} +
+ +
+ {% endif %} + {% if run.status == 'completed' %} + View results + {% endif %} +
+
diff --git a/src/physcom_web/templates/pipeline/run.html b/src/physcom_web/templates/pipeline/run.html index 1b218f7..b84b76f 100644 --- a/src/physcom_web/templates/pipeline/run.html +++ b/src/physcom_web/templates/pipeline/run.html @@ -8,6 +8,7 @@
+

The evaluation context that defines which metrics matter and how they're weighted.

- Pass {{ p }} - {% if p == 1 %}(Constraints) - {% elif p == 2 %}(Estimation) - {% elif p == 3 %}(Scoring) - {% elif p == 4 %}(LLM Review) - {% elif p == 5 %}(Human Review) - {% endif %} + + Pass 1 — Constraint Resolution + Checks requires/provides/excludes compatibility between entities. Blocks impossible combinations. + + + + + - {% endfor %}
+

Minimum composite score (0–1) for a combination to pass scoring. Lower values keep more results; higher values are more selective.

Dimensions +

Which entity dimensions to combine. The pipeline generates the Cartesian product of all entities in the selected dimensions.

{% for d in dimensions %}
+ +{% set active_runs = runs | selectattr('status', 'in', ['pending', 'running']) | list %} +{% if active_runs %} +

Active Runs

+{% for run in active_runs %} +
+ {% include "pipeline/_run_status.html" %} +
+{% endfor %} +{% endif %} + +{% if runs %} +

Run History

+ + + + + + + + + + + + + + + + + {% for run in runs %} + {% set blocked = (run.combos_pass1 or 0) - (run.combos_pass2 or 0) if (run.combos_pass2 or 0) > 0 and (run.combos_pass1 or 0) > (run.combos_pass2 or 0) else 0 %} + + + + + + + + + + + + + {% endfor %} + +
IDDomainStatusTotalP1 CheckedP1 BlockedP2 EstimatedP3 ScoredP4 ReviewedStarted
{{ run.id }}{{ run.domain_name }}{{ run.status }}{{ run.total_combos or '—' }}{{ run.combos_pass1 or '—' }}{% if blocked %}{{ blocked }}{% else %}—{% endif %}{{ run.combos_pass2 or '—' }}{{ run.combos_pass3 or '—' }}{{ run.combos_pass4 or '—' }}{{ run.started_at or run.created_at }}
+{% endif %} + +{% if summaries.values()|select|list %} +

Domain Summaries

+{% for d in domains %} +{% set s = summaries[d.name] %} +{% if s %} +
+

{{ d.name }} {{ d.description }}

+
+
Results
{{ s.total_results }} scored combinations
+
Blocked
{{ s.blocked }} combinations
+
Score range
{{ "%.4f"|format(s.min_score) }} — {{ "%.4f"|format(s.max_score) }}
+
Avg score
{{ "%.4f"|format(s.avg_score) }}
+
Last pass
{{ s.last_pass }}
+
+ +
+{% endif %} +{% endfor %} +{% endif %} {% endblock %} diff --git a/src/physcom_web/templates/results/detail.html b/src/physcom_web/templates/results/detail.html index 4e9a14a..ac39b89 100644 --- a/src/physcom_web/templates/results/detail.html +++ b/src/physcom_web/templates/results/detail.html @@ -55,19 +55,55 @@ {% if scores %}

Per-Metric Scores

+{% set bounds = {} %} +{% for mb in domain.metric_bounds %} + {% set _ = bounds.update({mb.metric_name: mb}) %} +{% endfor %}
- + + + + + + + + {% for s in scores %} + {% set mb = bounds.get(s.metric_name) %} - + {% set unit = s.metric_unit or '' %} + + + - - + {% endfor %} diff --git a/src/physcom_web/templates/results/list.html b/src/physcom_web/templates/results/list.html index 5a0ba68..d7883ff 100644 --- a/src/physcom_web/templates/results/list.html +++ b/src/physcom_web/templates/results/list.html @@ -21,7 +21,7 @@
Filter: All + class="btn btn-sm {{ '' if status_filter else 'btn-primary' }}">All ({{ total_results }}) {% for s, cnt in statuses.items() %} @@ -32,7 +32,11 @@ {% endif %} {% if not results %} -

No results yet. Run the pipeline first.

+ {% if status_filter %} +

No results with status "{{ status_filter }}" in this domain.

+ {% else %} +

No results for this domain yet. Run the pipeline first.

+ {% endif %} {% else %}
MetricRaw ValueNormalizedMethodConfidence
MetricRaw ValueDomain RangePositionNormalizedWeight
{{ s.metric_name }}{{ "%.2f"|format(s.raw_value) if s.raw_value is not none else '—' }}{{ "%.2f"|format(s.raw_value) if s.raw_value is not none else '—' }}{{ ' ' + unit if unit and s.raw_value is not none else '' }} + {%- if mb -%} + {{ "%.2f"|format(mb.norm_min) }} — {{ "%.2f"|format(mb.norm_max) }}{{ ' ' + unit if unit else '' }} + {%- else -%} + — + {%- endif -%} + + {%- if mb and s.raw_value is not none -%} + {%- if s.raw_value <= mb.norm_min -%} + at/below min + {%- elif s.raw_value >= mb.norm_max -%} + at/above max + {%- else -%} + {% set pct = ((s.raw_value - mb.norm_min) / (mb.norm_max - mb.norm_min) * 100) | int %} +
+
+
+ ~{{ pct }}% + {%- endif -%} + {%- else -%} + — + {%- endif -%} +
{{ "%.4f"|format(s.normalized_score) if s.normalized_score is not none else '—' }}{{ s.estimation_method or '—' }}{{ "%.2f"|format(s.confidence) if s.confidence is not none else '—' }}{{ "%.0f%%"|format(mb.weight * 100) if mb else '—' }}
@@ -41,7 +45,7 @@ - + @@ -49,10 +53,18 @@ {% for r in results %} - + - +
Score Entities StatusNoveltyDetails
{{ loop.index }}{{ "%.4f"|format(r.composite_score) }}{{ "%.4f"|format(r.composite_score) if r.composite_score else '—' }} {{ r.combination.entities|map(attribute='name')|join(' + ') }} {{ r.combination.status }}{{ r.novelty_flag or '—' }} + {%- if r.combination.status == 'blocked' and r.combination.block_reason -%} + {{ r.combination.block_reason }} + {%- elif r.novelty_flag -%} + {{ r.novelty_flag }} + {%- else -%} + — + {%- endif -%} + View diff --git a/tests/test_pipeline_async.py b/tests/test_pipeline_async.py new file mode 100644 index 0000000..8d7ff7e --- /dev/null +++ b/tests/test_pipeline_async.py @@ -0,0 +1,305 @@ +"""Tests for async pipeline: resume, cancellation, status guard, run lifecycle.""" + +import json + +from physcom.engine.constraint_resolver import ConstraintResolver +from physcom.engine.scorer import Scorer +from physcom.engine.pipeline import Pipeline, CancelledError + + +def test_pipeline_run_lifecycle(seeded_repo): + """Pipeline run should transition: pending -> running -> completed.""" + repo = seeded_repo + domain = repo.get_domain("urban_commuting") + config = {"passes": [1, 2, 3], "threshold": 0.1, "dimensions": ["platform", "power_source"]} + run_id = repo.create_pipeline_run(domain.id, config) + + run = repo.get_pipeline_run(run_id) + assert run["status"] == "pending" + + resolver = ConstraintResolver() + scorer = Scorer(domain) + pipeline = Pipeline(repo, resolver, scorer) + + pipeline.run(domain, ["platform", "power_source"], passes=[1, 2, 3], run_id=run_id) + + run = repo.get_pipeline_run(run_id) + assert run["status"] == "completed" + assert run["total_combos"] == 81 + assert run["started_at"] is not None + assert run["completed_at"] is not None + + +def test_pipeline_run_failed(seeded_repo): + """Pipeline run should be marked failed on error.""" + repo = seeded_repo + domain = repo.get_domain("urban_commuting") + config = {"passes": [1], "threshold": 0.1, "dimensions": ["platform", "power_source"]} + run_id = repo.create_pipeline_run(domain.id, config) + + # Manually mark as failed (simulating what the web route does on exception) + repo.update_pipeline_run(run_id, status="failed", error_message="Test error") + + run = repo.get_pipeline_run(run_id) + assert run["status"] == "failed" + assert run["error_message"] == "Test error" + + +def test_resume_skips_completed_combos(seeded_repo): + """Re-running the same passes on the same domain should skip already-completed combos.""" + repo = seeded_repo + domain = repo.get_domain("urban_commuting") + + resolver = ConstraintResolver() + scorer = Scorer(domain) + pipeline = Pipeline(repo, resolver, scorer) + + # First run: passes 1-3 + run_id_1 = repo.create_pipeline_run(domain.id, {"passes": [1, 2, 3]}) + result1 = pipeline.run( + domain, ["platform", "power_source"], + score_threshold=0.01, passes=[1, 2, 3], run_id=run_id_1, + ) + assert result1.pass2_estimated > 0 + first_estimated = result1.pass2_estimated + + # Second run: same passes — should skip all combos (already pass_reached >= 3) + run_id_2 = repo.create_pipeline_run(domain.id, {"passes": [1, 2, 3]}) + result2 = pipeline.run( + domain, ["platform", "power_source"], + score_threshold=0.01, passes=[1, 2, 3], run_id=run_id_2, + ) + # pass2_estimated still counted (reloaded from DB) but no new estimation work + # The key thing: the run completes successfully + assert result2.total_generated == result1.total_generated + run2 = repo.get_pipeline_run(run_id_2) + assert run2["status"] == "completed" + + +def test_cancellation_stops_processing(seeded_repo): + """Cancelling a run mid-flight should stop the pipeline gracefully.""" + repo = seeded_repo + domain = repo.get_domain("urban_commuting") + + resolver = ConstraintResolver() + scorer = Scorer(domain) + pipeline = Pipeline(repo, resolver, scorer) + + run_id = repo.create_pipeline_run(domain.id, {"passes": [1, 2, 3]}) + + # Pre-cancel the run before it starts processing + repo.update_pipeline_run(run_id, status="running") + repo.update_pipeline_run(run_id, status="cancelled") + + result = pipeline.run( + domain, ["platform", "power_source"], + score_threshold=0.01, passes=[1, 2, 3], run_id=run_id, + ) + + # Should have stopped without processing all combos + run = repo.get_pipeline_run(run_id) + assert run["status"] == "cancelled" + # The pipeline was cancelled before any combo processing could happen + assert result.pass2_estimated == 0 + + +def test_status_guard_no_downgrade_reviewed(seeded_repo): + """update_combination_status should not downgrade 'reviewed' to 'scored'.""" + repo = seeded_repo + domain = repo.get_domain("urban_commuting") + + resolver = ConstraintResolver() + scorer = Scorer(domain) + pipeline = Pipeline(repo, resolver, scorer) + + # Run pipeline to get scored combos + result = pipeline.run( + domain, ["platform", "power_source"], + score_threshold=0.01, passes=[1, 2, 3], + ) + + # Find a scored combo and manually mark it as reviewed + scored_combos = repo.list_combinations(status="scored") + assert len(scored_combos) > 0 + + combo = scored_combos[0] + repo.conn.execute( + "UPDATE combinations SET status = 'reviewed' WHERE id = ?", (combo.id,) + ) + repo.conn.commit() + + # Attempt to downgrade to 'scored' + repo.update_combination_status(combo.id, "scored") + + # Should still be 'reviewed' + reloaded = repo.get_combination(combo.id) + assert reloaded.status == "reviewed" + + +def test_human_notes_preserved_on_rerun(seeded_repo): + """Human notes should not be overwritten when re-running the pipeline.""" + repo = seeded_repo + domain = repo.get_domain("urban_commuting") + + resolver = ConstraintResolver() + scorer = Scorer(domain) + pipeline = Pipeline(repo, resolver, scorer) + + # First run + pipeline.run( + domain, ["platform", "power_source"], + score_threshold=0.01, passes=[1, 2, 3], + ) + + # Add human notes to a result + results = repo.get_all_results(domain.name) + assert len(results) > 0 + target = results[0] + combo_id = target["combination"].id + domain_id = target["domain_id"] + + repo.save_result( + combo_id, domain_id, + target["composite_score"], + pass_reached=target["pass_reached"], + novelty_flag=target["novelty_flag"], + human_notes="Important human insight", + ) + + # Clear pass_reached so re-run processes this combo again + repo.conn.execute( + """UPDATE combination_results SET pass_reached = 0 + WHERE combination_id = ? AND domain_id = ?""", + (combo_id, domain_id), + ) + repo.conn.commit() + + # Re-run pipeline + pipeline.run( + domain, ["platform", "power_source"], + score_threshold=0.01, passes=[1, 2, 3], + ) + + # Check that human_notes survived + result = repo.get_existing_result(combo_id, domain_id) + assert result["human_notes"] == "Important human insight" + + +def test_list_pipeline_runs(seeded_repo): + """list_pipeline_runs should return runs for a domain or all domains.""" + repo = seeded_repo + domain = repo.get_domain("urban_commuting") + + run_id_1 = repo.create_pipeline_run(domain.id, {"passes": [1]}) + run_id_2 = repo.create_pipeline_run(domain.id, {"passes": [1, 2, 3]}) + + all_runs = repo.list_pipeline_runs() + assert len(all_runs) >= 2 + + domain_runs = repo.list_pipeline_runs(domain_id=domain.id) + assert len(domain_runs) >= 2 + assert all(r["domain_id"] == domain.id for r in domain_runs) + + +def test_get_combo_pass_reached(seeded_repo): + """get_combo_pass_reached returns the correct pass level.""" + repo = seeded_repo + domain = repo.get_domain("urban_commuting") + + resolver = ConstraintResolver() + scorer = Scorer(domain) + pipeline = Pipeline(repo, resolver, scorer) + + pipeline.run( + domain, ["platform", "power_source"], + score_threshold=0.01, passes=[1, 2, 3], + ) + + # Get a scored combo + scored_combos = repo.list_combinations(status="scored") + assert len(scored_combos) > 0 + combo = scored_combos[0] + + pass_reached = repo.get_combo_pass_reached(combo.id, domain.id) + assert pass_reached == 3 + + # Non-existent combo + assert repo.get_combo_pass_reached(99999, domain.id) is None + + +def test_blocked_combos_have_results(seeded_repo): + """Blocked combinations should still appear in combination_results.""" + repo = seeded_repo + domain = repo.get_domain("urban_commuting") + + resolver = ConstraintResolver() + scorer = Scorer(domain) + pipeline = Pipeline(repo, resolver, scorer) + + result = pipeline.run( + domain, ["platform", "power_source"], + score_threshold=0.01, passes=[1, 2, 3], + ) + + assert result.pass1_blocked > 0 + + # All combos (blocked + scored) should have result rows + all_results = repo.get_all_results(domain.name) + total_with_results = len(all_results) + # blocked combos get pass_reached=1 results, non-blocked get pass_reached=3 + assert total_with_results == result.pass1_blocked + result.pass3_scored + + # Blocked combos should have pass_reached=1 and composite_score=0.0 + blocked_results = [r for r in all_results if r["combination"].status == "blocked"] + assert len(blocked_results) == result.pass1_blocked + for br in blocked_results: + assert br["pass_reached"] == 1 + assert br["composite_score"] == 0.0 + + +def test_all_passes_run_and_tracked(seeded_repo): + """With passes [1,2,3], all three should show nonzero counts in run record.""" + repo = seeded_repo + domain = repo.get_domain("urban_commuting") + + resolver = ConstraintResolver() + scorer = Scorer(domain) + pipeline = Pipeline(repo, resolver, scorer) + + run_id = repo.create_pipeline_run(domain.id, {"passes": [1, 2, 3]}) + result = pipeline.run( + domain, ["platform", "power_source"], + score_threshold=0.01, passes=[1, 2, 3], run_id=run_id, + ) + + run = repo.get_pipeline_run(run_id) + assert run["combos_pass1"] > 0, "Pass 1 counter should be nonzero" + assert run["combos_pass2"] > 0, "Pass 2 counter should be nonzero" + assert run["combos_pass3"] > 0, "Pass 3 counter should be nonzero" + + # Pass 2 should equal valid + conditional (blocked don't get estimated) + assert run["combos_pass2"] == result.pass2_estimated + # Pass 3 should equal pass3_scored (all scored combos, not just above threshold) + assert run["combos_pass3"] == result.pass3_scored + + +def test_save_combination_loads_existing_status(seeded_repo): + """save_combination should load the status of an existing combo from DB.""" + repo = seeded_repo + from physcom.models.combination import Combination + from physcom.models.entity import Entity + + entities = repo.list_entities(dimension="platform")[:1] + repo.list_entities(dimension="power_source")[:1] + combo = Combination(entities=entities) + saved = repo.save_combination(combo) + assert saved.status == "pending" + + # Mark it blocked in DB + repo.update_combination_status(saved.id, "blocked", "test reason") + + # Re-saving should pick up the blocked status + combo2 = Combination(entities=entities) + reloaded = repo.save_combination(combo2) + assert reloaded.id == saved.id + assert reloaded.status == "blocked" + assert reloaded.block_reason == "test reason"