"""Tests for snapshot export/import.""" from __future__ import annotations import pytest from physcom.db.schema import init_db from physcom.db.repository import Repository from physcom.models.entity import Entity, Dependency from physcom.models.domain import Domain, DomainConstraint, MetricBound from physcom.models.combination import Combination from physcom.snapshot import export_snapshot, import_snapshot def test_export_roundtrip(seeded_repo, tmp_path): """Export from seeded_repo, import into fresh repo, verify counts match.""" data = export_snapshot(seeded_repo) # Fresh repo conn = init_db(tmp_path / "fresh.db") fresh = Repository(conn) counts = import_snapshot(fresh, data, clear=True) assert counts["dimensions"] == len(data["dimensions"]) assert counts["entities"] == len(data["entities"]) assert counts["domains"] == len(data["domains"]) # Verify actual DB contents match assert len(fresh.list_dimensions()) == len(seeded_repo.list_dimensions()) assert len(fresh.list_entities()) == len(seeded_repo.list_entities()) assert len(fresh.list_domains()) == len(seeded_repo.list_domains()) def test_export_contains_entities(seeded_repo): """Exported data has expected entity structure.""" data = export_snapshot(seeded_repo) assert data["version"] == 1 assert "exported_at" in data entity_names = {e["name"] for e in data["entities"]} # Transport seed should include at least these assert "Bicycle" in entity_names or len(entity_names) > 0 # Each entity has the right keys for e in data["entities"]: assert "name" in e assert "dimension" in e assert "description" in e assert "dependencies" in e assert isinstance(e["dependencies"], list) def test_export_contains_domains(seeded_repo): """Exported domains have metric_bounds and constraints.""" data = export_snapshot(seeded_repo) assert len(data["domains"]) > 0 for d in data["domains"]: assert "name" in d assert "metric_bounds" in d assert "constraints" in d for mb in d["metric_bounds"]: assert "metric_name" in mb assert "weight" in mb assert "norm_min" in mb assert "norm_max" in mb def test_import_clear_mode(seeded_repo, tmp_path): """Import with clear=True wipes old data and loads new.""" # Export seeded data data = export_snapshot(seeded_repo) # Create a second repo with its own data conn = init_db(tmp_path / "other.db") other = Repository(conn) other.add_entity(Entity( name="Zeppelin", dimension="platform", description="Lighter-than-air craft", )) assert other.get_entity_by_name("platform", "Zeppelin") is not None # Import with clear — old data should be gone import_snapshot(other, data, clear=True) assert other.get_entity_by_name("platform", "Zeppelin") is None assert len(other.list_entities()) == len(data["entities"]) def test_import_merge_replaces_deps(repo): """Merge import replaces dependencies on existing entities.""" # Add an entity with one dep entity = Entity( name="Widget", dimension="gadget", description="A test widget", dependencies=[ Dependency("physical", "mass", "10", "kg", "range_min"), ], ) repo.add_entity(entity) assert len(repo.get_entity_by_name("gadget", "Widget").dependencies) == 1 # Build snapshot with different deps for that entity data = { "version": 1, "exported_at": "2026-01-01T00:00:00+00:00", "dimensions": [{"name": "gadget", "description": ""}], "entities": [{ "name": "Widget", "dimension": "gadget", "description": "A test widget", "dependencies": [ {"category": "physical", "key": "mass", "value": "50", "unit": "kg", "constraint_type": "range_min"}, {"category": "energy", "key": "energy_form", "value": "electric", "unit": None, "constraint_type": "requires"}, ], }], "domains": [], "combinations": [], "results": [], "scores": [], } counts = import_snapshot(repo, data) widget = repo.get_entity_by_name("gadget", "Widget") assert len(widget.dependencies) == 2 dep_keys = {(d.category, d.key) for d in widget.dependencies} assert ("physical", "mass") in dep_keys assert ("energy", "energy_form") in dep_keys def test_import_with_combinations(seeded_repo, tmp_path): """Export combos+results from a pipeline run, import into fresh DB.""" # Run pipeline to generate combos and results from physcom.engine.constraint_resolver import ConstraintResolver from physcom.engine.scorer import Scorer from physcom.engine.pipeline import Pipeline from physcom.llm.providers.mock import MockLLMProvider domain = seeded_repo.get_domain("urban_commuting") resolver = ConstraintResolver() scorer = Scorer(domain) pipeline = Pipeline(seeded_repo, resolver, scorer, llm=MockLLMProvider()) pipeline.run(domain, ["platform", "actuator", "energy_storage"], score_threshold=0.0, passes=[1, 2, 3]) # Export data = export_snapshot(seeded_repo) assert len(data["combinations"]) > 0 assert len(data["results"]) > 0 # Import into fresh DB conn = init_db(tmp_path / "fresh.db") fresh = Repository(conn) counts = import_snapshot(fresh, data, clear=True) assert counts["combinations"] > 0 assert counts["results"] > 0 # Verify some results are present in the fresh DB fresh_combos = fresh.list_combinations() assert len(fresh_combos) == len(data["combinations"]) def test_import_merge_skips_existing_domain(repo): """Merge import skips domains that already exist.""" domain = Domain( name="test_domain", description="Original description", metric_bounds=[ MetricBound("speed", weight=1.0, norm_min=0, norm_max=100), ], ) repo.add_domain(domain) data = { "version": 1, "exported_at": "2026-01-01T00:00:00+00:00", "dimensions": [], "entities": [], "domains": [{ "name": "test_domain", "description": "Updated description", "metric_bounds": [ {"metric_name": "speed", "weight": 0.5, "norm_min": 0, "norm_max": 200, "unit": "", "lower_is_better": False}, ], "constraints": [], }], "combinations": [], "results": [], "scores": [], } import_snapshot(repo, data) # Domain should keep original description (merge skips existing) d = repo.get_domain("test_domain") assert d.description == "Original description" assert d.metric_bounds[0].weight == 1.0