| 12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485 |
- import pytest
- from app.models import AtlasAlias, AtlasClaim, AtlasClaimObject, AtlasEntity, AtlasProvenance
- from app.storage_service import AtlasStorageService, entity_iri
- @pytest.mark.anyio
- async def test_write_entity_uses_batch_insert():
- calls = []
- async def fake_call(tool, payload):
- calls.append((tool, payload))
- return {"ok": True}
- svc = AtlasStorageService(call_tool=fake_call)
- entity = AtlasEntity(
- atlas_id="atlas:0cqt90abcd123456",
- canonical_label="Donald Trump",
- canonical_description="45th and 47th U.S. President",
- entity_type="Person",
- aliases=[AtlasAlias(label="Donald Trump")],
- claims=[
- AtlasClaim(
- claim_id="clm_raw_ident_mid_/m/0cqt90",
- subject="atlas:0cqt90abcd123456",
- predicate="atlas:hasIdentifier",
- object=AtlasClaimObject(kind="identifier", id_type="mid", value="/m/0cqt90"),
- layer="raw",
- provenance=AtlasProvenance(source="google", retrieval_method="trends-resolution", confidence=0.9),
- )
- ],
- )
- result = await svc.write_entity(entity)
- assert result["status"] == "ok"
- assert calls[0][0] == "batch_insert"
- assert "ttl" in calls[0][1]
- @pytest.mark.anyio
- async def test_read_entity_claims_uses_sparql_query():
- calls = []
- async def fake_call(tool, payload):
- calls.append((tool, payload))
- return {"results": {"bindings": []}}
- svc = AtlasStorageService(call_tool=fake_call)
- result = await svc.read_entity_claims("atlas:0cqt90abcd123456")
- assert result["status"] == "ok"
- assert calls[0][0] == "sparql_query"
- assert entity_iri("atlas:0cqt90abcd123456") in calls[0][1]["query"]
- assert 'FILTER(?status = "active")' in calls[0][1]["query"]
- @pytest.mark.anyio
- async def test_read_entity_claims_include_superseded_removes_filter():
- calls = []
- async def fake_call(tool, payload):
- calls.append((tool, payload))
- return {"results": {"bindings": []}}
- svc = AtlasStorageService(call_tool=fake_call)
- result = await svc.read_entity_claims("atlas:0cqt90abcd123456", include_superseded=True)
- assert result["status"] == "ok"
- assert calls[0][0] == "sparql_query"
- assert 'FILTER(?status = "active")' not in calls[0][1]["query"]
- @pytest.mark.anyio
- async def test_write_entity_unfinished_on_failure():
- async def fake_call(tool, payload):
- raise RuntimeError("backend down")
- svc = AtlasStorageService(call_tool=fake_call)
- entity = AtlasEntity(atlas_id="atlas:x", canonical_label="X")
- result = await svc.write_entity(entity)
- assert result["status"] == "unfinished"
- assert "backend down" in result["error"]
|