Add domain CRUD, energy density constraint, LLM status, reset results, score display fixes

Domain management:
- Add domain list/detail/form templates and full CRUD routes (domains.py)
- Add metric bound add/edit/delete via HTMX partials (_metrics_table.html)

Energy density constraint (Rule 6 in ConstraintResolver):
- Hard-block combos where power source provides <25% of platform's required Wh/kg
- Warn (conditional) when under-density but within 4x
- Solar Sail exempt (no stored energy); Airplane requires 400 Wh/kg, Spaceship 2000 Wh/kg
- Add energy_density_wh_kg provides to all 8 stored-energy power sources in seed data
- 3 new constraint resolver tests

LLM-complete status:
- Pipeline Pass 4 now sets combo status to llm_reviewed after successful LLM review
- update_combination_status guards against downgrading: scored won't overwrite
  llm_reviewed or reviewed; llm_reviewed won't overwrite reviewed
- Add badge-llm_reviewed CSS style (light blue)

Reset results:
- Repository.reset_domain_results() deletes combination_results, combination_scores,
  and pipeline_runs for a domain; pipeline re-evaluates on next run
- POST /results/<domain>/reset route with flash confirmation
- "Reset results" danger button with JS confirm dialog in results list

Fix composite score 0 displaying as --- (Jinja2 falsy 0.0 bug):
- Change `if r.composite_score` to `if r.composite_score is not none`

Co-Authored-By: Claude Sonnet 4.6 <noreply@anthropic.com>
This commit is contained in:
2026-02-19 11:13:00 -06:00
parent ee885b2390
commit 8dfe3607b1
20 changed files with 675 additions and 67 deletions

1
.gitignore vendored
View File

@@ -177,3 +177,4 @@ cython_debug/
# Project data # Project data
data/ data/
.aider*

View File

