Add Flask web UI, Docker Compose, core engine + tests

- physcom core: CLI, 5-pass pipeline, SQLite repo, 37 tests
- physcom_web: Flask app with HTMX for entity/domain/pipeline/results CRUD
- Docker Compose: web + cli services sharing a named volume for the DB
- Clean up local settings to use wildcard permissions

Co-Authored-By: Claude Opus 4.6 <noreply@anthropic.com>
This commit is contained in:
Simonson, Andrew
2026-02-18 13:59:53 -06:00
parent 6e0f82835a
commit 8118a62242
54 changed files with 3505 additions and 1 deletions

1
src/physcom/__init__.py Normal file
View File

@@ -0,0 +1 @@
"""Physical Combinatorics — innovation via attribute mixing."""

4
src/physcom/__main__.py Normal file
View File

@@ -0,0 +1,4 @@
"""Allow `python -m physcom` to work."""
from physcom.cli import main
main()

268
src/physcom/cli.py Normal file
View File

@@ -0,0 +1,268 @@
"""CLI entry point for Physical Combinatorics."""
from __future__ import annotations
import json
from pathlib import Path
import click
from physcom.db.schema import init_db
from physcom.db.repository import Repository
DEFAULT_DB = Path("data/physcom.db")
def _get_repo(db_path: str | None = None) -> Repository:
path = Path(db_path) if db_path else DEFAULT_DB
conn = init_db(path)
return Repository(conn)
@click.group()
@click.option("--db", default=None, help="Path to SQLite database")
@click.pass_context
def main(ctx, db):
"""Physical Combinatorics — innovation via attribute mixing."""
ctx.ensure_object(dict)
ctx.obj["db"] = db
@main.command()
@click.pass_context
def init(ctx):
"""Create database and initialize schema."""
repo = _get_repo(ctx.obj["db"])
click.echo(f"Database initialized at {repo.conn.execute('PRAGMA database_list').fetchone()[2]}")
@main.command()
@click.argument("seed_name")
@click.pass_context
def seed(ctx, seed_name):
"""Load seed data (e.g., 'transport')."""
repo = _get_repo(ctx.obj["db"])
if seed_name == "transport":
from physcom.seed.transport_example import load_transport_seed
counts = load_transport_seed(repo)
click.echo(f"Loaded transport seed: {counts}")
else:
click.echo(f"Unknown seed: {seed_name}. Available: transport")
@main.group()
def entity():
"""Manage entities."""
pass
@entity.command("list")
@click.option("--dimension", "-d", default=None, help="Filter by dimension")
@click.pass_context
def entity_list(ctx, dimension):
"""List entities."""
repo = _get_repo(ctx.obj["db"])
entities = repo.list_entities(dimension=dimension)
if not entities:
click.echo("No entities found.")
return
current_dim = None
for e in entities:
if e.dimension != current_dim:
current_dim = e.dimension
click.echo(f"\n[{current_dim}]")
dep_count = len(e.dependencies)
click.echo(f" {e.id:3d}. {e.name} ({dep_count} deps)")
@entity.command("add")
@click.argument("dimension")
@click.argument("name")
@click.option("--description", "-d", default="", help="Entity description")
@click.pass_context
def entity_add(ctx, dimension, name, description):
"""Add an entity to a dimension."""
from physcom.models.entity import Entity
repo = _get_repo(ctx.obj["db"])
entity = Entity(name=name, dimension=dimension, description=description)
repo.add_entity(entity)
click.echo(f"Added entity '{name}' to dimension '{dimension}' (id={entity.id})")
@main.group()
def domain():
"""Manage domains."""
pass
@domain.command("list")
@click.pass_context
def domain_list(ctx):
"""List domains."""
repo = _get_repo(ctx.obj["db"])
domains = repo.list_domains()
if not domains:
click.echo("No domains found.")
return
for d in domains:
metric_info = ", ".join(
f"{mb.metric_name}(w={mb.weight})" for mb in d.metric_bounds
)
click.echo(f" {d.name}: {d.description}")
click.echo(f" metrics: {metric_info}")
@main.command()
@click.argument("domain_name")
@click.option("--passes", "-p", default="1,2,3", help="Comma-separated pass numbers to run")
@click.option("--threshold", "-t", default=0.1, type=float, help="Score threshold for pass 3")
@click.option("--dimensions", "-d", default="platform,power_source",
help="Comma-separated dimension names")
@click.pass_context
def run(ctx, domain_name, passes, threshold, dimensions):
"""Run the viability pipeline for a domain."""
repo = _get_repo(ctx.obj["db"])
domain = repo.get_domain(domain_name)
if not domain:
click.echo(f"Domain '{domain_name}' not found.")
return
from physcom.engine.constraint_resolver import ConstraintResolver
from physcom.engine.scorer import Scorer
from physcom.engine.pipeline import Pipeline
resolver = ConstraintResolver()
scorer = Scorer(domain)
pass_list = [int(p.strip()) for p in passes.split(",")]
dim_list = [d.strip() for d in dimensions.split(",")]
pipeline = Pipeline(repo, resolver, scorer, llm=None)
click.echo(f"Running pipeline for '{domain_name}' (passes={pass_list}, threshold={threshold})")
click.echo(f"Dimensions: {dim_list}")
result = pipeline.run(domain, dim_list, score_threshold=threshold, passes=pass_list)
click.echo(f"\nResults:")
click.echo(f" Total combinations: {result.total_generated}")
click.echo(f" Pass 1 — valid: {result.pass1_valid}, "
f"conditional: {result.pass1_conditional}, "
f"blocked: {result.pass1_blocked}")
if 2 in pass_list:
click.echo(f" Pass 2 — estimated: {result.pass2_estimated}")
if 3 in pass_list:
click.echo(f" Pass 3 — above threshold: {result.pass3_above_threshold}")
if 4 in pass_list:
click.echo(f" Pass 4 — LLM reviewed: {result.pass4_reviewed}")
@main.command()
@click.argument("domain_name")
@click.option("--top", "-n", default=10, type=int, help="Number of top results to show")
@click.pass_context
def results(ctx, domain_name, top):
"""View top-N results for a domain."""
repo = _get_repo(ctx.obj["db"])
top_results = repo.get_top_results(domain_name, limit=top)
if not top_results:
click.echo(f"No results for domain '{domain_name}'.")
return
click.echo(f"\nTop {len(top_results)} results for '{domain_name}':\n")
for i, r in enumerate(top_results, 1):
combo = r["combination"]
entity_names = " + ".join(e.name for e in combo.entities)
click.echo(f" {i:2d}. [{r['composite_score']:.4f}] {entity_names}")
if r["novelty_flag"]:
click.echo(f" novelty: {r['novelty_flag']}")
if r["llm_review"]:
click.echo(f" review: {r['llm_review'][:100]}")
if r["human_notes"]:
click.echo(f" notes: {r['human_notes'][:100]}")
@main.command()
@click.argument("combination_id", type=int)
@click.pass_context
def review(ctx, combination_id):
"""Interactive human review of a combination."""
repo = _get_repo(ctx.obj["db"])
combo = repo.get_combination(combination_id)
if not combo:
click.echo(f"Combination {combination_id} not found.")
return
entity_names = " + ".join(e.name for e in combo.entities)
click.echo(f"\nCombination #{combo.id}: {entity_names}")
click.echo(f"Status: {combo.status}")
if combo.block_reason:
click.echo(f"Block reason: {combo.block_reason}")
click.echo("\nEntities:")
for e in combo.entities:
click.echo(f" [{e.dimension}] {e.name}: {e.description}")
for dep in e.dependencies:
click.echo(f" {dep.constraint_type}: {dep.key}={dep.value}"
f"{f' ({dep.unit})' if dep.unit else ''}")
novelty = click.prompt(
"\nNovelty flag (novel/exists/researched/skip)", default="skip"
)
notes = click.prompt("Human notes (or empty)", default="")
if novelty != "skip" or notes:
# Get all domains this combo has results for
rows = repo.conn.execute(
"SELECT domain_id, composite_score FROM combination_results WHERE combination_id = ?",
(combo.id,),
).fetchall()
for row in rows:
repo.save_result(
combo.id, row["domain_id"], row["composite_score"],
pass_reached=5,
novelty_flag=novelty if novelty != "skip" else None,
human_notes=notes or None,
)
repo.update_combination_status(combo.id, "reviewed")
click.echo("Review saved.")
@main.command()
@click.argument("domain_name")
@click.option("--format", "fmt", default="md", help="Export format (md)")
@click.option("--top", "-n", default=20, type=int, help="Number of results to export")
@click.option("--output", "-o", default=None, help="Output file path")
@click.pass_context
def export(ctx, domain_name, fmt, top, output):
"""Export results to a report."""
repo = _get_repo(ctx.obj["db"])
top_results = repo.get_top_results(domain_name, limit=top)
if not top_results:
click.echo(f"No results for domain '{domain_name}'.")
return
if fmt == "md":
lines = [f"# {domain_name} — Top {len(top_results)} Concepts\n"]
for i, r in enumerate(top_results, 1):
combo = r["combination"]
entity_names = " + ".join(e.name for e in combo.entities)
lines.append(f"## {i}. {entity_names} (score: {r['composite_score']:.4f})")
if r["novelty_flag"]:
lines.append(f"**Novelty:** {r['novelty_flag']}")
if r["llm_review"]:
lines.append(f"\n{r['llm_review']}")
if r["human_notes"]:
lines.append(f"\n*Notes:* {r['human_notes']}")
lines.append("")
content = "\n".join(lines)
if output:
Path(output).write_text(content)
click.echo(f"Exported to {output}")
else:
click.echo(content)
else:
click.echo(f"Unsupported format: {fmt}")
if __name__ == "__main__":
main()

