diff --git a/src/physcom/cli.py b/src/physcom/cli.py index d7d3a35..83708fb 100644 --- a/src/physcom/cli.py +++ b/src/physcom/cli.py @@ -263,5 +263,54 @@ def export(ctx, domain_name, fmt, top, output): click.echo(f"Unsupported format: {fmt}") +@main.group() +def snapshot(): + """Export/import database snapshots.""" + pass + + +@snapshot.command("export") +@click.argument("file", type=click.Path()) +@click.option("--pretty/--no-pretty", default=True, help="Pretty-print JSON") +@click.pass_context +def snapshot_export(ctx, file, pretty): + """Export full database state to a JSON file.""" + import json + from physcom.snapshot import export_snapshot + + repo = _get_repo(ctx.obj["db"]) + data = export_snapshot(repo) + indent = 2 if pretty else None + Path(file).write_text(json.dumps(data, indent=indent), encoding="utf-8") + click.echo(f"Exported snapshot to {file}") + click.echo(f" {len(data['dimensions'])} dimensions, " + f"{len(data['entities'])} entities, " + f"{len(data['domains'])} domains, " + f"{len(data['combinations'])} combinations, " + f"{len(data['results'])} results, " + f"{len(data['scores'])} scores") + + +@snapshot.command("load") +@click.argument("file", type=click.Path(exists=True)) +@click.option("--clear", is_flag=True, help="Wipe database before importing") +@click.pass_context +def snapshot_load(ctx, file, clear): + """Import a JSON snapshot into the database.""" + import json + from physcom.snapshot import import_snapshot + + repo = _get_repo(ctx.obj["db"]) + data = json.loads(Path(file).read_text(encoding="utf-8")) + counts = import_snapshot(repo, data, clear=clear) + click.echo(f"Imported snapshot from {file}" + (" (cleared DB first)" if clear else "")) + click.echo(f" {counts['dimensions']} dimensions, " + f"{counts['entities']} entities, " + f"{counts['domains']} domains, " + f"{counts['combinations']} combinations, " + f"{counts['results']} results, " + f"{counts['scores']} scores") + + if __name__ == "__main__": main() diff --git a/src/physcom/engine/constraint_resolver.py b/src/physcom/engine/constraint_resolver.py index 3b2a120..d6d101c 100644 --- a/src/physcom/engine/constraint_resolver.py +++ b/src/physcom/engine/constraint_resolver.py @@ -16,6 +16,23 @@ MUTEX_VALUES: dict[str, list[set[str]]] = { "medium": [{"ground"}, {"water"}, {"air"}, {"space"}], } +# Conditions assumed always available (don't need an explicit provides) +AMBIENT_CONDITIONS: set[tuple[str, str]] = { + ("ground_surface", "true"), + ("gravity", "true"), + ("star_proximity", "true"), +} + +# Per-category behavior for unmet requirements: +# "block" = hard violation, "warn" = conditional warning, "skip" = ignore +CATEGORY_SEVERITY: dict[str, str] = { + "energy": "block", + "infrastructure": "skip", +} + +# For provides-vs-range_min: deficit > this ratio = hard block, else warning +DEFICIT_THRESHOLD: float = 0.25 + @dataclass class ConstraintResult: @@ -29,8 +46,19 @@ class ConstraintResult: class ConstraintResolver: """Checks a Combination's entities for dependency contradictions.""" - def __init__(self, mutex_registry: dict[str, list[set[str]]] | None = None) -> None: + def __init__( + self, + mutex_registry=None, + ambient_conditions=None, + category_severity=None, + deficit_threshold=None, + ) -> None: self.mutex = mutex_registry or MUTEX_VALUES + self.ambient = ambient_conditions or AMBIENT_CONDITIONS + self.category_severity = category_severity or CATEGORY_SEVERITY + self.deficit_threshold = ( + deficit_threshold if deficit_threshold is not None else DEFICIT_THRESHOLD + ) def resolve(self, combination: Combination) -> ConstraintResult: result = ConstraintResult() @@ -42,7 +70,7 @@ class ConstraintResolver: self._check_requires_vs_excludes(all_deps, result) self._check_mutual_exclusion(all_deps, result) self._check_range_incompatibility(all_deps, result) - self._check_energy_density(combination, result) + self._check_provides_vs_range(combination, result) self._check_unmet_requirements(all_deps, result) if result.violations: @@ -125,39 +153,39 @@ class ConstraintResolver: f"but {max_name} limits {key} <= {max_val}" ) - def _check_energy_density( + def _check_provides_vs_range( self, combination: Combination, result: ConstraintResult ) -> None: - """Rule 6: If power source energy density << platform minimum → warn/block. - - Uses a 25% threshold: below 25% of required → hard block (> 4x deficit). - """ - density_provided: list[tuple[str, float]] = [] - density_required: list[tuple[str, float]] = [] + """Generic: provides(key, N) < range_min(key, M) → block/warn.""" + provided: dict[str, list[tuple[str, float]]] = {} + required: dict[str, list[tuple[str, float]]] = {} for entity in combination.entities: for dep in entity.dependencies: - if dep.key == "energy_density" and dep.constraint_type == "provides": - density_provided.append((entity.name, float(dep.value))) - elif dep.key == "energy_density" and dep.constraint_type == "range_min": - density_required.append((entity.name, float(dep.value))) + try: + val = float(dep.value) + except (ValueError, TypeError): + continue + if dep.constraint_type == "provides": + provided.setdefault(dep.key, []).append((entity.name, val)) + elif dep.constraint_type == "range_min": + required.setdefault(dep.key, []).append((entity.name, val)) - for req_name, req_density in density_required: - if not density_provided: - continue # No stored energy source in this combo — skip check - for prov_name, prov_density in density_provided: - if prov_density < req_density * 0.25: - result.violations.append( - f"{prov_name} provides {prov_density:.0f} J/kg but " - f"{req_name} requires {req_density:.0f} J/kg " - f"(energy density deficit > 4x)" - ) - elif prov_density < req_density: - result.warnings.append( - f"{prov_name} provides {prov_density:.0f} J/kg but " - f"{req_name} requires {req_density:.0f} J/kg " - f"(under-density)" - ) + for key in set(provided) & set(required): + for req_name, req_val in required[key]: + for prov_name, prov_val in provided[key]: + if prov_val < req_val * self.deficit_threshold: + result.violations.append( + f"{prov_name} provides {key}={prov_val:.0f} but " + f"{req_name} requires {key}>={req_val:.0f} " + f"(deficit > {int(1 / self.deficit_threshold)}x)" + ) + elif prov_val < req_val: + result.warnings.append( + f"{prov_name} provides {key}={prov_val:.0f} but " + f"{req_name} requires {key}>={req_val:.0f} " + f"(under-provision)" + ) def check_domain_constraints( self, combination: Combination, constraints: list[DomainConstraint] @@ -181,31 +209,22 @@ class ConstraintResolver: def _check_unmet_requirements( self, all_deps: list[tuple[str, Dependency]], result: ConstraintResult ) -> None: - """Rule 5: Required condition not provided by any entity → conditional. - - Energy-category requirements (e.g. energy_form) are hard blocks — - you cannot power an actuator with an incompatible energy source. - """ + """Rule 5: Required condition not provided by any entity → conditional.""" provides = {(d.key, d.value) for _, d in all_deps if d.constraint_type == "provides"} - # Ambient conditions that don't need to be explicitly provided - ambient = { - ("ground_surface", "true"), - ("gravity", "true"), - ("star_proximity", "true"), - } for name, dep in all_deps: if dep.constraint_type != "requires": continue - if dep.category == "infrastructure": - continue # Infrastructure is external, not checked here + severity = self.category_severity.get(dep.category, "warn") + if severity == "skip": + continue key_val = (dep.key, dep.value) - if key_val not in provides and key_val not in ambient: + if key_val not in provides and key_val not in self.ambient: msg = ( f"{name} requires {dep.key}={dep.value} " f"but no entity in this combination provides it" ) - if dep.category == "energy": + if severity == "block": result.violations.append(msg) else: result.warnings.append(msg) diff --git a/src/physcom/snapshot.py b/src/physcom/snapshot.py new file mode 100644 index 0000000..df0483f --- /dev/null +++ b/src/physcom/snapshot.py @@ -0,0 +1,312 @@ +"""Database snapshot export/import — JSON-based, uses natural keys.""" + +from __future__ import annotations + +from datetime import datetime, timezone + +from physcom.db.repository import Repository +from physcom.models.entity import Entity, Dependency +from physcom.models.domain import Domain, DomainConstraint, MetricBound +from physcom.models.combination import Combination + + +def export_snapshot(repo: Repository) -> dict: + """Export the full database state to a portable dict (natural keys, no numeric IDs).""" + dimensions = repo.list_dimensions() + entities = repo.list_entities() + domains = repo.list_domains() + combinations = repo.list_combinations() + + # Build entity id → (dimension, name) lookup + eid_to_ref: dict[int, list[str]] = { + e.id: [e.dimension, e.name] for e in entities + } + + # Build metric id → name lookup + metric_rows = repo.conn.execute("SELECT id, name FROM metrics").fetchall() + mid_to_name: dict[int, str] = {r["id"]: r["name"] for r in metric_rows} + + # Export dimensions + dim_list = [ + {"name": d["name"], "description": d.get("description") or ""} + for d in dimensions + ] + + # Export entities with dependencies + entity_list = [] + for e in entities: + deps = [] + for dep in e.dependencies: + deps.append({ + "category": dep.category, + "key": dep.key, + "value": dep.value, + "unit": dep.unit, + "constraint_type": dep.constraint_type, + }) + entity_list.append({ + "name": e.name, + "dimension": e.dimension, + "description": e.description, + "dependencies": deps, + }) + + # Export domains with metric_bounds and constraints + domain_list = [] + for d in domains: + mbs = [] + for mb in d.metric_bounds: + mbs.append({ + "metric_name": mb.metric_name, + "weight": mb.weight, + "norm_min": mb.norm_min, + "norm_max": mb.norm_max, + "unit": mb.unit, + "lower_is_better": mb.lower_is_better, + }) + dcs = [] + for dc in d.constraints: + dcs.append({ + "key": dc.key, + "allowed_values": dc.allowed_values, + }) + domain_list.append({ + "name": d.name, + "description": d.description, + "metric_bounds": mbs, + "constraints": dcs, + }) + + # Export combinations + combo_list = [] + for c in combinations: + entity_refs = [eid_to_ref[e.id] for e in c.entities if e.id in eid_to_ref] + combo_list.append({ + "entity_refs": entity_refs, + "status": c.status, + "block_reason": c.block_reason, + }) + + # Export results and scores (per combo per domain) + result_list = [] + score_list = [] + domain_id_to_name: dict[int, str] = {d.id: d.name for d in domains} + + for c in combinations: + for res in repo.get_results_for_combination(c.id): + domain_name = domain_id_to_name.get(res["domain_id"]) + if not domain_name: + continue + entity_refs = [eid_to_ref[e.id] for e in c.entities if e.id in eid_to_ref] + result_list.append({ + "entity_refs": entity_refs, + "domain": domain_name, + "composite_score": res["composite_score"], + "novelty_flag": res.get("novelty_flag"), + "llm_review": res.get("llm_review"), + "human_notes": res.get("human_notes"), + "pass_reached": res["pass_reached"], + "domain_block_reason": res.get("domain_block_reason"), + }) + + for sc in repo.get_combination_scores(c.id, res["domain_id"]): + score_list.append({ + "entity_refs": entity_refs, + "domain": domain_name, + "metric": mid_to_name.get(sc["metric_id"], ""), + "raw_value": sc["raw_value"], + "normalized_score": sc["normalized_score"], + "estimation_method": sc.get("estimation_method"), + "confidence": sc.get("confidence"), + }) + + return { + "version": 1, + "exported_at": datetime.now(timezone.utc).isoformat(), + "dimensions": dim_list, + "entities": entity_list, + "domains": domain_list, + "combinations": combo_list, + "results": result_list, + "scores": score_list, + } + + +def import_snapshot(repo: Repository, data: dict, *, clear: bool = False) -> dict: + """Import a snapshot dict into the database. + + Args: + repo: Target repository. + data: Snapshot dict (as produced by export_snapshot). + clear: If True, wipe all existing data before importing. + + Returns: + Counts dict with keys: dimensions, entities, domains, combinations, results, scores. + """ + if clear: + repo.clear_all() + + counts = { + "dimensions": 0, + "entities": 0, + "domains": 0, + "combinations": 0, + "results": 0, + "scores": 0, + } + + # 1. Dimensions + for d in data.get("dimensions", []): + repo.ensure_dimension(d["name"], d.get("description", "")) + counts["dimensions"] += 1 + + # 2. Entities (with dependencies) + for e_data in data.get("entities", []): + deps = [ + Dependency( + category=dep["category"], + key=dep["key"], + value=dep["value"], + unit=dep.get("unit"), + constraint_type=dep.get("constraint_type", "requires"), + ) + for dep in e_data.get("dependencies", []) + ] + existing = repo.get_entity_by_name(e_data["dimension"], e_data["name"]) + if existing: + repo.replace_entity_dependencies(existing.id, deps) + else: + entity = Entity( + name=e_data["name"], + dimension=e_data["dimension"], + description=e_data.get("description", ""), + dependencies=deps, + ) + repo.add_entity(entity) + counts["entities"] += 1 + + # 3. Domains (with metric_bounds and constraints) + for d_data in data.get("domains", []): + existing = repo.get_domain(d_data["name"]) + if not existing: + mbs = [ + MetricBound( + metric_name=mb["metric_name"], + weight=mb["weight"], + norm_min=mb["norm_min"], + norm_max=mb["norm_max"], + unit=mb.get("unit", ""), + lower_is_better=mb.get("lower_is_better", False), + ) + for mb in d_data.get("metric_bounds", []) + ] + dcs = [ + DomainConstraint( + key=dc["key"], + allowed_values=dc.get("allowed_values", []), + ) + for dc in d_data.get("constraints", []) + ] + domain = Domain( + name=d_data["name"], + description=d_data.get("description", ""), + metric_bounds=mbs, + constraints=dcs, + ) + repo.add_domain(domain) + counts["domains"] += 1 + + # Build lookup: (dimension, name) → entity id + all_entities = repo.list_entities() + ref_to_eid: dict[tuple[str, str], int] = { + (e.dimension, e.name): e.id for e in all_entities + } + + # Build lookup: domain name → id + all_domains = repo.list_domains() + domain_name_to_id: dict[str, int] = {d.name: d.id for d in all_domains} + + # Build lookup: metric name → id + metric_rows = repo.conn.execute("SELECT id, name FROM metrics").fetchall() + metric_name_to_id: dict[str, int] = {r["name"]: r["id"] for r in metric_rows} + + # 4. Combinations + for c_data in data.get("combinations", []): + entity_refs = c_data.get("entity_refs", []) + entity_objs = [] + skip = False + for ref in entity_refs: + eid = ref_to_eid.get((ref[0], ref[1])) + if eid is None: + skip = True + break + entity_objs.append(Entity(id=eid, name=ref[1], dimension=ref[0])) + if skip: + continue + combo = Combination( + entities=entity_objs, + status=c_data.get("status", "pending"), + block_reason=c_data.get("block_reason"), + ) + repo.save_combination(combo) + counts["combinations"] += 1 + + # 5. Results + for r_data in data.get("results", []): + entity_refs = r_data.get("entity_refs", []) + entity_objs = [] + skip = False + for ref in entity_refs: + eid = ref_to_eid.get((ref[0], ref[1])) + if eid is None: + skip = True + break + entity_objs.append(Entity(id=eid, name=ref[1], dimension=ref[0])) + if skip: + continue + combo = Combination(entities=entity_objs) + combo = repo.save_combination(combo) + domain_id = domain_name_to_id.get(r_data["domain"]) + if combo.id is None or domain_id is None: + continue + repo.save_result( + combo.id, + domain_id, + composite_score=r_data.get("composite_score", 0.0), + pass_reached=r_data.get("pass_reached", 0), + novelty_flag=r_data.get("novelty_flag"), + llm_review=r_data.get("llm_review"), + human_notes=r_data.get("human_notes"), + domain_block_reason=r_data.get("domain_block_reason"), + ) + counts["results"] += 1 + + # 6. Scores + for s_data in data.get("scores", []): + entity_refs = s_data.get("entity_refs", []) + entity_objs = [] + skip = False + for ref in entity_refs: + eid = ref_to_eid.get((ref[0], ref[1])) + if eid is None: + skip = True + break + entity_objs.append(Entity(id=eid, name=ref[1], dimension=ref[0])) + if skip: + continue + combo = Combination(entities=entity_objs) + combo = repo.save_combination(combo) + domain_id = domain_name_to_id.get(s_data["domain"]) + metric_id = metric_name_to_id.get(s_data.get("metric", "")) + if combo.id is None or domain_id is None or metric_id is None: + continue + repo.save_scores(combo.id, domain_id, [{ + "metric_id": metric_id, + "raw_value": s_data.get("raw_value"), + "normalized_score": s_data.get("normalized_score"), + "estimation_method": s_data.get("estimation_method"), + "confidence": s_data.get("confidence"), + }]) + counts["scores"] += 1 + + return counts diff --git a/src/physcom_web/routes/admin.py b/src/physcom_web/routes/admin.py index 426d3df..6907726 100644 --- a/src/physcom_web/routes/admin.py +++ b/src/physcom_web/routes/admin.py @@ -2,9 +2,13 @@ from __future__ import annotations -from flask import Blueprint, flash, redirect, render_template, url_for +import json + +from flask import Blueprint, flash, redirect, render_template, request, url_for +from flask import Response from physcom.seed.transport_example import load_transport_seed +from physcom.snapshot import export_snapshot, import_snapshot from physcom_web.app import get_repo bp = Blueprint("admin", __name__, url_prefix="/admin") @@ -38,14 +42,50 @@ def reseed(): return redirect(url_for("admin.admin_index")) -@bp.route("/wipe-and-reseed", methods=["POST"]) -def wipe_and_reseed(): +@bp.route("/wipe", methods=["POST"]) +def wipe(): repo = get_repo() repo.clear_all() - counts = load_transport_seed(repo) - total = counts["platforms"] + counts["actuators"] + counts["energy_storages"] + flash("Wiped all data.", "success") + return redirect(url_for("admin.admin_index")) + + +@bp.route("/snapshot/export") +def snapshot_export(): + repo = get_repo() + data = export_snapshot(repo) + body = json.dumps(data, indent=2) + return Response( + body, + mimetype="application/json", + headers={"Content-Disposition": "attachment; filename=physcom_snapshot.json"}, + ) + + +@bp.route("/snapshot/import", methods=["POST"]) +def snapshot_import(): + repo = get_repo() + clear = "clear" in request.form + + file = request.files.get("file") + if not file or not file.filename: + flash("No file selected.", "error") + return redirect(url_for("admin.admin_index")) + + try: + raw = file.read().decode("utf-8") + data = json.loads(raw) + except (UnicodeDecodeError, json.JSONDecodeError) as exc: + flash(f"Invalid JSON file: {exc}", "error") + return redirect(url_for("admin.admin_index")) + + counts = import_snapshot(repo, data, clear=clear) + mode = "Cleared DB and imported" if clear else "Merged" flash( - f"Wiped all data and reseeded — {total} entities, {counts['domains']} domains.", + f"{mode} snapshot — {counts['dimensions']} dimensions, " + f"{counts['entities']} entities, {counts['domains']} domains, " + f"{counts['combinations']} combinations, {counts['results']} results, " + f"{counts['scores']} scores.", "success", ) return redirect(url_for("admin.admin_index")) diff --git a/src/physcom_web/templates/admin/index.html b/src/physcom_web/templates/admin/index.html index 8c7f1c8..9746f34 100644 --- a/src/physcom_web/templates/admin/index.html +++ b/src/physcom_web/templates/admin/index.html @@ -37,14 +37,49 @@
-