@@ -125,8 +125,20 @@ class Repository:
self.conn.commit() self.conn.commit()
def delete_entity(self, entity_id: int) -> None: def delete_entity(self, entity_id: int) -> None:
combo_ids = [
r["combination_id"]
for r in self.conn.execute(
"SELECT combination_id FROM combination_entities WHERE entity_id = ?",
(entity_id,),
).fetchall()
]
if combo_ids:
ph = ",".join("?" * len(combo_ids))
self.conn.execute(f"DELETE FROM combination_results WHERE combination_id IN ({ph})", combo_ids)
self.conn.execute(f"DELETE FROM combination_scores WHERE combination_id IN ({ph})", combo_ids)
self.conn.execute(f"DELETE FROM combination_entities WHERE combination_id IN ({ph})", combo_ids)
self.conn.execute(f"DELETE FROM combinations WHERE id IN ({ph})", combo_ids)
self.conn.execute("DELETE FROM dependencies WHERE entity_id = ?", (entity_id,)) self.conn.execute("DELETE FROM dependencies WHERE entity_id = ?", (entity_id,))
self.conn.execute("DELETE FROM combination_entities WHERE entity_id = ?", (entity_id,))
self.conn.execute("DELETE FROM entities WHERE id = ?", (entity_id,)) self.conn.execute("DELETE FROM entities WHERE id = ?", (entity_id,))
self.conn.commit() self.conn.commit()
@@ -228,6 +240,99 @@ class Repository:
rows = self.conn.execute("SELECT name FROM domains ORDER BY name").fetchall() rows = self.conn.execute("SELECT name FROM domains ORDER BY name").fetchall()
return [self.get_domain(r["name"]) for r in rows] return [self.get_domain(r["name"]) for r in rows]
def get_domain_by_id(self, domain_id: int) -> Domain | None:
row = self.conn.execute("SELECT * FROM domains WHERE id = ?", (domain_id,)).fetchone()
if not row:
return None
weights = self.conn.execute(
"""SELECT m.name, m.unit, dmw.weight, dmw.norm_min, dmw.norm_max, dmw.metric_id
FROM domain_metric_weights dmw
JOIN metrics m ON dmw.metric_id = m.id
WHERE dmw.domain_id = ?""",
(row["id"],),
).fetchall()
return Domain(
id=row["id"],
name=row["name"],
description=row["description"] or "",
metric_bounds=[
MetricBound(
metric_name=w["name"], weight=w["weight"],
norm_min=w["norm_min"], norm_max=w["norm_max"],
metric_id=w["metric_id"], unit=w["unit"] or "",
)
for w in weights
],
)
def update_domain(self, domain_id: int, name: str, description: str) -> None:
self.conn.execute(
"UPDATE domains SET name = ?, description = ? WHERE id = ?",
(name, description, domain_id),
)
self.conn.commit()
def add_metric_bound(self, domain_id: int, mb: MetricBound) -> MetricBound:
metric_id = self.ensure_metric(mb.metric_name, mb.unit)
mb.metric_id = metric_id
self.conn.execute(
"""INSERT OR REPLACE INTO domain_metric_weights
(domain_id, metric_id, weight, norm_min, norm_max)
VALUES (?, ?, ?, ?, ?)""",
(domain_id, metric_id, mb.weight, mb.norm_min, mb.norm_max),
)
self.conn.commit()
return mb
def update_metric_bound(
self, domain_id: int, metric_id: int, weight: float, norm_min: float, norm_max: float, unit: str
) -> None:
self.conn.execute(
"""UPDATE domain_metric_weights
SET weight = ?, norm_min = ?, norm_max = ?
WHERE domain_id = ? AND metric_id = ?""",
(weight, norm_min, norm_max, domain_id, metric_id),
)
if unit:
self.conn.execute(
"UPDATE metrics SET unit = ? WHERE id = ?",
(unit, metric_id),
)
self.conn.commit()
def delete_metric_bound(self, domain_id: int, metric_id: int) -> None:
self.conn.execute(
"DELETE FROM domain_metric_weights WHERE domain_id = ? AND metric_id = ?",
(domain_id, metric_id),
)
self.conn.commit()
def delete_domain(self, domain_id: int) -> None:
self.conn.execute("DELETE FROM pipeline_runs WHERE domain_id = ?", (domain_id,))
self.conn.execute("DELETE FROM combination_results WHERE domain_id = ?", (domain_id,))
self.conn.execute("DELETE FROM combination_scores WHERE domain_id = ?", (domain_id,))
self.conn.execute("DELETE FROM domain_metric_weights WHERE domain_id = ?", (domain_id,))
self.conn.execute("DELETE FROM domains WHERE id = ?", (domain_id,))
self.conn.commit()
def reset_domain_results(self, domain_name: str) -> int:
"""Delete all pipeline results for a domain so it can be re-run from scratch.
Returns the number of result rows deleted.
"""
domain = self.get_domain(domain_name)
if not domain:
return 0
count = self.conn.execute(
"SELECT COUNT(*) FROM combination_results WHERE domain_id = ?",
(domain.id,),
).fetchone()[0]
self.conn.execute("DELETE FROM combination_scores WHERE domain_id = ?", (domain.id,))
self.conn.execute("DELETE FROM combination_results WHERE domain_id = ?", (domain.id,))
self.conn.execute("DELETE FROM pipeline_runs WHERE domain_id = ?", (domain.id,))
self.conn.commit()
return count
# ── Combinations ──────────────────────────────────────────── # ── Combinations ────────────────────────────────────────────
@staticmethod @staticmethod
@@ -265,12 +370,16 @@ class Repository:
def update_combination_status( def update_combination_status(
self, combo_id: int, status: str, block_reason: str | None = None self, combo_id: int, status: str, block_reason: str | None = None
) -> None: ) -> None:
# Don't downgrade 'reviewed' to 'scored' — preserve human review state # Don't downgrade from higher pass states — preserves human/LLM review data
if status == "scored": if status in ("scored", "llm_reviewed"):
row = self.conn.execute( row = self.conn.execute(
"SELECT status FROM combinations WHERE id = ?", (combo_id,) "SELECT status FROM combinations WHERE id = ?", (combo_id,)
).fetchone() ).fetchone()
if row and row["status"] == "reviewed": if row:
cur = row["status"]
if status == "scored" and cur in ("llm_reviewed", "reviewed"):
return
if status == "llm_reviewed" and cur == "reviewed":
return return
self.conn.execute( self.conn.execute(
"UPDATE combinations SET status = ?, block_reason = ? WHERE id = ?", "UPDATE combinations SET status = ?, block_reason = ? WHERE id = ?",
@@ -292,6 +401,60 @@ class Repository:
block_reason=row["block_reason"], entities=entities, block_reason=row["block_reason"], entities=entities,
) )
def _bulk_load_combinations(self, combo_ids: list[int]) -> dict[int, Combination]:
"""Load multiple Combinations in O(4) queries instead of O(N*M)."""
if not combo_ids:
return {}
ph = ",".join("?" * len(combo_ids))
combo_rows = self.conn.execute(
f"SELECT * FROM combinations WHERE id IN ({ph})", combo_ids
).fetchall()
combos: dict[int, Combination] = {
r["id"]: Combination(
id=r["id"], hash=r["hash"], status=r["status"],
block_reason=r["block_reason"], entities=[],
)
for r in combo_rows
}
ce_rows = self.conn.execute(
f"SELECT combination_id, entity_id FROM combination_entities WHERE combination_id IN ({ph})",
combo_ids,
).fetchall()
combo_to_eids: dict[int, list[int]] = {}
for r in ce_rows:
combo_to_eids.setdefault(r["combination_id"], []).append(r["entity_id"])
entity_ids = list({r["entity_id"] for r in ce_rows})
if entity_ids:
eph = ",".join("?" * len(entity_ids))
entity_rows = self.conn.execute(
f"""SELECT e.id, e.name, e.description, d.name as dimension, e.dimension_id
FROM entities e JOIN dimensions d ON e.dimension_id = d.id
WHERE e.id IN ({eph})""",
entity_ids,
).fetchall()
dep_rows = self.conn.execute(
f"SELECT * FROM dependencies WHERE entity_id IN ({eph})", entity_ids
).fetchall()
deps_by_entity: dict[int, list[Dependency]] = {}
for r in dep_rows:
deps_by_entity.setdefault(r["entity_id"], []).append(Dependency(
id=r["id"], category=r["category"], key=r["key"],
value=r["value"], unit=r["unit"], constraint_type=r["constraint_type"],
))
entities_by_id: dict[int, Entity] = {
r["id"]: Entity(
id=r["id"], name=r["name"], description=r["description"] or "",
dimension=r["dimension"], dimension_id=r["dimension_id"],
dependencies=deps_by_entity.get(r["id"], []),
)
for r in entity_rows
}
for cid, eids in combo_to_eids.items():
if cid in combos:
combos[cid].entities = [entities_by_id[eid] for eid in eids if eid in entities_by_id]
return combos
def list_combinations(self, status: str | None = None) -> list[Combination]: def list_combinations(self, status: str | None = None) -> list[Combination]:
if status: if status:
rows = self.conn.execute( rows = self.conn.execute(
@@ -299,7 +462,9 @@ class Repository:
).fetchall() ).fetchall()
else: else:
rows = self.conn.execute("SELECT id FROM combinations ORDER BY id").fetchall() rows = self.conn.execute("SELECT id FROM combinations ORDER BY id").fetchall()
return [self.get_combination(r["id"]) for r in rows] ids = [r["id"] for r in rows]
combos = self._bulk_load_combinations(ids)
return [combos[i] for i in ids if i in combos]
# ── Scores & Results ──────────────────────────────────────── # ── Scores & Results ────────────────────────────────────────
@@ -385,9 +550,13 @@ class Repository:
).fetchone() ).fetchone()
if not row or row["total"] == 0: if not row or row["total"] == 0:
return None return None
# Also count blocked combos (they have no results but exist)
blocked = self.conn.execute( blocked = self.conn.execute(
"SELECT COUNT(*) as cnt FROM combinations WHERE status = 'blocked'" """SELECT COUNT(*) as cnt
FROM combinations c
JOIN combination_results cr ON cr.combination_id = c.id
JOIN domains d ON cr.domain_id = d.id
WHERE c.status = 'blocked' AND d.name = ?""",
(domain_name,),
).fetchone() ).fetchone()
return { return {
"total_results": row["total"], "total_results": row["total"],
@@ -422,19 +591,20 @@ class Repository:
params.append(status) params.append(status)
query += " ORDER BY cr.composite_score DESC" query += " ORDER BY cr.composite_score DESC"
rows = self.conn.execute(query, params).fetchall() rows = self.conn.execute(query, params).fetchall()
results = [] combo_ids = [r["combination_id"] for r in rows]
for r in rows: combos = self._bulk_load_combinations(combo_ids)
combo = self.get_combination(r["combination_id"]) return [
results.append({ {
"combination": combo, "combination": combos.get(r["combination_id"]),
"composite_score": r["composite_score"], "composite_score": r["composite_score"],
"novelty_flag": r["novelty_flag"], "novelty_flag": r["novelty_flag"],
"llm_review": r["llm_review"], "llm_review": r["llm_review"],
"human_notes": r["human_notes"], "human_notes": r["human_notes"],
"pass_reached": r["pass_reached"], "pass_reached": r["pass_reached"],
"domain_id": r["domain_id"], "domain_id": r["domain_id"],
}) }
return results for r in rows
]
def get_top_results(self, domain_name: str, limit: int = 10) -> list[dict]: def get_top_results(self, domain_name: str, limit: int = 10) -> list[dict]:
"""Return top-N results for a domain, ordered by composite_score DESC.""" """Return top-N results for a domain, ordered by composite_score DESC."""
@@ -448,18 +618,30 @@ class Repository:
LIMIT ?""", LIMIT ?""",
(domain_name, limit), (domain_name, limit),
).fetchall() ).fetchall()
results = [] combo_ids = [r["combination_id"] for r in rows]
for r in rows: combos = self._bulk_load_combinations(combo_ids)
combo = self.get_combination(r["combination_id"]) return [
results.append({ {
"combination": combo, "combination": combos.get(r["combination_id"]),
"composite_score": r["composite_score"], "composite_score": r["composite_score"],
"novelty_flag": r["novelty_flag"], "novelty_flag": r["novelty_flag"],
"llm_review": r["llm_review"], "llm_review": r["llm_review"],
"human_notes": r["human_notes"], "human_notes": r["human_notes"],
"pass_reached": r["pass_reached"], "pass_reached": r["pass_reached"],
}) }
return results for r in rows
]
def get_results_for_combination(self, combo_id: int) -> list[dict]:
"""Return all domain results for a combination."""
rows = self.conn.execute(
"""SELECT cr.*, d.name as domain_name
FROM combination_results cr
JOIN domains d ON cr.domain_id = d.id
WHERE cr.combination_id = ?""",
(combo_id,),
).fetchall()
return [dict(r) for r in rows]
# ── Pipeline Runs ──────────────────────────────────────── # ── Pipeline Runs ────────────────────────────────────────
@@ -473,10 +655,19 @@ class Repository:
self.conn.commit() self.conn.commit()
return cur.lastrowid return cur.lastrowid
_PIPELINE_RUN_UPDATABLE = frozenset({
"status", "total_combos", "combos_pass1", "combos_pass2",
"combos_pass3", "combos_pass4", "current_pass",
"error_message", "started_at", "completed_at",
})
def update_pipeline_run(self, run_id: int, **fields) -> None: def update_pipeline_run(self, run_id: int, **fields) -> None:
"""Update arbitrary fields on a pipeline_run.""" """Update fields on a pipeline_run. Only allowlisted column names are accepted."""
if not fields: if not fields:
return return
invalid = set(fields) - self._PIPELINE_RUN_UPDATABLE
if invalid:
raise ValueError(f"Invalid pipeline_run fields: {invalid}")
set_clause = ", ".join(f"{k} = ?" for k in fields) set_clause = ", ".join(f"{k} = ?" for k in fields)
values = list(fields.values()) values = list(fields.values())
values.append(run_id) values.append(run_id)

