# test_storage_service.py (2.7 KB)

  1. import pytest
  2. from app.models import AtlasAlias, AtlasClaim, AtlasClaimObject, AtlasEntity, AtlasProvenance
  3. from app.storage_service import AtlasStorageService, entity_iri
  @pytest.mark.anyio
  async def test_write_entity_uses_batch_insert():
      """Check that ``write_entity`` issues a ``batch_insert`` call containing ``ttl``.

      A recording stub replaces the tool-call backend so the first call made
      by the service can be inspected after the write completes.
      """
      recorded = []

      async def recording_call(tool, payload):
          # Remember every invocation, then answer with a success response.
          recorded.append((tool, payload))
          return {"ok": True}

      service = AtlasStorageService(call_tool=recording_call)

      atlas_id = "atlas:0cqt90abcd123456"
      identifier_claim = AtlasClaim(
          claim_id="clm_raw_ident_mid_/m/0cqt90",
          subject=atlas_id,
          predicate="atlas:hasIdentifier",
          object=AtlasClaimObject(kind="identifier", id_type="mid", value="/m/0cqt90"),
          layer="raw",
          provenance=AtlasProvenance(
              source="google",
              retrieval_method="trends-resolution",
              confidence=0.9,
          ),
      )
      person = AtlasEntity(
          atlas_id=atlas_id,
          canonical_label="Donald Trump",
          canonical_description="45th and 47th U.S. President",
          entity_type="Person",
          aliases=[AtlasAlias(label="Donald Trump")],
          claims=[identifier_claim],
      )

      outcome = await service.write_entity(person)

      assert outcome["status"] == "ok"
      # Only the first recorded call is inspected.
      tool_name, tool_payload = recorded[0]
      assert tool_name == "batch_insert"
      assert "ttl" in tool_payload
  @pytest.mark.anyio
  async def test_read_entity_claims_uses_sparql_query():
      """Check that ``read_entity_claims`` sends a ``sparql_query`` for the entity.

      The query of the first recorded call must mention the entity's IRI and,
      by default, include the active-status filter.
      """
      recorded = []

      async def recording_call(tool, payload):
          # Remember every invocation, then answer with an empty result set.
          recorded.append((tool, payload))
          return {"results": {"bindings": []}}

      atlas_id = "atlas:0cqt90abcd123456"
      service = AtlasStorageService(call_tool=recording_call)

      outcome = await service.read_entity_claims(atlas_id)

      assert outcome["status"] == "ok"
      tool_name, tool_payload = recorded[0]
      assert tool_name == "sparql_query"
      query_text = tool_payload["query"]
      assert entity_iri(atlas_id) in query_text
      assert 'FILTER(?status = "active")' in query_text
  @pytest.mark.anyio
  async def test_read_entity_claims_include_superseded_removes_filter():
      """Check that ``include_superseded=True`` drops the active-status filter.

      The call must still be a ``sparql_query``, but its query text must not
      contain the ``FILTER(?status = "active")`` clause.
      """
      recorded = []

      async def recording_call(tool, payload):
          # Remember every invocation, then answer with an empty result set.
          recorded.append((tool, payload))
          return {"results": {"bindings": []}}

      service = AtlasStorageService(call_tool=recording_call)

      outcome = await service.read_entity_claims(
          "atlas:0cqt90abcd123456",
          include_superseded=True,
      )

      assert outcome["status"] == "ok"
      tool_name, tool_payload = recorded[0]
      assert tool_name == "sparql_query"
      assert 'FILTER(?status = "active")' not in tool_payload["query"]
  @pytest.mark.anyio
  async def test_write_entity_unfinished_on_failure():
      """Check that a backend error yields an ``unfinished`` result.

      The stub backend always raises; the result returned by ``write_entity``
      must have status ``unfinished`` and include the error message text.
      """

      async def failing_call(tool, payload):
          # Simulate an unavailable backend on every call.
          raise RuntimeError("backend down")

      service = AtlasStorageService(call_tool=failing_call)

      outcome = await service.write_entity(
          AtlasEntity(atlas_id="atlas:x", canonical_label="X")
      )

      assert outcome["status"] == "unfinished"
      assert "backend down" in outcome["error"]