View File

View File

@@ -0,0 +1,414 @@
"""CRUD operations for all entities."""
from __future__ import annotations
import hashlib
import sqlite3
from typing import Sequence
from physcom.models.entity import Dependency, Entity
from physcom.models.domain import Domain, MetricBound
from physcom.models.combination import Combination
class Repository:
"""Thin data-access layer over the SQLite database."""
def __init__(self, conn: sqlite3.Connection) -> None:
self.conn = conn
self.conn.row_factory = sqlite3.Row
# ── Dimensions ──────────────────────────────────────────────
def ensure_dimension(self, name: str, description: str = "") -> int:
"""Insert dimension if it doesn't exist, return its id."""
cur = self.conn.execute(
"INSERT OR IGNORE INTO dimensions (name, description) VALUES (?, ?)",
(name, description),
)
if cur.lastrowid and cur.rowcount:
self.conn.commit()
return cur.lastrowid
row = self.conn.execute(
"SELECT id FROM dimensions WHERE name = ?", (name,)
).fetchone()
return row["id"]
def list_dimensions(self) -> list[dict]:
rows = self.conn.execute("SELECT * FROM dimensions ORDER BY name").fetchall()
return [dict(r) for r in rows]
# ── Entities ────────────────────────────────────────────────
def add_entity(self, entity: Entity) -> Entity:
"""Persist an Entity (and its dependencies). Returns it with id set."""
dim_id = self.ensure_dimension(entity.dimension)
cur = self.conn.execute(
"INSERT INTO entities (dimension_id, name, description) VALUES (?, ?, ?)",
(dim_id, entity.name, entity.description),
)
entity.id = cur.lastrowid
entity.dimension_id = dim_id
for dep in entity.dependencies:
dep_cur = self.conn.execute(
"""INSERT INTO dependencies
(entity_id, category, key, value, unit, constraint_type)
VALUES (?, ?, ?, ?, ?, ?)""",
(entity.id, dep.category, dep.key, dep.value, dep.unit, dep.constraint_type),
)
dep.id = dep_cur.lastrowid
self.conn.commit()
return entity
def get_entity(self, entity_id: int) -> Entity | None:
row = self.conn.execute(
"""SELECT e.id, e.name, e.description, d.name as dimension, e.dimension_id
FROM entities e JOIN dimensions d ON e.dimension_id = d.id
WHERE e.id = ?""",
(entity_id,),
).fetchone()
if not row:
return None
deps = self._load_dependencies(row["id"])
return Entity(
id=row["id"],
name=row["name"],
description=row["description"] or "",
dimension=row["dimension"],
dimension_id=row["dimension_id"],
dependencies=deps,
)
def list_entities(self, dimension: str | None = None) -> list[Entity]:
if dimension:
rows = self.conn.execute(
"""SELECT e.id, e.name, e.description, d.name as dimension, e.dimension_id
FROM entities e JOIN dimensions d ON e.dimension_id = d.id
WHERE d.name = ? ORDER BY e.name""",
(dimension,),
).fetchall()
else:
rows = self.conn.execute(
"""SELECT e.id, e.name, e.description, d.name as dimension, e.dimension_id
FROM entities e JOIN dimensions d ON e.dimension_id = d.id
ORDER BY d.name, e.name"""
).fetchall()
entities = []
for r in rows:
deps = self._load_dependencies(r["id"])
entities.append(Entity(
id=r["id"], name=r["name"], description=r["description"] or "",
dimension=r["dimension"], dimension_id=r["dimension_id"],
dependencies=deps,
))
return entities
def _load_dependencies(self, entity_id: int) -> list[Dependency]:
rows = self.conn.execute(
"SELECT * FROM dependencies WHERE entity_id = ?", (entity_id,)
).fetchall()
return [
Dependency(
id=r["id"], category=r["category"], key=r["key"],
value=r["value"], unit=r["unit"], constraint_type=r["constraint_type"],
)
for r in rows
]
def update_entity(self, entity_id: int, name: str, description: str) -> None:
self.conn.execute(
"UPDATE entities SET name = ?, description = ? WHERE id = ?",
(name, description, entity_id),
)
self.conn.commit()
def delete_entity(self, entity_id: int) -> None:
self.conn.execute("DELETE FROM dependencies WHERE entity_id = ?", (entity_id,))
self.conn.execute("DELETE FROM combination_entities WHERE entity_id = ?", (entity_id,))
self.conn.execute("DELETE FROM entities WHERE id = ?", (entity_id,))
self.conn.commit()
def add_dependency(self, entity_id: int, dep: Dependency) -> Dependency:
cur = self.conn.execute(
"""INSERT INTO dependencies
(entity_id, category, key, value, unit, constraint_type)
VALUES (?, ?, ?, ?, ?, ?)""",
(entity_id, dep.category, dep.key, dep.value, dep.unit, dep.constraint_type),
)
dep.id = cur.lastrowid
self.conn.commit()
return dep
def update_dependency(self, dep_id: int, dep: Dependency) -> None:
self.conn.execute(
"""UPDATE dependencies
SET category = ?, key = ?, value = ?, unit = ?, constraint_type = ?
WHERE id = ?""",
(dep.category, dep.key, dep.value, dep.unit, dep.constraint_type, dep_id),
)
self.conn.commit()
def delete_dependency(self, dep_id: int) -> None:
self.conn.execute("DELETE FROM dependencies WHERE id = ?", (dep_id,))
self.conn.commit()
def get_dependency(self, dep_id: int) -> Dependency | None:
row = self.conn.execute(
"SELECT * FROM dependencies WHERE id = ?", (dep_id,)
).fetchone()
if not row:
return None
return Dependency(
id=row["id"], category=row["category"], key=row["key"],
value=row["value"], unit=row["unit"], constraint_type=row["constraint_type"],
)
# ── Domains & Metrics ───────────────────────────────────────
def ensure_metric(self, name: str, unit: str = "", description: str = "") -> int:
self.conn.execute(
"INSERT OR IGNORE INTO metrics (name, unit, description) VALUES (?, ?, ?)",
(name, unit, description),
)
row = self.conn.execute("SELECT id FROM metrics WHERE name = ?", (name,)).fetchone()
self.conn.commit()
return row["id"]
def add_domain(self, domain: Domain) -> Domain:
cur = self.conn.execute(
"INSERT INTO domains (name, description) VALUES (?, ?)",
(domain.name, domain.description),
)
domain.id = cur.lastrowid
for mb in domain.metric_bounds:
metric_id = self.ensure_metric(mb.metric_name)
mb.metric_id = metric_id
self.conn.execute(
"""INSERT INTO domain_metric_weights
(domain_id, metric_id, weight, norm_min, norm_max)
VALUES (?, ?, ?, ?, ?)""",
(domain.id, metric_id, mb.weight, mb.norm_min, mb.norm_max),
)
self.conn.commit()
return domain
def get_domain(self, name: str) -> Domain | None:
row = self.conn.execute("SELECT * FROM domains WHERE name = ?", (name,)).fetchone()
if not row:
return None
weights = self.conn.execute(
"""SELECT m.name, dmw.weight, dmw.norm_min, dmw.norm_max, dmw.metric_id
FROM domain_metric_weights dmw
JOIN metrics m ON dmw.metric_id = m.id
WHERE dmw.domain_id = ?""",
(row["id"],),
).fetchall()
return Domain(
id=row["id"],
name=row["name"],
description=row["description"] or "",
metric_bounds=[
MetricBound(
metric_name=w["name"], weight=w["weight"],
norm_min=w["norm_min"], norm_max=w["norm_max"],
metric_id=w["metric_id"],
)
for w in weights
],
)
def list_domains(self) -> list[Domain]:
rows = self.conn.execute("SELECT name FROM domains ORDER BY name").fetchall()
return [self.get_domain(r["name"]) for r in rows]
# ── Combinations ────────────────────────────────────────────
@staticmethod
def compute_hash(entity_ids: Sequence[int]) -> str:
key = ",".join(str(eid) for eid in sorted(entity_ids))
return hashlib.sha256(key.encode()).hexdigest()[:16]
def save_combination(self, combination: Combination) -> Combination:
entity_ids = [e.id for e in combination.entities]
combination.hash = self.compute_hash(entity_ids)
existing = self.conn.execute(
"SELECT id FROM combinations WHERE hash = ?", (combination.hash,)
).fetchone()
if existing:
combination.id = existing["id"]
return combination
cur = self.conn.execute(
"INSERT INTO combinations (hash, status, block_reason) VALUES (?, ?, ?)",
(combination.hash, combination.status, combination.block_reason),
)
combination.id = cur.lastrowid
for eid in entity_ids:
self.conn.execute(
"INSERT INTO combination_entities (combination_id, entity_id) VALUES (?, ?)",
(combination.id, eid),
)
self.conn.commit()
return combination
def update_combination_status(
self, combo_id: int, status: str, block_reason: str | None = None
) -> None:
self.conn.execute(
"UPDATE combinations SET status = ?, block_reason = ? WHERE id = ?",
(status, block_reason, combo_id),
)
self.conn.commit()
def get_combination(self, combo_id: int) -> Combination | None:
row = self.conn.execute("SELECT * FROM combinations WHERE id = ?", (combo_id,)).fetchone()
if not row:
return None
entity_rows = self.conn.execute(
"SELECT entity_id FROM combination_entities WHERE combination_id = ?",
(combo_id,),
).fetchall()
entities = [self.get_entity(er["entity_id"]) for er in entity_rows]
return Combination(
id=row["id"], hash=row["hash"], status=row["status"],
block_reason=row["block_reason"], entities=entities,
)
def list_combinations(self, status: str | None = None) -> list[Combination]:
if status:
rows = self.conn.execute(
"SELECT id FROM combinations WHERE status = ? ORDER BY id", (status,)
).fetchall()
else:
rows = self.conn.execute("SELECT id FROM combinations ORDER BY id").fetchall()
return [self.get_combination(r["id"]) for r in rows]
# ── Scores & Results ────────────────────────────────────────
def save_scores(
self,
combo_id: int,
domain_id: int,
scores: list[dict],
) -> None:
"""Save per-metric scores. Each dict: metric_id, raw_value, normalized_score, estimation_method, confidence."""
for s in scores:
self.conn.execute(
"""INSERT OR REPLACE INTO combination_scores
(combination_id, domain_id, metric_id, raw_value, normalized_score,
estimation_method, confidence)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(combo_id, domain_id, s["metric_id"], s["raw_value"],
s["normalized_score"], s["estimation_method"], s["confidence"]),
)
self.conn.commit()
def save_result(
self,
combo_id: int,
domain_id: int,
composite_score: float,
pass_reached: int,
novelty_flag: str | None = None,
llm_review: str | None = None,
human_notes: str | None = None,
) -> None:
self.conn.execute(
"""INSERT OR REPLACE INTO combination_results
(combination_id, domain_id, composite_score, novelty_flag,
llm_review, human_notes, pass_reached)
VALUES (?, ?, ?, ?, ?, ?, ?)""",
(combo_id, domain_id, composite_score, novelty_flag,
llm_review, human_notes, pass_reached),
)
self.conn.commit()
def get_combination_scores(self, combo_id: int, domain_id: int) -> list[dict]:
"""Return per-metric scores for a combination in a domain."""
rows = self.conn.execute(
"""SELECT cs.*, m.name as metric_name
FROM combination_scores cs
JOIN metrics m ON cs.metric_id = m.id
WHERE cs.combination_id = ? AND cs.domain_id = ?""",
(combo_id, domain_id),
).fetchall()
return [dict(r) for r in rows]
def count_combinations_by_status(self) -> dict[str, int]:
rows = self.conn.execute(
"SELECT status, COUNT(*) as cnt FROM combinations GROUP BY status"
).fetchall()
return {r["status"]: r["cnt"] for r in rows}
def get_result(self, combo_id: int, domain_id: int) -> dict | None:
"""Return a single combination_result row."""
row = self.conn.execute(
"""SELECT cr.*, d.name as domain_name
FROM combination_results cr
JOIN domains d ON cr.domain_id = d.id
WHERE cr.combination_id = ? AND cr.domain_id = ?""",
(combo_id, domain_id),
).fetchone()
return dict(row) if row else None
def get_all_results(self, domain_name: str, status: str | None = None) -> list[dict]:
"""Return all results for a domain, optionally filtered by combo status."""
if status:
rows = self.conn.execute(
"""SELECT cr.*, c.hash, c.status as combo_status, d.name as domain_name
FROM combination_results cr
JOIN combinations c ON cr.combination_id = c.id
JOIN domains d ON cr.domain_id = d.id
WHERE d.name = ? AND c.status = ?
ORDER BY cr.composite_score DESC""",
(domain_name, status),
).fetchall()
else:
rows = self.conn.execute(
"""SELECT cr.*, c.hash, c.status as combo_status, d.name as domain_name
FROM combination_results cr
JOIN combinations c ON cr.combination_id = c.id
JOIN domains d ON cr.domain_id = d.id
WHERE d.name = ?
ORDER BY cr.composite_score DESC""",
(domain_name,),
).fetchall()
results = []
for r in rows:
combo = self.get_combination(r["combination_id"])
results.append({
"combination": combo,
"composite_score": r["composite_score"],
"novelty_flag": r["novelty_flag"],
"llm_review": r["llm_review"],
"human_notes": r["human_notes"],
"pass_reached": r["pass_reached"],
"domain_id": r["domain_id"],
})
return results
def get_top_results(self, domain_name: str, limit: int = 10) -> list[dict]:
"""Return top-N results for a domain, ordered by composite_score DESC."""
rows = self.conn.execute(
"""SELECT cr.*, c.hash, c.status, d.name as domain_name
FROM combination_results cr
JOIN combinations c ON cr.combination_id = c.id
JOIN domains d ON cr.domain_id = d.id
WHERE d.name = ?
ORDER BY cr.composite_score DESC
LIMIT ?""",
(domain_name, limit),
).fetchall()
results = []
for r in rows:
combo = self.get_combination(r["combination_id"])
results.append({
"combination": combo,
"composite_score": r["composite_score"],
"novelty_flag": r["novelty_flag"],
"llm_review": r["llm_review"],
"human_notes": r["human_notes"],
"pass_reached": r["pass_reached"],
})
return results

111
src/physcom/db/schema.py Normal file
View File

@@ -0,0 +1,111 @@
"""DDL, table creation, and schema initialization."""
from __future__ import annotations
import sqlite3
from pathlib import Path
DDL = """
CREATE TABLE IF NOT EXISTS dimensions (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
description TEXT
);
CREATE TABLE IF NOT EXISTS entities (
id INTEGER PRIMARY KEY AUTOINCREMENT,
dimension_id INTEGER NOT NULL REFERENCES dimensions(id),
name TEXT NOT NULL,
description TEXT,
UNIQUE(dimension_id, name)
);
CREATE TABLE IF NOT EXISTS dependencies (
id INTEGER PRIMARY KEY AUTOINCREMENT,
entity_id INTEGER NOT NULL REFERENCES entities(id),
category TEXT NOT NULL,
key TEXT NOT NULL,
value TEXT NOT NULL,
unit TEXT,
constraint_type TEXT NOT NULL
);
CREATE TABLE IF NOT EXISTS domains (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
description TEXT
);
CREATE TABLE IF NOT EXISTS metrics (
id INTEGER PRIMARY KEY AUTOINCREMENT,
name TEXT UNIQUE NOT NULL,
unit TEXT,
description TEXT
);
CREATE TABLE IF NOT EXISTS domain_metric_weights (
id INTEGER PRIMARY KEY AUTOINCREMENT,
domain_id INTEGER NOT NULL REFERENCES domains(id),
metric_id INTEGER NOT NULL REFERENCES metrics(id),
weight REAL NOT NULL,
norm_min REAL,
norm_max REAL,
UNIQUE(domain_id, metric_id)
);
CREATE TABLE IF NOT EXISTS combinations (
id INTEGER PRIMARY KEY AUTOINCREMENT,
hash TEXT UNIQUE NOT NULL,
status TEXT NOT NULL DEFAULT 'pending',
block_reason TEXT,
created_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP
);
CREATE TABLE IF NOT EXISTS combination_entities (
combination_id INTEGER NOT NULL REFERENCES combinations(id),
entity_id INTEGER NOT NULL REFERENCES entities(id),
PRIMARY KEY (combination_id, entity_id)
);
CREATE TABLE IF NOT EXISTS combination_scores (
id INTEGER PRIMARY KEY AUTOINCREMENT,
combination_id INTEGER NOT NULL REFERENCES combinations(id),
domain_id INTEGER NOT NULL REFERENCES domains(id),
metric_id INTEGER NOT NULL REFERENCES metrics(id),
raw_value REAL,
normalized_score REAL,
estimation_method TEXT,
confidence REAL,
UNIQUE(combination_id, domain_id, metric_id)
);
CREATE TABLE IF NOT EXISTS combination_results (
id INTEGER PRIMARY KEY AUTOINCREMENT,
combination_id INTEGER NOT NULL REFERENCES combinations(id),
domain_id INTEGER NOT NULL REFERENCES domains(id),
composite_score REAL,
novelty_flag TEXT,
llm_review TEXT,
human_notes TEXT,
pass_reached INTEGER,
UNIQUE(combination_id, domain_id)
);
CREATE INDEX IF NOT EXISTS idx_deps_entity ON dependencies(entity_id);
CREATE INDEX IF NOT EXISTS idx_deps_category_key ON dependencies(category, key);
CREATE INDEX IF NOT EXISTS idx_combo_status ON combinations(status);
CREATE INDEX IF NOT EXISTS idx_scores_combo_domain ON combination_scores(combination_id, domain_id);
CREATE INDEX IF NOT EXISTS idx_results_domain_score ON combination_results(domain_id, composite_score DESC);
"""
def init_db(db_path: str | Path) -> sqlite3.Connection:
"""Create/open the database and ensure all tables exist."""
db_path = Path(db_path)
db_path.parent.mkdir(parents=True, exist_ok=True)
conn = sqlite3.connect(str(db_path))
conn.execute("PRAGMA journal_mode=WAL")
conn.execute("PRAGMA foreign_keys=ON")
conn.executescript(DDL)
conn.commit()
return conn

View File

View File

@@ -0,0 +1,33 @@
"""Cartesian product generator across dimensions."""
from __future__ import annotations
import itertools
from physcom.models.entity import Entity
from physcom.models.combination import Combination
from physcom.db.repository import Repository
def generate_combinations(
repo: Repository,
dimensions: list[str],
) -> list[Combination]:
"""Generate all Cartesian-product combinations across the given dimensions.
Each combination contains exactly one entity per dimension.
Deduplication is handled by the repository via hash.
"""
entity_groups: list[list[Entity]] = []
for dim in dimensions:
entities = repo.list_entities(dimension=dim)
if not entities:
raise ValueError(f"No entities found for dimension '{dim}'")
entity_groups.append(entities)
combinations = []
for entity_tuple in itertools.product(*entity_groups):
combo = Combination(entities=list(entity_tuple))
combinations.append(combo)
return combinations

View File

@@ -0,0 +1,180 @@
"""Dependency contradiction detection engine."""
from __future__ import annotations
from dataclasses import dataclass, field
from physcom.models.combination import Combination
from physcom.models.entity import Dependency
# Mutual exclusion registry: for a given key, which value-sets contradict.
# Values within the same set are compatible; values in different sets contradict.
MUTEX_VALUES: dict[str, list[set[str]]] = {
"atmosphere": [{"vacuum", "vacuum_or_thin"}, {"dense", "standard"}],
"medium": [{"ground"}, {"water"}, {"air"}, {"space"}],
}
@dataclass
class ConstraintResult:
"""Outcome of constraint resolution for a combination."""
status: str = "valid" # valid, blocked, conditional
violations: list[str] = field(default_factory=list)
warnings: list[str] = field(default_factory=list)
class ConstraintResolver:
"""Checks a Combination's entities for dependency contradictions."""
def __init__(self, mutex_registry: dict[str, list[set[str]]] | None = None) -> None:
self.mutex = mutex_registry or MUTEX_VALUES
def resolve(self, combination: Combination) -> ConstraintResult:
result = ConstraintResult()
all_deps: list[tuple[str, Dependency]] = []
for entity in combination.entities:
for dep in entity.dependencies:
all_deps.append((entity.name, dep))
self._check_requires_vs_excludes(all_deps, result)
self._check_mutual_exclusion(all_deps, result)
self._check_range_incompatibility(all_deps, result)
self._check_force_scale(combination, result)
self._check_unmet_requirements(all_deps, result)
if result.violations:
result.status = "blocked"
elif result.warnings:
result.status = "conditional"
return result
def _check_requires_vs_excludes(
self, all_deps: list[tuple[str, Dependency]], result: ConstraintResult
) -> None:
"""Rule 1: If A requires key=X and B excludes key=X → BLOCKED."""
requires = [(name, d) for name, d in all_deps if d.constraint_type == "requires"]
excludes = [(name, d) for name, d in all_deps if d.constraint_type == "excludes"]
for req_name, req in requires:
for exc_name, exc in excludes:
if req_name == exc_name:
continue
if req.key == exc.key and req.value == exc.value:
result.violations.append(
f"{req_name} requires {req.key}={req.value} "
f"but {exc_name} excludes it"
)
def _check_mutual_exclusion(
self, all_deps: list[tuple[str, Dependency]], result: ConstraintResult
) -> None:
"""Rule 2: If A requires key=X and B requires key=Y where X,Y are mutex → BLOCKED."""
requires = [(name, d) for name, d in all_deps if d.constraint_type == "requires"]
for i, (name_a, dep_a) in enumerate(requires):
for name_b, dep_b in requires[i + 1:]:
if name_a == name_b:
continue
if dep_a.key != dep_b.key:
continue
if dep_a.value == dep_b.value:
continue
# Check if values are in different mutex sets
if dep_a.key in self.mutex:
set_a = self._find_mutex_set(dep_a.key, dep_a.value)
set_b = self._find_mutex_set(dep_b.key, dep_b.value)
if set_a is not None and set_b is not None and set_a is not set_b:
result.violations.append(
f"{name_a} requires {dep_a.key}={dep_a.value} "
f"but {name_b} requires {dep_b.key}={dep_b.value} "
f"(mutually exclusive)"
)
def _find_mutex_set(self, key: str, value: str) -> set[str] | None:
"""Find which mutex set a value belongs to, or None."""
for value_set in self.mutex.get(key, []):
if value in value_set:
return value_set
return None
def _check_range_incompatibility(
self, all_deps: list[tuple[str, Dependency]], result: ConstraintResult
) -> None:
"""Rule 3: If A range_min > B range_max for the same key → BLOCKED."""
range_mins: dict[str, list[tuple[str, float]]] = {}
range_maxs: dict[str, list[tuple[str, float]]] = {}
for name, dep in all_deps:
if dep.constraint_type == "range_min":
range_mins.setdefault(dep.key, []).append((name, float(dep.value)))
elif dep.constraint_type == "range_max":
range_maxs.setdefault(dep.key, []).append((name, float(dep.value)))
for key in set(range_mins) & set(range_maxs):
for min_name, min_val in range_mins[key]:
for max_name, max_val in range_maxs[key]:
if min_name == max_name:
continue
if min_val > max_val:
result.violations.append(
f"{min_name} requires {key} >= {min_val} "
f"but {max_name} limits {key} <= {max_val}"
)
def _check_force_scale(
self, combination: Combination, result: ConstraintResult
) -> None:
"""Rule 4: If power source output << platform requirement → warn/block."""
force_provided: list[tuple[str, float]] = []
force_required: list[tuple[str, float]] = []
for entity in combination.entities:
for dep in entity.dependencies:
if dep.key == "force_output_watts" and dep.constraint_type == "provides":
force_provided.append((entity.name, float(dep.value)))
elif dep.key == "force_required_watts" and dep.constraint_type == "range_min":
force_required.append((entity.name, float(dep.value)))
for req_name, req_watts in force_required:
for prov_name, prov_watts in force_provided:
if prov_watts < req_watts * 0.01:
# Off by more than 100x — hard block
result.violations.append(
f"{prov_name} provides {prov_watts}W but "
f"{req_name} requires {req_watts}W "
f"(force deficit > 100x)"
)
elif prov_watts < req_watts:
# Under-powered but not impossibly so — warn
result.warnings.append(
f"{prov_name} provides {prov_watts}W but "
f"{req_name} requires {req_watts}W "
f"(under-powered)"
)
def _check_unmet_requirements(
self, all_deps: list[tuple[str, Dependency]], result: ConstraintResult
) -> None:
"""Rule 5: Required condition not provided by any entity → conditional."""
provides = {(d.key, d.value) for _, d in all_deps if d.constraint_type == "provides"}
# Ambient conditions that don't need to be explicitly provided
ambient = {
("ground_surface", "true"),
("gravity", "true"),
("star_proximity", "true"),
}
for name, dep in all_deps:
if dep.constraint_type != "requires":
continue
if dep.category == "infrastructure":
continue # Infrastructure is external, not checked here
key_val = (dep.key, dep.value)
if key_val not in provides and key_val not in ambient:
result.warnings.append(
f"{name} requires {dep.key}={dep.value} "
f"but no entity in this combination provides it"
)

View File

@@ -0,0 +1,246 @@
"""Multi-pass pipeline orchestrator."""
from __future__ import annotations
from dataclasses import dataclass, field
from physcom.db.repository import Repository
from physcom.engine.combinator import generate_combinations
from physcom.engine.constraint_resolver import ConstraintResolver, ConstraintResult
from physcom.engine.scorer import Scorer
from physcom.llm.base import LLMProvider
from physcom.llm.prompts import PHYSICS_ESTIMATION_PROMPT, PLAUSIBILITY_REVIEW_PROMPT
from physcom.models.combination import Combination, ScoredResult
from physcom.models.domain import Domain
@dataclass
class PipelineResult:
"""Summary of a pipeline run."""
total_generated: int = 0
pass1_valid: int = 0
pass1_blocked: int = 0
pass1_conditional: int = 0
pass2_estimated: int = 0
pass3_above_threshold: int = 0
pass4_reviewed: int = 0
pass5_human_reviewed: int = 0
top_results: list[dict] = field(default_factory=list)
def _describe_combination(combo: Combination) -> str:
"""Build a natural-language description of a combination."""
parts = [f"{e.dimension}: {e.name}" for e in combo.entities]
descriptions = [e.description for e in combo.entities if e.description]
header = " + ".join(parts)
detail = "; ".join(descriptions)
return f"{header}. {detail}"
class Pipeline:
"""Orchestrates the multi-pass viability pipeline."""
def __init__(
self,
repo: Repository,
resolver: ConstraintResolver,
scorer: Scorer,
llm: LLMProvider | None = None,
) -> None:
self.repo = repo
self.resolver = resolver
self.scorer = scorer
self.llm = llm
def run(
self,
domain: Domain,
dimensions: list[str],
score_threshold: float = 0.1,
passes: list[int] | None = None,
) -> PipelineResult:
if passes is None:
passes = [1, 2, 3, 4, 5]
result = PipelineResult()
# Generate all combinations
combos = generate_combinations(self.repo, dimensions)
result.total_generated = len(combos)
# Save all combinations to DB
for combo in combos:
self.repo.save_combination(combo)
# ── Pass 1: Constraint Resolution ───────────────────────
valid_combos: list[Combination] = []
if 1 in passes:
valid_combos = self._pass1_constraints(combos, result)
else:
valid_combos = combos
# ── Pass 2: Physics Estimation ──────────────────────────
estimated: list[tuple[Combination, dict[str, float]]] = []
if 2 in passes:
estimated = self._pass2_estimation(valid_combos, domain, result)
else:
# Skip estimation, use zeros
estimated = [(c, {}) for c in valid_combos]
# ── Pass 3: Scoring & Ranking ───────────────────────────
scored: list[tuple[Combination, ScoredResult]] = []
if 3 in passes:
scored = self._pass3_scoring(estimated, domain, score_threshold, result)
# ── Pass 4: LLM Review ──────────────────────────────────
if 4 in passes and self.llm:
self._pass4_llm_review(scored, domain, result)
# ── Save results after scoring ─────────────────────────
if 3 in passes:
max_pass = max(p for p in passes if p <= 5)
for combo, sr in scored:
self.repo.save_result(
combo.id, domain.id, sr.composite_score,
pass_reached=max_pass,
novelty_flag=sr.novelty_flag,
llm_review=sr.llm_review,
)
self.repo.update_combination_status(combo.id, "scored")
# Collect top results
result.top_results = self.repo.get_top_results(domain.name, limit=20)
return result
def _pass1_constraints(
self, combos: list[Combination], result: PipelineResult
) -> list[Combination]:
valid = []
for combo in combos:
cr: ConstraintResult = self.resolver.resolve(combo)
if cr.status == "blocked":
combo.status = "blocked"
combo.block_reason = "; ".join(cr.violations)
self.repo.update_combination_status(
combo.id, "blocked", combo.block_reason
)
result.pass1_blocked += 1
elif cr.status == "conditional":
combo.status = "valid"
self.repo.update_combination_status(combo.id, "valid")
valid.append(combo)
result.pass1_conditional += 1
else:
combo.status = "valid"
self.repo.update_combination_status(combo.id, "valid")
valid.append(combo)
result.pass1_valid += 1
return valid
def _pass2_estimation(
self,
combos: list[Combination],
domain: Domain,
result: PipelineResult,
) -> list[tuple[Combination, dict[str, float]]]:
metric_names = [mb.metric_name for mb in domain.metric_bounds]
estimated = []
for combo in combos:
description = _describe_combination(combo)
if self.llm:
raw_metrics = self.llm.estimate_physics(description, metric_names)
else:
# Stub estimation: derive from dependencies where possible
raw_metrics = self._stub_estimate(combo, metric_names)
estimated.append((combo, raw_metrics))
result.pass2_estimated += 1
return estimated
def _pass3_scoring(
self,
estimated: list[tuple[Combination, dict[str, float]]],
domain: Domain,
threshold: float,
result: PipelineResult,
) -> list[tuple[Combination, ScoredResult]]:
scored = []
for combo, raw_metrics in estimated:
sr = self.scorer.score_combination(combo, raw_metrics)
if sr.composite_score >= threshold:
scored.append((combo, sr))
result.pass3_above_threshold += 1
# Persist per-metric scores
score_dicts = []
bounds_by_name = {mb.metric_name: mb for mb in domain.metric_bounds}
for s in sr.scores:
mb = bounds_by_name.get(s.metric_name)
if mb and mb.metric_id:
score_dicts.append({
"metric_id": mb.metric_id,
"raw_value": s.raw_value,
"normalized_score": s.normalized_score,
"estimation_method": s.estimation_method,
"confidence": s.confidence,
})
if score_dicts:
self.repo.save_scores(combo.id, domain.id, score_dicts)
# Sort by composite score descending
scored.sort(key=lambda x: x[1].composite_score, reverse=True)
return scored
def _pass4_llm_review(
self,
scored: list[tuple[Combination, ScoredResult]],
domain: Domain,
result: PipelineResult,
) -> None:
for combo, sr in scored:
description = _describe_combination(combo)
score_dict = {s.metric_name: s.normalized_score for s in sr.scores}
review = self.llm.review_plausibility(description, score_dict)
sr.llm_review = review
result.pass4_reviewed += 1
def _stub_estimate(
self, combo: Combination, metric_names: list[str]
) -> dict[str, float]:
"""Simple heuristic estimation from dependency data."""
raw: dict[str, float] = {m: 0.0 for m in metric_names}
# Extract force output from power source
force_watts = 0.0
mass_kg = 100.0 # default
for entity in combo.entities:
for dep in entity.dependencies:
if dep.key == "force_output_watts" and dep.constraint_type == "provides":
force_watts = max(force_watts, float(dep.value))
if dep.key == "min_mass_kg" and dep.constraint_type == "range_min":
mass_kg = max(mass_kg, float(dep.value))
# Rough speed estimate: F=ma -> v proportional to power/mass
if "speed" in raw and mass_kg > 0:
# Very rough: speed ~ power / (mass * drag_coeff)
raw["speed"] = min(force_watts / mass_kg * 0.5, 300000)
if "cost_efficiency" in raw:
# Lower force = cheaper per km (roughly)
raw["cost_efficiency"] = max(0.01, 2.0 - force_watts / 100000)
if "safety" in raw:
raw["safety"] = 0.5 # default mid-range
if "availability" in raw:
raw["availability"] = 0.5
if "range_fuel" in raw:
# More power = more range (very rough)
raw["range_fuel"] = min(force_watts * 0.01, 1e10)
if "range_degradation" in raw:
raw["range_degradation"] = 365 # 1 year default
return raw

View File

@@ -0,0 +1,89 @@
"""Multiplicative logarithmic scoring engine."""
from __future__ import annotations
import math
from physcom.models.domain import Domain, MetricBound
from physcom.models.combination import Combination, Score, ScoredResult
def normalize(raw_value: float, norm_min: float, norm_max: float) -> float:
"""Log-normalize a raw value to 0-1 within domain bounds.
Values at or below norm_min -> 0.0
Values at or above norm_max -> 1.0
Values between are log-interpolated.
"""
if norm_max <= norm_min:
return 0.0
if raw_value <= norm_min:
return 0.0
if raw_value >= norm_max:
return 1.0
log_min = math.log1p(norm_min)
log_max = math.log1p(norm_max)
log_val = math.log1p(raw_value)
return (log_val - log_min) / (log_max - log_min)
def composite_score(scores: list[float], weights: list[float]) -> float:
"""Weighted geometric mean. Any score near 0 drives the result to 0.
score = product(s_i ^ w_i) for all metrics i
"""
if not scores or not weights:
return 0.0
result = 1.0
for score, weight in zip(scores, weights):
if score <= 0.0:
return 0.0
result *= score ** weight
return result
class Scorer:
"""Scores combinations against a domain's metric bounds."""
def __init__(self, domain: Domain) -> None:
self.domain = domain
self._bounds_by_name: dict[str, MetricBound] = {
mb.metric_name: mb for mb in domain.metric_bounds
}
def score_combination(
self,
combination: Combination,
raw_metrics: dict[str, float],
estimation_method: str = "physics_calc",
confidence: float = 1.0,
) -> ScoredResult:
"""Score a combination given raw metric values.
raw_metrics maps metric_name -> raw estimated value.
"""
scores: list[Score] = []
norm_scores: list[float] = []
weights: list[float] = []
for mb in self.domain.metric_bounds:
raw = raw_metrics.get(mb.metric_name, 0.0)
normed = normalize(raw, mb.norm_min, mb.norm_max)
scores.append(Score(
metric_name=mb.metric_name,
raw_value=raw,
normalized_score=normed,
estimation_method=estimation_method,
confidence=confidence,
))
norm_scores.append(normed)
weights.append(mb.weight)
comp = composite_score(norm_scores, weights)
return ScoredResult(
combination_id=combination.id,
domain_name=self.domain.name,
scores=scores,
composite_score=comp,
)

View File

25
src/physcom/llm/base.py Normal file
View File

@@ -0,0 +1,25 @@
"""Abstract LLM interface — provider-agnostic."""
from __future__ import annotations
from abc import ABC, abstractmethod
class LLMProvider(ABC):
"""Abstract LLM interface for physics estimation and plausibility review."""
@abstractmethod
def estimate_physics(
self, combination_description: str, metrics: list[str]
) -> dict[str, float]:
"""Given a natural-language description of a combination,
estimate raw metric values. Returns {metric_name: estimated_value}."""
...
@abstractmethod
def review_plausibility(
self, combination_description: str, scores: dict[str, float]
) -> str:
"""Given a combination and its scores, return a natural-language
plausibility and novelty assessment."""
...

View File

@@ -0,0 +1,37 @@
"""Prompt templates for LLM-assisted passes."""
PHYSICS_ESTIMATION_PROMPT = """\
You are a physics estimation assistant. Given the following transportation concept, \
estimate the requested metrics using order-of-magnitude physics reasoning.
## Concept
{description}
## Metrics to estimate
{metrics}
## Instructions
- Use real-world physics to estimate each metric.
- If the concept is implausible, still provide your best estimate.
- Return ONLY valid JSON mapping metric names to numeric values.
- Example: {{"speed": 45.0, "cost_efficiency": 0.15, "safety": 0.7}}
"""
PLAUSIBILITY_REVIEW_PROMPT = """\
You are reviewing a novel transportation concept for social and practical viability.
## Concept
{description}
## Metric Scores
{scores}
## Instructions
Review this concept for:
1. Social viability — would people actually use this?
2. Practical barriers — what engineering or regulatory obstacles exist?
3. Novelty — does anything similar already exist?
4. Overall plausibility — is this a genuinely interesting innovation or nonsense?
Provide a concise 2-4 sentence assessment.
"""

View File

View File

@@ -0,0 +1,28 @@
"""Mock LLM provider for testing."""
from __future__ import annotations
from physcom.llm.base import LLMProvider
class MockLLMProvider(LLMProvider):
"""Returns deterministic stub responses for testing."""
def __init__(self, default_estimates: dict[str, float] | None = None) -> None:
self._defaults = default_estimates or {}
def estimate_physics(
self, combination_description: str, metrics: list[str]
) -> dict[str, float]:
result = {}
for metric in metrics:
result[metric] = self._defaults.get(metric, 0.5)
return result
def review_plausibility(
self, combination_description: str, scores: dict[str, float]
) -> str:
avg = sum(scores.values()) / max(len(scores), 1)
if avg > 0.5:
return "This concept appears plausible and worth further investigation."
return "This concept has significant feasibility challenges."

View File

@@ -0,0 +1,9 @@
from physcom.models.entity import Entity, Dependency
from physcom.models.domain import Domain, MetricBound
from physcom.models.combination import Combination, Score, ScoredResult
__all__ = [
"Entity", "Dependency",
"Domain", "MetricBound",
"Combination", "Score", "ScoredResult",
]

View File

@@ -0,0 +1,45 @@
"""Combination and scoring dataclasses."""
from __future__ import annotations
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
if TYPE_CHECKING:
from physcom.models.entity import Entity
@dataclass
class Score:
"""A single metric score for a combination in a domain."""
metric_name: str
raw_value: float
normalized_score: float
estimation_method: str = "physics_calc"
confidence: float = 1.0
@dataclass
class ScoredResult:
"""Final composite result for a combination in a domain."""
combination_id: int | None
domain_name: str
scores: list[Score] = field(default_factory=list)
composite_score: float = 0.0
novelty_flag: str | None = None
llm_review: str | None = None
human_notes: str | None = None
pass_reached: int = 0
@dataclass
class Combination:
"""A generated combination of entities (one per dimension)."""
entities: list[Entity] = field(default_factory=list)
status: str = "pending" # pending → valid/blocked → scored → reviewed
block_reason: str | None = None
hash: str | None = None
id: int | None = None

View File

@@ -0,0 +1,26 @@
"""Domain and MetricBound dataclasses."""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class MetricBound:
"""Per-metric weight and normalization bounds within a domain."""
metric_name: str
weight: float # 0.01.0
norm_min: float # Below this → score 0
norm_max: float # Above this → score 1
metric_id: int | None = None
@dataclass
class Domain:
"""A context frame that defines what 'good' means (e.g., urban_commuting)."""
name: str
description: str = ""
metric_bounds: list[MetricBound] = field(default_factory=list)
id: int | None = None

View File

@@ -0,0 +1,37 @@
"""Entity and Dependency dataclasses."""
from __future__ import annotations
from dataclasses import dataclass, field
@dataclass
class Dependency:
"""A single dependency/constraint on an entity.
Constraint types:
requires — this entity needs this condition to function
provides — this entity supplies this condition
range_min — numeric lower bound
range_max — numeric upper bound
excludes — this entity cannot coexist with this condition
"""
category: str # environment, force, material, physical, infrastructure
key: str
value: str
unit: str | None = None
constraint_type: str = "requires" # requires, provides, range_min, range_max, excludes
id: int | None = None
@dataclass
class Entity:
"""An attribute within a dimension (e.g., 'Bicycle' in 'platform')."""
name: str
dimension: str
description: str = ""
dependencies: list[Dependency] = field(default_factory=list)
id: int | None = None
dimension_id: int | None = None

View File

View File

@@ -0,0 +1,286 @@
"""Seed data for the transport example from the README."""
from __future__ import annotations
from physcom.models.entity import Entity, Dependency
from physcom.models.domain import Domain, MetricBound
# ── Platforms ───────────────────────────────────────────────────
PLATFORMS: list[Entity] = [
Entity(
name="Car",
dimension="platform",
description="Four-wheeled enclosed road vehicle",
dependencies=[
Dependency("environment", "ground_surface", "true", None, "requires"),
Dependency("environment", "gravity", "true", None, "requires"),
Dependency("physical", "mass_kg", "3000", "kg", "range_max"),
Dependency("physical", "mass_kg", "800", "kg", "range_min"),
Dependency("force", "force_required_watts", "15000", "watts", "range_min"),
Dependency("infrastructure", "road_network", "true", None, "requires"),
Dependency("environment", "medium", "ground", None, "requires"),
],
),
Entity(
name="Airplane",
dimension="platform",
description="Fixed-wing aircraft for atmospheric flight",
dependencies=[
Dependency("environment", "atmosphere", "standard", None, "requires"),
Dependency("environment", "gravity", "true", None, "requires"),
Dependency("physical", "mass_kg", "500", "kg", "range_min"),
Dependency("force", "force_required_watts", "100000", "watts", "range_min"),
Dependency("infrastructure", "runway", "true", None, "requires"),
Dependency("environment", "medium", "air", None, "requires"),
],
),
Entity(
name="Train",
dimension="platform",
description="Rail-guided vehicle",
dependencies=[
Dependency("environment", "ground_surface", "true", None, "requires"),
Dependency("environment", "gravity", "true", None, "requires"),
Dependency("physical", "mass_kg", "10000", "kg", "range_min"),
Dependency("force", "force_required_watts", "500000", "watts", "range_min"),
Dependency("infrastructure", "rail_network", "true", None, "requires"),
Dependency("environment", "medium", "ground", None, "requires"),
],
),
Entity(
name="Bicycle",
dimension="platform",
description="Two-wheeled human-scale vehicle",
dependencies=[
Dependency("environment", "ground_surface", "true", None, "requires"),
Dependency("environment", "gravity", "true", None, "requires"),
Dependency("physical", "mass_kg", "30", "kg", "range_max"),
Dependency("force", "force_required_watts", "75", "watts", "range_min"),
Dependency("infrastructure", "road_network", "true", None, "requires"),
Dependency("environment", "medium", "ground", None, "requires"),
],
),
Entity(
name="Walking",
dimension="platform",
description="Bipedal locomotion",
dependencies=[
Dependency("environment", "ground_surface", "true", None, "requires"),
Dependency("environment", "gravity", "true", None, "requires"),
Dependency("physical", "mass_kg", "150", "kg", "range_max"),
Dependency("force", "force_required_watts", "75", "watts", "range_min"),
Dependency("infrastructure", "fuel_infrastructure", "none", None, "requires"),
Dependency("environment", "medium", "ground", None, "requires"),
],
),
Entity(
name="Wheelchair",
dimension="platform",
description="Wheeled chair for seated mobility",
dependencies=[
Dependency("environment", "ground_surface", "true", None, "requires"),
Dependency("environment", "gravity", "true", None, "requires"),
Dependency("physical", "mass_kg", "200", "kg", "range_max"),
Dependency("force", "force_required_watts", "50", "watts", "range_min"),
Dependency("infrastructure", "road_network", "true", None, "requires"),
Dependency("environment", "medium", "ground", None, "requires"),
],
),
Entity(
name="Scooter",
dimension="platform",
description="Small two-wheeled standing vehicle",
dependencies=[
Dependency("environment", "ground_surface", "true", None, "requires"),
Dependency("environment", "gravity", "true", None, "requires"),
Dependency("physical", "mass_kg", "50", "kg", "range_max"),
Dependency("force", "force_required_watts", "200", "watts", "range_min"),
Dependency("infrastructure", "road_network", "true", None, "requires"),
Dependency("environment", "medium", "ground", None, "requires"),
],
),
Entity(
name="Spaceship",
dimension="platform",
description="Vehicle designed for space travel",
dependencies=[
Dependency("environment", "atmosphere", "vacuum_or_thin", None, "requires"),
Dependency("physical", "mass_kg", "5000", "kg", "range_min"),
Dependency("force", "force_required_watts", "1000000", "watts", "range_min"),
Dependency("infrastructure", "launch_facility", "true", None, "requires"),
Dependency("environment", "medium", "space", None, "requires"),
],
),
Entity(
name="Teleporter",
dimension="platform",
description="Hypothetical matter transmission device",
dependencies=[
Dependency("physical", "mass_kg", "0", "kg", "range_min"),
Dependency("force", "force_required_watts", "1000000000", "watts", "range_min"),
Dependency("infrastructure", "teleport_network", "true", None, "requires"),
],
),
]
# ── Power Sources ───────────────────────────────────────────────
POWER_SOURCES: list[Entity] = [
Entity(
name="Internal Combustion Engine",
dimension="power_source",
description="Gas/petrol-powered engine",
dependencies=[
Dependency("force", "force_output_watts", "100000", "watts", "provides"),
Dependency("infrastructure", "fuel_infrastructure", "gas_station", None, "requires"),
Dependency("environment", "atmosphere", "standard", None, "requires"),
Dependency("physical", "mass_kg", "50", "kg", "range_min"),
Dependency("force", "thrust_profile", "high_continuous", None, "provides"),
],
),
Entity(
name="Lithium Ion Battery",
dimension="power_source",
description="Rechargeable electric battery pack",
dependencies=[
Dependency("force", "force_output_watts", "50000", "watts", "provides"),
Dependency("infrastructure", "fuel_infrastructure", "charging_station", None, "requires"),
Dependency("physical", "mass_kg", "10", "kg", "range_min"),
Dependency("force", "thrust_profile", "moderate_continuous", None, "provides"),
],
),
Entity(
name="Hydrogen Combustion Engine",
dimension="power_source",
description="Hydrogen fuel cell or combustion engine",
dependencies=[
Dependency("force", "force_output_watts", "80000", "watts", "provides"),
Dependency("infrastructure", "fuel_infrastructure", "hydrogen_station", None, "requires"),
Dependency("physical", "mass_kg", "30", "kg", "range_min"),
Dependency("force", "thrust_profile", "high_continuous", None, "provides"),
],
),
Entity(
name="Human Pedalling",
dimension="power_source",
description="Human-powered via pedalling mechanism",
dependencies=[
Dependency("force", "force_output_watts", "75", "watts", "provides"),
Dependency("infrastructure", "fuel_infrastructure", "none", None, "requires"),
Dependency("physical", "mass_kg", "0", "kg", "range_min"),
Dependency("force", "thrust_profile", "low_continuous", None, "provides"),
],
),
Entity(
name="Modular Nuclear Reactor",
dimension="power_source",
description="Small modular nuclear fission reactor",
dependencies=[
Dependency("force", "force_output_watts", "50000000", "watts", "provides"),
Dependency("infrastructure", "fuel_infrastructure", "nuclear_fuel", None, "requires"),
Dependency("physical", "mass_kg", "2000", "kg", "range_min"),
Dependency("force", "thrust_profile", "extreme_continuous", None, "provides"),
Dependency("material", "radiation_shielding", "true", None, "requires"),
],
),
Entity(
name="Coal Steam Engine",
dimension="power_source",
description="Coal-fired boiler with steam locomotion",
dependencies=[
Dependency("force", "force_output_watts", "200000", "watts", "provides"),
Dependency("infrastructure", "fuel_infrastructure", "coal_supply", None, "requires"),
Dependency("environment", "atmosphere", "standard", None, "requires"),
Dependency("physical", "mass_kg", "500", "kg", "range_min"),
Dependency("force", "thrust_profile", "high_continuous", None, "provides"),
],
),
Entity(
name="Solar Sail",
dimension="power_source",
description="Propulsion via radiation pressure from a star",
dependencies=[
Dependency("environment", "atmosphere", "vacuum_or_thin", None, "requires"),
Dependency("environment", "star_proximity", "true", None, "requires"),
Dependency("physical", "surface_area_m2", "100", "m^2", "range_min"),
Dependency("force", "force_output_watts", "1", "watts", "provides"),
Dependency("force", "thrust_profile", "continuous_low", None, "provides"),
Dependency("environment", "medium", "space", None, "requires"),
],
),
Entity(
name="Cannonfire Recoil",
dimension="power_source",
description="Propulsion via sequential cannon blasts",
dependencies=[
Dependency("force", "force_output_watts", "500000", "watts", "provides"),
Dependency("infrastructure", "fuel_infrastructure", "ammunition", None, "requires"),
Dependency("physical", "mass_kg", "100", "kg", "range_min"),
Dependency("force", "thrust_profile", "high_burst", None, "provides"),
],
),
Entity(
name="Pushed by a Friend",
dimension="power_source",
description="External human pushing the vehicle",
dependencies=[
Dependency("force", "force_output_watts", "50", "watts", "provides"),
Dependency("infrastructure", "fuel_infrastructure", "none", None, "requires"),
Dependency("physical", "mass_kg", "0", "kg", "range_min"),
Dependency("environment", "ground_surface", "true", None, "requires"),
Dependency("force", "thrust_profile", "low_continuous", None, "provides"),
],
),
]
# ── Domains ─────────────────────────────────────────────────────
URBAN_COMMUTING = Domain(
name="urban_commuting",
description="Daily travel within a city, 1-50km range",
metric_bounds=[
MetricBound("speed", weight=0.25, norm_min=5, norm_max=120),
MetricBound("cost_efficiency", weight=0.25, norm_min=0.01, norm_max=2.0),
MetricBound("safety", weight=0.25, norm_min=0.0, norm_max=1.0),
MetricBound("availability", weight=0.15, norm_min=0.0, norm_max=1.0),
MetricBound("range_fuel", weight=0.10, norm_min=5, norm_max=500),
],
)
INTERPLANETARY = Domain(
name="interplanetary_travel",
description="Travel between planets within a solar system",
metric_bounds=[
MetricBound("speed", weight=0.30, norm_min=1000, norm_max=300000),
MetricBound("range_fuel", weight=0.30, norm_min=1e6, norm_max=1e10),
MetricBound("safety", weight=0.20, norm_min=0.0, norm_max=1.0),
MetricBound("cost_efficiency", weight=0.10, norm_min=1e3, norm_max=1e9),
MetricBound("range_degradation", weight=0.10, norm_min=100, norm_max=36500),
],
)
def load_transport_seed(repo) -> dict:
"""Load all transport seed data into the repository. Returns counts."""
from physcom.db.repository import Repository
repo: Repository
counts = {"platforms": 0, "power_sources": 0, "domains": 0}
for entity in PLATFORMS:
repo.add_entity(entity)
counts["platforms"] += 1
for entity in POWER_SOURCES:
repo.add_entity(entity)
counts["power_sources"] += 1
repo.add_domain(URBAN_COMMUTING)
repo.add_domain(INTERPLANETARY)
counts["domains"] = 2
return counts