View File

@@ -42,6 +42,7 @@ class ConstraintResolver:
self._check_mutual_exclusion(all_deps, result) self._check_mutual_exclusion(all_deps, result)
self._check_range_incompatibility(all_deps, result) self._check_range_incompatibility(all_deps, result)
self._check_force_scale(combination, result) self._check_force_scale(combination, result)
self._check_energy_density(combination, result)
self._check_unmet_requirements(all_deps, result) self._check_unmet_requirements(all_deps, result)
if result.violations: if result.violations:
@@ -155,6 +156,40 @@ class ConstraintResolver:
f"(under-powered)" f"(under-powered)"
) )
def _check_energy_density(
self, combination: Combination, result: ConstraintResult
) -> None:
"""Rule 6: If power source energy density << platform minimum → warn/block.
Uses a 25% threshold: below 25% of required → hard block (> 4x deficit).
"""
density_provided: list[tuple[str, float]] = []
density_required: list[tuple[str, float]] = []
for entity in combination.entities:
for dep in entity.dependencies:
if dep.key == "energy_density_wh_kg" and dep.constraint_type == "provides":
density_provided.append((entity.name, float(dep.value)))
elif dep.key == "energy_density_wh_kg" and dep.constraint_type == "range_min":
density_required.append((entity.name, float(dep.value)))
for req_name, req_density in density_required:
if not density_provided:
continue # No stored energy source in this combo — skip check
for prov_name, prov_density in density_provided:
if prov_density < req_density * 0.25:
result.violations.append(
f"{prov_name} provides {prov_density:.0f} Wh/kg but "
f"{req_name} requires {req_density:.0f} Wh/kg "
f"(energy density deficit > 4x)"
)
elif prov_density < req_density:
result.warnings.append(
f"{prov_name} provides {prov_density:.0f} Wh/kg but "
f"{req_name} requires {req_density:.0f} Wh/kg "
f"(under-density)"
)
def _check_unmet_requirements( def _check_unmet_requirements(
self, all_deps: list[tuple[str, Dependency]], result: ConstraintResult self, all_deps: list[tuple[str, Dependency]], result: ConstraintResult
) -> None: ) -> None:

View File

@@ -2,6 +2,7 @@
from __future__ import annotations from __future__ import annotations
import time
from dataclasses import dataclass, field from dataclasses import dataclass, field
from datetime import datetime, timezone from datetime import datetime, timezone
@@ -293,9 +294,20 @@ class Pipeline:
for s in db_scores for s in db_scores
if s["normalized_score"] is not None if s["normalized_score"] is not None
} }
review: str | None = None
try:
review = self.llm.review_plausibility( review = self.llm.review_plausibility(
description, score_dict description, score_dict
) )
except LLMRateLimitError as exc:
self._wait_for_rate_limit(run_id, exc.retry_after)
try:
review = self.llm.review_plausibility(
description, score_dict
)
except LLMRateLimitError:
pass # still limited; skip, retry next run
if review is not None:
self.repo.save_result( self.repo.save_result(
combo.id, combo.id,
domain.id, domain.id,
@@ -305,6 +317,9 @@ class Pipeline:
llm_review=review, llm_review=review,
human_notes=cur_result.get("human_notes"), human_notes=cur_result.get("human_notes"),
) )
self.repo.update_combination_status(
combo.id, "llm_reviewed"
)
result.pass4_reviewed += 1 result.pass4_reviewed += 1
self._update_run_counters( self._update_run_counters(
run_id, result, current_pass=4 run_id, result, current_pass=4
@@ -319,17 +334,6 @@ class Pipeline:
) )
result.top_results = self.repo.get_top_results(domain.name, limit=20) result.top_results = self.repo.get_top_results(domain.name, limit=20)
return result return result
except LLMRateLimitError:
# Rate limit hit — save progress and let the user re-run to continue.
# Already-reviewed combos are persisted; resumability skips them next time.
if run_id is not None:
self.repo.update_pipeline_run(
run_id,
status="completed",
completed_at=datetime.now(timezone.utc).isoformat(),
)
result.top_results = self.repo.get_top_results(domain.name, limit=20)
return result
# Mark run as completed # Mark run as completed
if run_id is not None: if run_id is not None:
@@ -342,6 +346,18 @@ class Pipeline:
result.top_results = self.repo.get_top_results(domain.name, limit=20) result.top_results = self.repo.get_top_results(domain.name, limit=20)
return result return result
def _wait_for_rate_limit(self, run_id: int | None, retry_after: int) -> None:
"""Mark run rate_limited, sleep with cancel checks, then resume."""
if run_id is not None:
self.repo.update_pipeline_run(run_id, status="rate_limited")
waited = 0
while waited < retry_after:
time.sleep(5)
waited += 5
self._check_cancelled(run_id)
if run_id is not None:
self.repo.update_pipeline_run(run_id, status="running")
def _stub_estimate( def _stub_estimate(
self, combo: Combination, metric_names: list[str] self, combo: Combination, metric_names: list[str]
) -> dict[str, float]: ) -> dict[str, float]:

