Add async pipeline with progress monitoring, resumability, and result transparency

Pipeline engine rewritten with combo-first loop: each combination is processed
through all requested passes before moving to the next, with incremental DB
saves after every step (crash-safe). Blocked combos now get result rows so they
appear in the results page with constraint violation reasons.

New pipeline_runs table tracks run lifecycle (pending/running/completed/failed/
cancelled). Web route launches pipeline in a background thread with its own DB
connection. HTMX polling partial shows live progress with per-pass breakdown.

Also: status guard prevents reviewed->scored downgrade, save_combination loads
existing status on dedup for correct resume, per-metric scores show domain
bounds + units + position bars, ensure_metric backfills units on existing rows.

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Simonson, Andrew
2026-02-18 15:30:52 -06:00
parent 8118a62242
commit d2028a642b
17 changed files with 1263 additions and 217 deletions

View File

@@ -3,7 +3,9 @@
from __future__ import annotations
import hashlib
import json
import sqlite3
from datetime import datetime, timezone
from typing import Sequence
from physcom.models.entity import Dependency, Entity
@@ -170,6 +172,11 @@ class Repository:
"INSERT OR IGNORE INTO metrics (name, unit, description) VALUES (?, ?, ?)",
(name, unit, description),
)
if unit:
self.conn.execute(
"UPDATE metrics SET unit = ? WHERE name = ? AND (unit IS NULL OR unit = '')",
(unit, name),
)
row = self.conn.execute("SELECT id FROM metrics WHERE name = ?", (name,)).fetchone()
self.conn.commit()
return row["id"]
@@ -181,7 +188,7 @@ class Repository:
)
domain.id = cur.lastrowid
for mb in domain.metric_bounds:
metric_id = self.ensure_metric(mb.metric_name)
metric_id = self.ensure_metric(mb.metric_name, unit=mb.unit)
mb.metric_id = metric_id
self.conn.execute(
"""INSERT INTO domain_metric_weights
@@ -233,10 +240,13 @@ class Repository:
combination.hash = self.compute_hash(entity_ids)
existing = self.conn.execute(
"SELECT id FROM combinations WHERE hash = ?", (combination.hash,)
"SELECT id, status, block_reason FROM combinations WHERE hash = ?",
(combination.hash,),
).fetchone()
if existing:
combination.id = existing["id"]
combination.status = existing["status"]
combination.block_reason = existing["block_reason"]
return combination
cur = self.conn.execute(
@@ -255,6 +265,13 @@ class Repository:
def update_combination_status(
self, combo_id: int, status: str, block_reason: str | None = None
) -> None:
# Don't downgrade 'reviewed' to 'scored' — preserve human review state
if status == "scored":
row = self.conn.execute(
"SELECT status FROM combinations WHERE id = ?", (combo_id,)
).fetchone()
if row and row["status"] == "reviewed":
return
self.conn.execute(
"UPDATE combinations SET status = ?, block_reason = ? WHERE id = ?",
(status, block_reason, combo_id),
@@ -327,7 +344,7 @@ class Repository:
def get_combination_scores(self, combo_id: int, domain_id: int) -> list[dict]:
"""Return per-metric scores for a combination in a domain."""
rows = self.conn.execute(
"""SELECT cs.*, m.name as metric_name
"""SELECT cs.*, m.name as metric_name, m.unit as metric_unit
FROM combination_scores cs
JOIN metrics m ON cs.metric_id = m.id
WHERE cs.combination_id = ? AND cs.domain_id = ?""",
@@ -335,12 +352,52 @@ class Repository:
).fetchall()
return [dict(r) for r in rows]
def count_combinations_by_status(self) -> dict[str, int]:
rows = self.conn.execute(
"SELECT status, COUNT(*) as cnt FROM combinations GROUP BY status"
).fetchall()
def count_combinations_by_status(self, domain_name: str | None = None) -> dict[str, int]:
"""Count combos by status. If domain_name given, only combos with results in that domain."""
if domain_name:
rows = self.conn.execute(
"""SELECT c.status, COUNT(*) as cnt
FROM combination_results cr
JOIN combinations c ON cr.combination_id = c.id
JOIN domains d ON cr.domain_id = d.id
WHERE d.name = ?
GROUP BY c.status""",
(domain_name,),
).fetchall()
else:
rows = self.conn.execute(
"SELECT status, COUNT(*) as cnt FROM combinations GROUP BY status"
).fetchall()
return {r["status"]: r["cnt"] for r in rows}
def get_pipeline_summary(self, domain_name: str) -> dict | None:
"""Return a summary of results for a domain, or None if no results."""
row = self.conn.execute(
"""SELECT COUNT(*) as total,
AVG(cr.composite_score) as avg_score,
MAX(cr.composite_score) as max_score,
MIN(cr.composite_score) as min_score,
MAX(cr.pass_reached) as last_pass
FROM combination_results cr
JOIN domains d ON cr.domain_id = d.id
WHERE d.name = ?""",
(domain_name,),
).fetchone()
if not row or row["total"] == 0:
return None
# Also count blocked combos (they have no results but exist)
blocked = self.conn.execute(
"SELECT COUNT(*) as cnt FROM combinations WHERE status = 'blocked'"
).fetchone()
return {
"total_results": row["total"],
"avg_score": row["avg_score"],
"max_score": row["max_score"],
"min_score": row["min_score"],
"last_pass": row["last_pass"],
"blocked": blocked["cnt"] if blocked else 0,
}
def get_result(self, combo_id: int, domain_id: int) -> dict | None:
"""Return a single combination_result row."""
row = self.conn.execute(
@@ -412,3 +469,88 @@ class Repository:
"pass_reached": r["pass_reached"],
})
return results
# ── Pipeline Runs ────────────────────────────────────────
def create_pipeline_run(self, domain_id: int, config: dict) -> int:
"""Create a new pipeline_run record. Returns the run id."""
cur = self.conn.execute(
"""INSERT INTO pipeline_runs (domain_id, status, config, created_at)
VALUES (?, 'pending', ?, ?)""",
(domain_id, json.dumps(config), datetime.now(timezone.utc).isoformat()),
)
self.conn.commit()
return cur.lastrowid
def update_pipeline_run(self, run_id: int, **fields) -> None:
"""Update arbitrary fields on a pipeline_run."""
if not fields:
return
set_clause = ", ".join(f"{k} = ?" for k in fields)
values = list(fields.values())
values.append(run_id)
self.conn.execute(
f"UPDATE pipeline_runs SET {set_clause} WHERE id = ?", values
)
self.conn.commit()
def get_pipeline_run(self, run_id: int) -> dict | None:
row = self.conn.execute(
"SELECT * FROM pipeline_runs WHERE id = ?", (run_id,)
).fetchone()
return dict(row) if row else None
def list_pipeline_runs(self, domain_id: int | None = None) -> list[dict]:
if domain_id is not None:
rows = self.conn.execute(
"""SELECT pr.*, d.name as domain_name
FROM pipeline_runs pr
JOIN domains d ON pr.domain_id = d.id
WHERE pr.domain_id = ?
ORDER BY pr.created_at DESC""",
(domain_id,),
).fetchall()
else:
rows = self.conn.execute(
"""SELECT pr.*, d.name as domain_name
FROM pipeline_runs pr
JOIN domains d ON pr.domain_id = d.id
ORDER BY pr.created_at DESC"""
).fetchall()
return [dict(r) for r in rows]
def get_combo_pass_reached(self, combo_id: int, domain_id: int) -> int | None:
"""Return the pass_reached for a combo in a domain, or None if no result."""
row = self.conn.execute(
"""SELECT pass_reached FROM combination_results
WHERE combination_id = ? AND domain_id = ?""",
(combo_id, domain_id),
).fetchone()
return row["pass_reached"] if row else None
def save_raw_estimates(
self, combo_id: int, domain_id: int, estimates: list[dict]
) -> None:
"""Save raw metric estimates (pass 2) with normalized_score=NULL.
Each dict: metric_id, raw_value, estimation_method, confidence.
"""
for e in estimates:
self.conn.execute(
"""INSERT OR REPLACE INTO combination_scores
(combination_id, domain_id, metric_id, raw_value, normalized_score,
estimation_method, confidence)
VALUES (?, ?, ?, ?, NULL, ?, ?)""",
(combo_id, domain_id, e["metric_id"], e["raw_value"],
e["estimation_method"], e["confidence"]),
)
self.conn.commit()
def get_existing_result(self, combo_id: int, domain_id: int) -> dict | None:
"""Return the full combination_results row for resume logic."""
row = self.conn.execute(
"""SELECT * FROM combination_results
WHERE combination_id = ? AND domain_id = ?""",
(combo_id, domain_id),
).fetchone()
return dict(row) if row else None