Wipe & Reseed

+

Wipe All Data

Delete all data — entities, domains, combinations, - pipeline runs — then reload seed data from scratch. + pipeline runs. Use Reseed afterwards to restore seed data.

-
- + + +
+
+ + +

Snapshots

+
+
+

Export Snapshot

+

+ Download the full database state as a JSON file — entities, + domains, combinations, results, and scores. +

+
+ Download JSON +
+
+ +
+

Import Snapshot

+

+ Upload a previously exported JSON snapshot. Merge adds new data + alongside existing; clear wipes everything first. +

+
+
+ +
+
+ + +
diff --git a/tests/test_constraint_resolver.py b/tests/test_constraint_resolver.py index eec4975..ce8e1c7 100644 --- a/tests/test_constraint_resolver.py +++ b/tests/test_constraint_resolver.py @@ -97,7 +97,7 @@ def test_energy_density_deficit_blocks(): combo = Combination(entities=[platform, storage]) result = resolver.resolve(combo) assert result.status == "p1_fail" - assert any("energy density deficit" in v for v in result.violations) + assert any("deficit" in v for v in result.violations) def test_energy_density_under_density_warning(): @@ -118,7 +118,7 @@ def test_energy_density_under_density_warning(): combo = Combination(entities=[platform, storage]) result = resolver.resolve(combo) assert result.status != "p1_fail" - assert any("under-density" in w for w in result.warnings) + assert any("under-provision" in w for w in result.warnings) def test_energy_density_no_constraint_if_no_provider(): @@ -139,7 +139,7 @@ def test_energy_density_no_constraint_if_no_provider(): resolver = ConstraintResolver() combo = Combination(entities=[platform, actuator]) result = resolver.resolve(combo) - density_violations = [v for v in result.violations if "energy density" in v] + density_violations = [v for v in result.violations if "energy_density" in v] assert len(density_violations) == 0 diff --git a/tests/test_snapshot.py b/tests/test_snapshot.py new file mode 100644 index 0000000..c8612ae --- /dev/null +++ b/tests/test_snapshot.py @@ -0,0 +1,207 @@ +"""Tests for snapshot export/import.""" + +from __future__ import annotations + +import pytest + +from physcom.db.schema import init_db +from physcom.db.repository import Repository +from physcom.models.entity import Entity, Dependency +from physcom.models.domain import Domain, DomainConstraint, MetricBound +from physcom.models.combination import Combination +from physcom.snapshot import export_snapshot, import_snapshot + + +def test_export_roundtrip(seeded_repo, tmp_path): + """Export from seeded_repo, import into fresh repo, verify counts match.""" + data = export_snapshot(seeded_repo) + + # Fresh repo + conn = init_db(tmp_path / "fresh.db") + fresh = Repository(conn) + + counts = import_snapshot(fresh, data, clear=True) + + assert counts["dimensions"] == len(data["dimensions"]) + assert counts["entities"] == len(data["entities"]) + assert counts["domains"] == len(data["domains"]) + + # Verify actual DB contents match + assert len(fresh.list_dimensions()) == len(seeded_repo.list_dimensions()) + assert len(fresh.list_entities()) == len(seeded_repo.list_entities()) + assert len(fresh.list_domains()) == len(seeded_repo.list_domains()) + + +def test_export_contains_entities(seeded_repo): + """Exported data has expected entity structure.""" + data = export_snapshot(seeded_repo) + + assert data["version"] == 1 + assert "exported_at" in data + + entity_names = {e["name"] for e in data["entities"]} + # Transport seed should include at least these + assert "Bicycle" in entity_names or len(entity_names) > 0 + + # Each entity has the right keys + for e in data["entities"]: + assert "name" in e + assert "dimension" in e + assert "description" in e + assert "dependencies" in e + assert isinstance(e["dependencies"], list) + + +def test_export_contains_domains(seeded_repo): + """Exported domains have metric_bounds and constraints.""" + data = export_snapshot(seeded_repo) + + assert len(data["domains"]) > 0 + for d in data["domains"]: + assert "name" in d + assert "metric_bounds" in d + assert "constraints" in d + for mb in d["metric_bounds"]: + assert "metric_name" in mb + assert "weight" in mb + assert "norm_min" in mb + assert "norm_max" in mb + + +def test_import_clear_mode(seeded_repo, tmp_path): + """Import with clear=True wipes old data and loads new.""" + # Export seeded data + data = export_snapshot(seeded_repo) + + # Create a second repo with its own data + conn = init_db(tmp_path / "other.db") + other = Repository(conn) + other.add_entity(Entity( + name="Zeppelin", + dimension="platform", + description="Lighter-than-air craft", + )) + + assert other.get_entity_by_name("platform", "Zeppelin") is not None + + # Import with clear — old data should be gone + import_snapshot(other, data, clear=True) + + assert other.get_entity_by_name("platform", "Zeppelin") is None + assert len(other.list_entities()) == len(data["entities"]) + + +def test_import_merge_replaces_deps(repo): + """Merge import replaces dependencies on existing entities.""" + # Add an entity with one dep + entity = Entity( + name="Widget", + dimension="gadget", + description="A test widget", + dependencies=[ + Dependency("physical", "mass", "10", "kg", "range_min"), + ], + ) + repo.add_entity(entity) + assert len(repo.get_entity_by_name("gadget", "Widget").dependencies) == 1 + + # Build snapshot with different deps for that entity + data = { + "version": 1, + "exported_at": "2026-01-01T00:00:00+00:00", + "dimensions": [{"name": "gadget", "description": ""}], + "entities": [{ + "name": "Widget", + "dimension": "gadget", + "description": "A test widget", + "dependencies": [ + {"category": "physical", "key": "mass", "value": "50", + "unit": "kg", "constraint_type": "range_min"}, + {"category": "energy", "key": "energy_form", "value": "electric", + "unit": None, "constraint_type": "requires"}, + ], + }], + "domains": [], + "combinations": [], + "results": [], + "scores": [], + } + + counts = import_snapshot(repo, data) + + widget = repo.get_entity_by_name("gadget", "Widget") + assert len(widget.dependencies) == 2 + dep_keys = {(d.category, d.key) for d in widget.dependencies} + assert ("physical", "mass") in dep_keys + assert ("energy", "energy_form") in dep_keys + + +def test_import_with_combinations(seeded_repo, tmp_path): + """Export combos+results from a pipeline run, import into fresh DB.""" + # Run pipeline to generate combos and results + from physcom.engine.constraint_resolver import ConstraintResolver + from physcom.engine.scorer import Scorer + from physcom.engine.pipeline import Pipeline + from physcom.llm.providers.mock import MockLLMProvider + + domain = seeded_repo.get_domain("urban_commuting") + resolver = ConstraintResolver() + scorer = Scorer(domain) + pipeline = Pipeline(seeded_repo, resolver, scorer, llm=MockLLMProvider()) + pipeline.run(domain, ["platform", "actuator", "energy_storage"], + score_threshold=0.0, passes=[1, 2, 3]) + + # Export + data = export_snapshot(seeded_repo) + assert len(data["combinations"]) > 0 + assert len(data["results"]) > 0 + + # Import into fresh DB + conn = init_db(tmp_path / "fresh.db") + fresh = Repository(conn) + counts = import_snapshot(fresh, data, clear=True) + + assert counts["combinations"] > 0 + assert counts["results"] > 0 + + # Verify some results are present in the fresh DB + fresh_combos = fresh.list_combinations() + assert len(fresh_combos) == len(data["combinations"]) + + +def test_import_merge_skips_existing_domain(repo): + """Merge import skips domains that already exist.""" + domain = Domain( + name="test_domain", + description="Original description", + metric_bounds=[ + MetricBound("speed", weight=1.0, norm_min=0, norm_max=100), + ], + ) + repo.add_domain(domain) + + data = { + "version": 1, + "exported_at": "2026-01-01T00:00:00+00:00", + "dimensions": [], + "entities": [], + "domains": [{ + "name": "test_domain", + "description": "Updated description", + "metric_bounds": [ + {"metric_name": "speed", "weight": 0.5, "norm_min": 0, + "norm_max": 200, "unit": "", "lower_is_better": False}, + ], + "constraints": [], + }], + "combinations": [], + "results": [], + "scores": [], + } + + import_snapshot(repo, data) + + # Domain should keep original description (merge skips existing) + d = repo.get_domain("test_domain") + assert d.description == "Original description" + assert d.metric_bounds[0].weight == 1.0