View File

@@ -8,10 +8,14 @@ from abc import ABC, abstractmethod
class LLMRateLimitError(Exception): class LLMRateLimitError(Exception):
"""Raised by a provider when the API rate limit is exceeded. """Raised by a provider when the API rate limit is exceeded.
The pipeline catches this to stop gracefully and let the user re-run The pipeline catches this, waits retry_after seconds (checking for
to continue from where it left off. cancellation), then retries. retry_after defaults to 65s if unknown.
""" """
def __init__(self, message: str, retry_after: int = 65) -> None:
super().__init__(message)
self.retry_after = retry_after
class LLMProvider(ABC): class LLMProvider(ABC):
"""Abstract LLM interface for physics estimation and plausibility review.""" """Abstract LLM interface for physics estimation and plausibility review."""

View File

@@ -4,6 +4,7 @@ from __future__ import annotations
import json import json
import re import re
import math
from physcom.llm.base import LLMProvider, LLMRateLimitError from physcom.llm.base import LLMProvider, LLMRateLimitError
from physcom.llm.prompts import PHYSICS_ESTIMATION_PROMPT, PLAUSIBILITY_REVIEW_PROMPT from physcom.llm.prompts import PHYSICS_ESTIMATION_PROMPT, PLAUSIBILITY_REVIEW_PROMPT
@@ -35,7 +36,7 @@ class GeminiLLMProvider(LLMProvider):
) )
except Exception as exc: except Exception as exc:
if "429" in str(exc) or "RESOURCE_EXHAUSTED" in str(exc): if "429" in str(exc) or "RESOURCE_EXHAUSTED" in str(exc):
raise LLMRateLimitError(str(exc)) from exc raise LLMRateLimitError(str(exc), self._parse_retry_after(exc)) from exc
raise raise
return self._parse_json(response.text, metrics) return self._parse_json(response.text, metrics)
@@ -53,10 +54,15 @@ class GeminiLLMProvider(LLMProvider):
) )
except Exception as exc: except Exception as exc:
if "429" in str(exc) or "RESOURCE_EXHAUSTED" in str(exc): if "429" in str(exc) or "RESOURCE_EXHAUSTED" in str(exc):
raise LLMRateLimitError(str(exc)) from exc raise LLMRateLimitError(str(exc), self._parse_retry_after(exc)) from exc
raise raise
return response.text.strip() return response.text.strip()
def _parse_retry_after(self, exc: Exception) -> int:
"""Extract retry delay from the error message, with a safe default."""
m = re.search(r"retry in (\d+(?:\.\d+)?)", str(exc))
return math.ceil(float(m.group(1))) + 5 if m else 65
def _parse_json(self, text: str, metrics: list[str]) -> dict[str, float]: def _parse_json(self, text: str, metrics: list[str]) -> dict[str, float]:
"""Strip markdown fences and parse JSON; fall back to 0.5 per metric on error.""" """Strip markdown fences and parse JSON; fall back to 0.5 per metric on error."""
text = re.sub(r"```(?:json)?\s*", "", text).strip().rstrip("`").strip() text = re.sub(r"```(?:json)?\s*", "", text).strip().rstrip("`").strip()

View File

