| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677 |
- import pytest
- from app.models import AtlasAlias, AtlasEntity, AtlasIdentifier, AtlasProvenance
- from app.storage_service import AtlasStorageService, entity_iri
@pytest.mark.anyio
async def test_write_entity_uses_batch_insert():
    """Check that ``write_entity`` routes its first tool call to ``batch_insert``.

    A recording stub replaces the tool-calling backend so the test can inspect
    which tool was invoked and with what payload.
    """
    calls = []

    async def fake_call(tool, payload):
        # Record the invocation instead of contacting a real backend.
        calls.append((tool, payload))
        return {"ok": True}

    svc = AtlasStorageService(call_tool=fake_call)
    entity = AtlasEntity(
        atlas_id="atlas:mid:/m/0cqt90",
        canonical_label="Donald Trump",
        canonical_description="45th and 47th U.S. President",
        entity_type="Person",
        aliases=[AtlasAlias(label="Donald Trump")],
        identifiers=[
            AtlasIdentifier(value="/m/0cqt90", source="google", identifier_type="mid"),
        ],
        provenance=[
            AtlasProvenance(
                source="google",
                retrieval_method="trends-resolution",
                confidence=0.9,
            ),
        ],
    )

    result = await svc.write_entity(entity)

    assert result["status"] == "ok"
    # Fail with a readable assertion (not an IndexError) if nothing was called.
    assert calls, "write_entity made no tool calls"
    tool, payload = calls[0]
    assert tool == "batch_insert"
    # NOTE(review): "ttl" is presumably the Turtle serialization -- confirm.
    assert "ttl" in payload
@pytest.mark.anyio
async def test_read_entity_claims_uses_sparql_query():
    """Check that ``read_entity_claims`` issues a ``sparql_query`` tool call.

    With default arguments the query text must mention the entity's IRI and
    contain the ``FILTER(?status = "active")`` clause.
    """
    calls = []

    async def fake_call(tool, payload):
        # Record the invocation and answer with an empty SPARQL result set.
        calls.append((tool, payload))
        return {"results": {"bindings": []}}

    svc = AtlasStorageService(call_tool=fake_call)

    result = await svc.read_entity_claims("atlas:mid:/m/0cqt90")

    assert result["status"] == "ok"
    # Fail with a readable assertion (not an IndexError) if nothing was called.
    assert calls, "read_entity_claims made no tool calls"
    tool, payload = calls[0]
    assert tool == "sparql_query"
    query = payload["query"]
    assert entity_iri("atlas:mid:/m/0cqt90") in query
    assert 'FILTER(?status = "active")' in query
@pytest.mark.anyio
async def test_read_entity_claims_include_superseded_removes_filter():
    """Check that ``include_superseded=True`` drops the active-status filter.

    The call must still go to ``sparql_query``, but the query text must not
    contain the ``FILTER(?status = "active")`` clause.
    """
    calls = []

    async def fake_call(tool, payload):
        # Record the invocation and answer with an empty SPARQL result set.
        calls.append((tool, payload))
        return {"results": {"bindings": []}}

    svc = AtlasStorageService(call_tool=fake_call)

    result = await svc.read_entity_claims(
        "atlas:mid:/m/0cqt90",
        include_superseded=True,
    )

    assert result["status"] == "ok"
    # Fail with a readable assertion (not an IndexError) if nothing was called.
    assert calls, "read_entity_claims made no tool calls"
    tool, payload = calls[0]
    assert tool == "sparql_query"
    assert 'FILTER(?status = "active")' not in payload["query"]
@pytest.mark.anyio
async def test_write_entity_unfinished_on_failure():
    """Check that a failing backend yields an ``"unfinished"`` result.

    The stub tool call always raises; ``write_entity`` is expected to return a
    result whose status is ``"unfinished"`` and whose ``error`` text includes
    the exception message, rather than letting the exception propagate.
    """

    async def fake_call(tool, payload):
        # Simulate an unavailable backend on every call.
        raise RuntimeError("backend down")

    svc = AtlasStorageService(call_tool=fake_call)
    entity = AtlasEntity(atlas_id="atlas:x", canonical_label="X")

    result = await svc.write_entity(entity)

    assert result["status"] == "unfinished"
    assert "backend down" in result["error"]
|