View File

@@ -91,11 +91,29 @@ CREATE TABLE IF NOT EXISTS combination_results (
UNIQUE(combination_id, domain_id)
);
CREATE TABLE IF NOT EXISTS pipeline_runs (
id INTEGER PRIMARY KEY AUTOINCREMENT,
domain_id INTEGER NOT NULL REFERENCES domains(id),
status TEXT NOT NULL DEFAULT 'pending',
config TEXT,
total_combos INTEGER DEFAULT 0,
combos_pass1 INTEGER DEFAULT 0,
combos_pass2 INTEGER DEFAULT 0,
combos_pass3 INTEGER DEFAULT 0,
combos_pass4 INTEGER DEFAULT 0,
current_pass INTEGER,
error_message TEXT,
started_at TIMESTAMP,
completed_at TIMESTAMP,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE INDEX IF NOT EXISTS idx_deps_entity ON dependencies(entity_id);
CREATE INDEX IF NOT EXISTS idx_deps_category_key ON dependencies(category, key);
CREATE INDEX IF NOT EXISTS idx_combo_status ON combinations(status);
CREATE INDEX IF NOT EXISTS idx_scores_combo_domain ON combination_scores(combination_id, domain_id);
CREATE INDEX IF NOT EXISTS idx_results_domain_score ON combination_results(domain_id, composite_score DESC);
CREATE INDEX IF NOT EXISTS idx_pipeline_runs_domain ON pipeline_runs(domain_id);
"""