@@ -34,6 +34,7 @@ PLATFORMS: list[Entity] = [
Dependency("force", "force_required_watts", "100000", "watts", "range_min"), Dependency("force", "force_required_watts", "100000", "watts", "range_min"),
Dependency("infrastructure", "runway", "true", None, "requires"), Dependency("infrastructure", "runway", "true", None, "requires"),
Dependency("environment", "medium", "air", None, "requires"), Dependency("environment", "medium", "air", None, "requires"),
Dependency("physical", "energy_density_wh_kg", "400", "Wh/kg", "range_min"),
], ],
), ),
Entity( Entity(
@@ -111,6 +112,7 @@ PLATFORMS: list[Entity] = [
Dependency("force", "force_required_watts", "1000000", "watts", "range_min"), Dependency("force", "force_required_watts", "1000000", "watts", "range_min"),
Dependency("infrastructure", "launch_facility", "true", None, "requires"), Dependency("infrastructure", "launch_facility", "true", None, "requires"),
Dependency("environment", "medium", "space", None, "requires"), Dependency("environment", "medium", "space", None, "requires"),
Dependency("physical", "energy_density_wh_kg", "2000", "Wh/kg", "range_min"),
], ],
), ),
Entity( Entity(
@@ -139,6 +141,7 @@ POWER_SOURCES: list[Entity] = [
Dependency("environment", "atmosphere", "standard", None, "requires"), Dependency("environment", "atmosphere", "standard", None, "requires"),
Dependency("physical", "mass_kg", "50", "kg", "range_min"), Dependency("physical", "mass_kg", "50", "kg", "range_min"),
Dependency("force", "thrust_profile", "high_continuous", None, "provides"), Dependency("force", "thrust_profile", "high_continuous", None, "provides"),
Dependency("physical", "energy_density_wh_kg", "1500", "Wh/kg", "provides"),
], ],
), ),
Entity( Entity(
@@ -150,6 +153,7 @@ POWER_SOURCES: list[Entity] = [
Dependency("infrastructure", "fuel_infrastructure", "charging_station", None, "requires"), Dependency("infrastructure", "fuel_infrastructure", "charging_station", None, "requires"),
Dependency("physical", "mass_kg", "10", "kg", "range_min"), Dependency("physical", "mass_kg", "10", "kg", "range_min"),
Dependency("force", "thrust_profile", "moderate_continuous", None, "provides"), Dependency("force", "thrust_profile", "moderate_continuous", None, "provides"),
Dependency("physical", "energy_density_wh_kg", "200", "Wh/kg", "provides"),
], ],
), ),
Entity( Entity(
@@ -161,6 +165,7 @@ POWER_SOURCES: list[Entity] = [
Dependency("infrastructure", "fuel_infrastructure", "hydrogen_station", None, "requires"), Dependency("infrastructure", "fuel_infrastructure", "hydrogen_station", None, "requires"),
Dependency("physical", "mass_kg", "30", "kg", "range_min"), Dependency("physical", "mass_kg", "30", "kg", "range_min"),
Dependency("force", "thrust_profile", "high_continuous", None, "provides"), Dependency("force", "thrust_profile", "high_continuous", None, "provides"),
Dependency("physical", "energy_density_wh_kg", "600", "Wh/kg", "provides"),
], ],
), ),
Entity( Entity(
@@ -172,6 +177,7 @@ POWER_SOURCES: list[Entity] = [
Dependency("infrastructure", "fuel_infrastructure", "none", None, "requires"), Dependency("infrastructure", "fuel_infrastructure", "none", None, "requires"),
Dependency("physical", "mass_kg", "0", "kg", "range_min"), Dependency("physical", "mass_kg", "0", "kg", "range_min"),
Dependency("force", "thrust_profile", "low_continuous", None, "provides"), Dependency("force", "thrust_profile", "low_continuous", None, "provides"),
Dependency("physical", "energy_density_wh_kg", "200", "Wh/kg", "provides"),
], ],
), ),
Entity( Entity(
@@ -184,6 +190,7 @@ POWER_SOURCES: list[Entity] = [
Dependency("physical", "mass_kg", "2000", "kg", "range_min"), Dependency("physical", "mass_kg", "2000", "kg", "range_min"),
Dependency("force", "thrust_profile", "extreme_continuous", None, "provides"), Dependency("force", "thrust_profile", "extreme_continuous", None, "provides"),
Dependency("material", "radiation_shielding", "true", None, "requires"), Dependency("material", "radiation_shielding", "true", None, "requires"),
Dependency("physical", "energy_density_wh_kg", "500000", "Wh/kg", "provides"),
], ],
), ),
Entity( Entity(
@@ -196,6 +203,7 @@ POWER_SOURCES: list[Entity] = [
Dependency("environment", "atmosphere", "standard", None, "requires"), Dependency("environment", "atmosphere", "standard", None, "requires"),
Dependency("physical", "mass_kg", "500", "kg", "range_min"), Dependency("physical", "mass_kg", "500", "kg", "range_min"),
Dependency("force", "thrust_profile", "high_continuous", None, "provides"), Dependency("force", "thrust_profile", "high_continuous", None, "provides"),
Dependency("physical", "energy_density_wh_kg", "400", "Wh/kg", "provides"),
], ],
), ),
Entity( Entity(
@@ -220,6 +228,7 @@ POWER_SOURCES: list[Entity] = [
Dependency("infrastructure", "fuel_infrastructure", "ammunition", None, "requires"), Dependency("infrastructure", "fuel_infrastructure", "ammunition", None, "requires"),
Dependency("physical", "mass_kg", "100", "kg", "range_min"), Dependency("physical", "mass_kg", "100", "kg", "range_min"),
Dependency("force", "thrust_profile", "high_burst", None, "provides"), Dependency("force", "thrust_profile", "high_burst", None, "provides"),
Dependency("physical", "energy_density_wh_kg", "200", "Wh/kg", "provides"),
], ],
), ),
Entity( Entity(
@@ -232,6 +241,7 @@ POWER_SOURCES: list[Entity] = [
Dependency("physical", "mass_kg", "0", "kg", "range_min"), Dependency("physical", "mass_kg", "0", "kg", "range_min"),
Dependency("environment", "ground_surface", "true", None, "requires"), Dependency("environment", "ground_surface", "true", None, "requires"),
Dependency("force", "thrust_profile", "low_continuous", None, "provides"), Dependency("force", "thrust_profile", "low_continuous", None, "provides"),
Dependency("physical", "energy_density_wh_kg", "200", "Wh/kg", "provides"),
], ],
), ),
] ]

View File

@@ -3,6 +3,7 @@
from __future__ import annotations from __future__ import annotations
import os import os
import secrets
from pathlib import Path from pathlib import Path
from flask import Flask, g from flask import Flask, g
@@ -12,6 +13,29 @@ from physcom.db.repository import Repository
DEFAULT_DB = Path("data/physcom.db") DEFAULT_DB = Path("data/physcom.db")
_ENV_FILE = Path(".env")
def _load_or_generate_secret_key() -> str:
"""Return FLASK_SECRET_KEY from env, .env file, or auto-generate and persist it."""
key = os.environ.get("FLASK_SECRET_KEY", "").strip()
if key:
return key
# Try to load from .env
if _ENV_FILE.exists():
for line in _ENV_FILE.read_text().splitlines():
line = line.strip()
if line.startswith("FLASK_SECRET_KEY="):
key = line[len("FLASK_SECRET_KEY="):].strip()
if key:
return key
# Generate, persist, and return
key = secrets.token_hex(32)
with _ENV_FILE.open("a") as f:
f.write(f"FLASK_SECRET_KEY={key}\n")
return key
def get_repo() -> Repository: def get_repo() -> Repository:
@@ -31,7 +55,7 @@ def close_db(exc: BaseException | None = None) -> None:
def create_app() -> Flask: def create_app() -> Flask:
app = Flask(__name__) app = Flask(__name__)
app.secret_key = os.environ.get("FLASK_SECRET_KEY", "physcom-dev-key") app.secret_key = _load_or_generate_secret_key()
app.teardown_appcontext(close_db) app.teardown_appcontext(close_db)
@@ -57,4 +81,4 @@ def create_app() -> Flask:
def run() -> None: def run() -> None:
"""Entry point for `physcom-web` script.""" """Entry point for `physcom-web` script."""
app = create_app() app = create_app()
app.run(debug=True, port=int(os.environ.get("PORT", "5000"))) app.run(host="0.0.0.0", debug=True, port=int(os.environ.get("PORT", "5000")))

View File

@@ -1,9 +1,10 @@
"""Domain listing routes.""" """Domain CRUD routes."""
from __future__ import annotations from __future__ import annotations
from flask import Blueprint, render_template from flask import Blueprint, flash, redirect, render_template, request, url_for
from physcom.models.domain import Domain, MetricBound
from physcom_web.app import get_repo from physcom_web.app import get_repo
bp = Blueprint("domains", __name__, url_prefix="/domains") bp = Blueprint("domains", __name__, url_prefix="/domains")
@@ -14,3 +15,100 @@ def domain_list():
repo = get_repo() repo = get_repo()
domains = repo.list_domains() domains = repo.list_domains()
return render_template("domains/list.html", domains=domains) return render_template("domains/list.html", domains=domains)
@bp.route("/new", methods=["GET", "POST"])
def domain_new():
repo = get_repo()
if request.method == "POST":
name = request.form["name"].strip()
description = request.form.get("description", "").strip()
if not name:
flash("Name is required.", "error")
return render_template("domains/form.html")
domain = repo.add_domain(Domain(name=name, description=description))
flash(f"Domain '{name}' created.", "success")
return redirect(url_for("domains.domain_detail", domain_id=domain.id))
return render_template("domains/form.html")
@bp.route("/<int:domain_id>", methods=["GET", "POST"])
def domain_detail(domain_id: int):
repo = get_repo()
domain = repo.get_domain_by_id(domain_id)
if not domain:
flash("Domain not found.", "error")
return redirect(url_for("domains.domain_list"))
if request.method == "POST":
name = request.form["name"].strip()
description = request.form.get("description", "").strip()
if not name:
flash("Name is required.", "error")
else:
repo.update_domain(domain_id, name, description)
flash(f"Domain '{name}' updated.", "success")
domain = repo.get_domain_by_id(domain_id)
return render_template("domains/detail.html", domain=domain)
@bp.route("/<int:domain_id>/delete", methods=["POST"])
def domain_delete(domain_id: int):
repo = get_repo()
domain = repo.get_domain_by_id(domain_id)
if domain:
repo.delete_domain(domain_id)
flash(f"Domain '{domain.name}' deleted.", "success")
return redirect(url_for("domains.domain_list"))
# ── Metric bound CRUD (HTMX partials) ────────────────────────
@bp.route("/<int:domain_id>/metrics/add", methods=["POST"])
def metric_add(domain_id: int):
repo = get_repo()
metric_name = request.form["metric_name"].strip()
unit = request.form.get("unit", "").strip()
try:
weight = float(request.form.get("weight", "1.0"))
norm_min = float(request.form.get("norm_min", "0.0"))
norm_max = float(request.form.get("norm_max", "1.0"))
except ValueError:
flash("Weight, norm_min, and norm_max must be numbers.", "error")
domain = repo.get_domain_by_id(domain_id)
return render_template("domains/_metrics_table.html", domain=domain)
if not metric_name:
flash("Metric name is required.", "error")
else:
mb = MetricBound(metric_name=metric_name, weight=weight, norm_min=norm_min, norm_max=norm_max, unit=unit)
repo.add_metric_bound(domain_id, mb)
flash("Metric added.", "success")
domain = repo.get_domain_by_id(domain_id)
return render_template("domains/_metrics_table.html", domain=domain)
@bp.route("/<int:domain_id>/metrics/<int:metric_id>/edit", methods=["POST"])
def metric_edit(domain_id: int, metric_id: int):
repo = get_repo()
try:
weight = float(request.form.get("weight", "1.0"))
norm_min = float(request.form.get("norm_min", "0.0"))
norm_max = float(request.form.get("norm_max", "1.0"))
except ValueError:
flash("Weight, norm_min, and norm_max must be numbers.", "error")
domain = repo.get_domain_by_id(domain_id)
return render_template("domains/_metrics_table.html", domain=domain)
unit = request.form.get("unit", "").strip()
repo.update_metric_bound(domain_id, metric_id, weight, norm_min, norm_max, unit)
flash("Metric updated.", "success")
domain = repo.get_domain_by_id(domain_id)
return render_template("domains/_metrics_table.html", domain=domain)
@bp.route("/<int:domain_id>/metrics/<int:metric_id>/delete", methods=["POST"])
def metric_delete(domain_id: int, metric_id: int):
repo = get_repo()
repo.delete_metric_bound(domain_id, metric_id)
flash("Metric removed.", "success")
domain = repo.get_domain_by_id(domain_id)
return render_template("domains/_metrics_table.html", domain=domain)

View File

@@ -151,7 +151,7 @@ def run_cancel(run_id: int):
"""Set a running pipeline to cancelled. The pipeline checks this flag.""" """Set a running pipeline to cancelled. The pipeline checks this flag."""
repo = get_repo() repo = get_repo()
run = repo.get_pipeline_run(run_id) run = repo.get_pipeline_run(run_id)
if run and run["status"] == "running": if run and run["status"] in ("running", "rate_limited"):
repo.update_pipeline_run(run_id, status="cancelled") repo.update_pipeline_run(run_id, status="cancelled")
flash(f"Run #{run_id} cancellation requested.", "info") flash(f"Run #{run_id} cancellation requested.", "info")
return redirect(url_for("pipeline.pipeline_form")) return redirect(url_for("pipeline.pipeline_form"))

View File

@@ -65,6 +65,18 @@ def result_detail(domain_name: str, combo_id: int):
) )
@bp.route("/<domain_name>/reset", methods=["POST"])
def reset_results(domain_name: str):
repo = get_repo()
domain = repo.get_domain(domain_name)
if not domain:
flash(f"Domain '{domain_name}' not found.", "error")
return redirect(url_for("results.results_index"))
count = repo.reset_domain_results(domain_name)
flash(f"Reset {count} result rows for '{domain_name}'. Ready to re-run.", "success")
return redirect(url_for("results.results_domain", domain_name=domain_name))
@bp.route("/<domain_name>/<int:combo_id>/review", methods=["POST"]) @bp.route("/<domain_name>/<int:combo_id>/review", methods=["POST"])
def submit_review(domain_name: str, combo_id: int): def submit_review(domain_name: str, combo_id: int):
repo = get_repo() repo = get_repo()
@@ -84,6 +96,7 @@ def submit_review(domain_name: str, combo_id: int):
combo_id, domain.id, composite_score, combo_id, domain.id, composite_score,
pass_reached=5, pass_reached=5,
novelty_flag=novelty_flag, novelty_flag=novelty_flag,
llm_review=existing.get("llm_review") if existing else None,
human_notes=human_notes, human_notes=human_notes,
) )
repo.update_combination_status(combo_id, "reviewed") repo.update_combination_status(combo_id, "reviewed")

View File

@@ -81,6 +81,7 @@ table.compact th, table.compact td { padding: 0.25rem 0.4rem; font-size: 0.85rem
.badge-valid { background: #dcfce7; color: #166534; } .badge-valid { background: #dcfce7; color: #166534; }
.badge-blocked { background: #fee2e2; color: #991b1b; } .badge-blocked { background: #fee2e2; color: #991b1b; }
.badge-scored { background: #dbeafe; color: #1e40af; } .badge-scored { background: #dbeafe; color: #1e40af; }
.badge-llm_reviewed { background: #e0f2fe; color: #0369a1; }
.badge-reviewed { background: #f3e8ff; color: #6b21a8; } .badge-reviewed { background: #f3e8ff; color: #6b21a8; }
.badge-pending { background: #fef3c7; color: #92400e; } .badge-pending { background: #fef3c7; color: #92400e; }
@@ -171,6 +172,7 @@ dd { font-size: 0.9rem; }
.badge-completed { background: #dcfce7; color: #166534; } .badge-completed { background: #dcfce7; color: #166534; }
.badge-failed { background: #fee2e2; color: #991b1b; } .badge-failed { background: #fee2e2; color: #991b1b; }
.badge-cancelled { background: #fef3c7; color: #92400e; } .badge-cancelled { background: #fef3c7; color: #92400e; }
.badge-rate_limited { background: #ffedd5; color: #9a3412; }
.run-status { padding: 0.25rem 0; } .run-status { padding: 0.25rem 0; }
.run-status-header { display: flex; align-items: center; gap: 0.5rem; margin-bottom: 0.5rem; } .run-status-header { display: flex; align-items: center; gap: 0.5rem; margin-bottom: 0.5rem; }

View File

@@ -0,0 +1,68 @@
<table id="metrics-table">
<thead>
<tr>
<th>Metric</th>
<th>Unit</th>
<th>Weight</th>
<th>Norm Min</th>
<th>Norm Max</th>
<th></th>
</tr>
</thead>
<tbody>
{% for mb in domain.metric_bounds %}
<tr>
<td>{{ mb.metric_name }}</td>
<td>{{ mb.unit or '—' }}</td>
<td>{{ mb.weight }}</td>
<td>{{ mb.norm_min }}</td>
<td>{{ mb.norm_max }}</td>
<td class="actions">
<button class="btn btn-sm"
onclick="this.closest('tr').nextElementSibling.style.display='table-row'; this.closest('tr').style.display='none'">
Edit
</button>
<form method="post"
hx-post="{{ url_for('domains.metric_delete', domain_id=domain.id, metric_id=mb.metric_id) }}"
hx-target="#metrics-section" hx-swap="innerHTML"
class="inline-form">
<button type="submit" class="btn btn-sm btn-danger">Del</button>
</form>
</td>
</tr>
<tr class="edit-row" style="display:none">
<form method="post"
hx-post="{{ url_for('domains.metric_edit', domain_id=domain.id, metric_id=mb.metric_id) }}"
hx-target="#metrics-section" hx-swap="innerHTML">
<td>{{ mb.metric_name }}</td>
<td><input name="unit" value="{{ mb.unit or '' }}"></td>
<td><input name="weight" type="number" step="any" value="{{ mb.weight }}" required></td>
<td><input name="norm_min" type="number" step="any" value="{{ mb.norm_min }}" required></td>
<td><input name="norm_max" type="number" step="any" value="{{ mb.norm_max }}" required></td>
<td>
<button type="submit" class="btn btn-sm btn-primary">Save</button>
<button type="button" class="btn btn-sm"
onclick="this.closest('tr').style.display='none'; this.closest('tr').previousElementSibling.style.display=''">
Cancel
</button>
</td>
</form>
</tr>
{% endfor %}
</tbody>
</table>
<h3>Add Metric</h3>
<form method="post"
hx-post="{{ url_for('domains.metric_add', domain_id=domain.id) }}"
hx-target="#metrics-section" hx-swap="innerHTML"
class="dep-add-form">
<div class="form-row">
<input name="metric_name" placeholder="metric name" required>
<input name="unit" placeholder="unit">
<input name="weight" type="number" step="any" placeholder="weight" value="1.0" required>
<input name="norm_min" type="number" step="any" placeholder="norm min" value="0.0" required>
<input name="norm_max" type="number" step="any" placeholder="norm max" value="1.0" required>
<button type="submit" class="btn btn-primary">Add</button>
</div>
</form>

View File

@@ -0,0 +1,36 @@
{% extends "base.html" %}
{% block title %}{{ domain.name }} — PhysCom{% endblock %}
{% block content %}
<div class="page-header">
<h1>{{ domain.name }}</h1>
<form method="post" action="{{ url_for('domains.domain_delete', domain_id=domain.id) }}" class="inline-form"
onsubmit="return confirm('Delete this domain and all its pipeline runs and results?')">
<button type="submit" class="btn btn-danger">Delete Domain</button>
</form>
</div>
<div class="card">
<form method="post" action="{{ url_for('domains.domain_detail', domain_id=domain.id) }}">
<div class="form-row">
<div class="form-group" style="flex:1">
<label for="name">Name</label>
<input type="text" id="name" name="name" value="{{ domain.name }}" required>
</div>
<div class="form-group" style="flex:2">
<label for="description">Description</label>
<input type="text" id="description" name="description" value="{{ domain.description }}">
</div>
<div class="form-group" style="align-self:end">
<button type="submit" class="btn btn-primary">Save</button>
</div>
</div>
</form>
</div>
<h2>Metrics</h2>
<div id="metrics-section">
{% include "domains/_metrics_table.html" %}
</div>
{% endblock %}

View File

@@ -0,0 +1,24 @@
{% extends "base.html" %}
{% block title %}New Domain — PhysCom{% endblock %}
{% block content %}
<h1>New Domain</h1>
<div class="card">
<form method="post">
<div class="form-group">
<label for="name">Name</label>
<input type="text" id="name" name="name"
value="{{ request.form.get('name', '') }}" required placeholder="e.g. Urban Transit">
</div>
<div class="form-group">
<label for="description">Description</label>
<textarea id="description" name="description" rows="3">{{ request.form.get('description', '') }}</textarea>
</div>
<div class="form-actions">
<button type="submit" class="btn btn-primary">Create</button>
<a href="{{ url_for('domains.domain_list') }}" class="btn">Cancel</a>
</div>
</form>
</div>
{% endblock %}

View File

@@ -2,14 +2,17 @@
{% block title %}Domains — PhysCom{% endblock %} {% block title %}Domains — PhysCom{% endblock %}
{% block content %} {% block content %}
<div class="page-header">
<h1>Domains</h1> <h1>Domains</h1>
<a href="{{ url_for('domains.domain_new') }}" class="btn btn-primary">New Domain</a>
</div>
{% if not domains %} {% if not domains %}
<p class="empty">No domains found. Seed data via CLI first.</p> <p class="empty">No domains found. <a href="{{ url_for('domains.domain_new') }}">Create one</a> or seed data via CLI.</p>
{% else %} {% else %}
{% for d in domains %} {% for d in domains %}
<div class="card"> <div class="card">
<h2>{{ d.name }}</h2> <h2><a href="{{ url_for('domains.domain_detail', domain_id=d.id) }}">{{ d.name }}</a></h2>
<p>{{ d.description }}</p> <p>{{ d.description }}</p>
<table> <table>
<thead> <thead>

View File

@@ -1,6 +1,6 @@
{# HTMX partial: live status for a single pipeline run #} {# HTMX partial: live status for a single pipeline run #}
<div class="run-status run-status-{{ run.status }}" <div class="run-status run-status-{{ run.status }}"
{% if run.status == 'running' or run.status == 'pending' %} {% if run.status in ('running', 'pending', 'rate_limited') %}
hx-get="{{ url_for('pipeline.run_status', run_id=run.id) }}" hx-get="{{ url_for('pipeline.run_status', run_id=run.id) }}"
hx-trigger="every 2s" hx-trigger="every 2s"
hx-swap="outerHTML" hx-swap="outerHTML"
@@ -65,8 +65,14 @@
<div class="flash flash-error" style="margin-top:0.5rem">{{ run.error_message }}</div> <div class="flash flash-error" style="margin-top:0.5rem">{{ run.error_message }}</div>
{% endif %} {% endif %}
{% if run.status == 'rate_limited' %}
<div class="flash flash-info" style="margin-top:0.5rem">
Rate limited — waiting for quota to refresh, then resuming automatically.
</div>
{% endif %}
<div class="run-status-actions"> <div class="run-status-actions">
{% if run.status == 'running' %} {% if run.status in ('running', 'rate_limited') %}
<form method="post" action="{{ url_for('pipeline.run_cancel', run_id=run.id) }}" class="inline-form"> <form method="post" action="{{ url_for('pipeline.run_cancel', run_id=run.id) }}" class="inline-form">
<button type="submit" class="btn btn-danger btn-sm">Cancel</button> <button type="submit" class="btn btn-danger btn-sm">Cancel</button>
</form> </form>

View File

@@ -74,7 +74,7 @@
</form> </form>
</div> </div>
{% set active_runs = runs | selectattr('status', 'in', ['pending', 'running']) | list %} {% set active_runs = runs | selectattr('status', 'in', ['pending', 'running', 'rate_limited']) | list %}
{% if active_runs %} {% if active_runs %}
<h2>Active Runs</h2> <h2>Active Runs</h2>
{% for run in active_runs %} {% for run in active_runs %}

View File

@@ -15,7 +15,14 @@
{% if domain and results is not none %} {% if domain and results is not none %}
<div class="card"> <div class="card">
<div class="page-header">
<h2>{{ domain.name }} <span class="subtitle">{{ domain.description }}</span></h2> <h2>{{ domain.name }} <span class="subtitle">{{ domain.description }}</span></h2>
<form method="post" action="{{ url_for('results.reset_results', domain_name=domain.name) }}"
class="inline-form"
onsubmit="return confirm('Delete all results for {{ domain.name }}? This cannot be undone.')">
<button type="submit" class="btn btn-danger btn-sm">Reset results</button>
</form>
</div>
{% if statuses %} {% if statuses %}
<div class="filter-row"> <div class="filter-row">
@@ -53,7 +60,7 @@
{% for r in results %} {% for r in results %}
<tr> <tr>
<td>{{ loop.index }}</td> <td>{{ loop.index }}</td>
<td class="score-cell">{{ "%.4f"|format(r.composite_score) if r.composite_score else '—' }}</td> <td class="score-cell">{{ "%.4f"|format(r.composite_score) if r.composite_score is not none else '—' }}</td>
<td>{{ r.combination.entities|map(attribute='name')|join(' + ') }}</td> <td>{{ r.combination.entities|map(attribute='name')|join(' + ') }}</td>
<td><span class="badge badge-{{ r.combination.status }}">{{ r.combination.status }}</span></td> <td><span class="badge badge-{{ r.combination.status }}">{{ r.combination.status }}</span></td>
<td class="block-reason-cell"> <td class="block-reason-cell">

View File

@@ -119,3 +119,67 @@ def test_hydrogen_bicycle_valid(bicycle, hydrogen_engine):
# This is actually a borderline case — let's just verify no hard physics blocks # This is actually a borderline case — let's just verify no hard physics blocks
range_blocks = [v for v in result.violations if "mutually exclusive" in v or "atmosphere" in v] range_blocks = [v for v in result.violations if "mutually exclusive" in v or "atmosphere" in v]
assert len(range_blocks) == 0 assert len(range_blocks) == 0
def test_energy_density_deficit_blocks():
"""A platform needing 2000 Wh/kg paired with a 200 Wh/kg battery → blocked."""
platform = Entity(
name="Spaceship", dimension="platform",
dependencies=[
Dependency("physical", "energy_density_wh_kg", "2000", "Wh/kg", "range_min"),
],
)
power = Entity(
name="Battery", dimension="power_source",
dependencies=[
Dependency("physical", "energy_density_wh_kg", "200", "Wh/kg", "provides"),
],
)
resolver = ConstraintResolver()
combo = Combination(entities=[platform, power])
result = resolver.resolve(combo)
assert result.status == "blocked"
assert any("energy density deficit" in v for v in result.violations)
def test_energy_density_under_density_warning():
"""A platform needing 400 Wh/kg paired with a 200 Wh/kg battery → conditional."""
platform = Entity(
name="Airplane", dimension="platform",
dependencies=[
Dependency("physical", "energy_density_wh_kg", "400", "Wh/kg", "range_min"),
],
)
power = Entity(
name="Battery", dimension="power_source",
dependencies=[
Dependency("physical", "energy_density_wh_kg", "200", "Wh/kg", "provides"),
],
)
resolver = ConstraintResolver()
combo = Combination(entities=[platform, power])
result = resolver.resolve(combo)
assert result.status != "blocked"
assert any("under-density" in w for w in result.warnings)
def test_energy_density_no_constraint_if_no_provider():
"""A platform with energy density requirement but no declared provider → no violation."""
platform = Entity(
name="Spaceship", dimension="platform",
dependencies=[
Dependency("physical", "energy_density_wh_kg", "2000", "Wh/kg", "range_min"),
],
)
# Solar Sail-style: no energy_density_wh_kg declared
power = Entity(
name="Solar Sail", dimension="power_source",
dependencies=[
Dependency("force", "force_output_watts", "1", "watts", "provides"),
],
)
resolver = ConstraintResolver()
combo = Combination(entities=[platform, power])
result = resolver.resolve(combo)
density_violations = [v for v in result.violations if "energy density" in v]
assert len(density_violations) == 0