sqlite_store.py 46 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130
  1. from __future__ import annotations
  2. import json
  3. import sqlite3
  4. from dataclasses import dataclass
  5. from datetime import datetime, timezone, timedelta
  6. from pathlib import Path
  7. from typing import Any
  8. from email.utils import parsedate_to_datetime
  9. from news_mcp.article_identity import article_key
  10. from news_mcp.config import (
  11. NEWS_PRUNE_INTERVAL_HOURS,
  12. NEWS_PRUNING_ENABLED,
  13. NEWS_REFRESH_INTERVAL_SECONDS,
  14. NEWS_RETENTION_DAYS,
  15. )
  16. from news_mcp.entity_normalize import normalize_entities
  17. from news_mcp.trends_resolution import resolve_entity_via_trends
  18. def _normalize_ts(ts: Any) -> str:
  19. """Parse any timestamp string and return ISO 8601 UTC.
  20. Handles ISO 8601, RFC 2822/HTTP-date, and unix epoch seconds.
  21. Returns empty string if unparseable.
  22. """
  23. if ts is None:
  24. return ""
  25. if isinstance(ts, (int, float)):
  26. try:
  27. dt = datetime.fromtimestamp(float(ts), tz=timezone.utc)
  28. return dt.strftime("%Y-%m-%dT%H:%M:%S+00:00")
  29. except Exception:
  30. return ""
  31. text = str(ts).strip()
  32. if not text:
  33. return ""
  34. # Try ISO 8601
  35. try:
  36. dt = datetime.fromisoformat(text.replace("Z", "+00:00"))
  37. if dt.tzinfo is None:
  38. dt = dt.replace(tzinfo=timezone.utc)
  39. return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
  40. except Exception:
  41. pass
  42. # Try RFC 2822 / HTTP-date
  43. try:
  44. dt = parsedate_to_datetime(text)
  45. if dt.tzinfo is None:
  46. dt = dt.replace(tzinfo=timezone.utc)
  47. return dt.astimezone(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
  48. except Exception:
  49. pass
  50. # Return original if we can't parse — better than losing data
  51. return text
  52. def _read_ts(ts: Any) -> float | None:
  53. """Parse a stored, already-normalized ISO 8601 UTC timestamp to a unix float.
  54. All payload.timestamp / payload.first_seen / payload.last_updated values
  55. are guaranteed YYYY-MM-DDTHH:MM:SS+00:00 at write time (enforced by
  56. sanitize_cluster_payload → _normalize_ts). Only datetime.fromisoformat is
  57. needed here. Do NOT add RFC 2822 / parsedate_to_datetime fallbacks — if
  58. this function can't parse a stored timestamp it means the normalization
  59. pipeline has a bug that should be fixed there, not papered over here.
  60. """
  61. if not ts:
  62. return None
  63. try:
  64. dt = datetime.fromisoformat(str(ts).strip())
  65. if dt.tzinfo is None:
  66. dt = dt.replace(tzinfo=timezone.utc)
  67. return dt.astimezone(timezone.utc).timestamp()
  68. except Exception:
  69. return None
@dataclass
class ClusterRow:
    """Typed record whose fields mirror the columns of the ``clusters`` table.

    NOTE(review): the field names match the ``clusters`` columns created in
    SQLiteClusterStore._init_db (cluster_id, topic, payload, updated_at); how
    rows are mapped into this class is not visible here, so confirm with callers.
    """

    # Primary key of the clusters table.
    cluster_id: str
    topic: str
    # Presumably the JSON-decoded ``payload`` column — TODO confirm against callers.
    payload: dict
    # Presumably the parsed ``updated_at`` column (row write time, UTC) — verify.
    updated_at: datetime
# Key in the ``meta`` table under which prune_clusters() stores the ISO time of
# the last prune; prune_if_due() and get_prune_state() read it back.
META_LAST_PRUNE_AT = "last_prune_at"
# For internal use — canonical name is article_key(article) from article_identity
_article_key = article_key
  79. def _dedup_articles(articles: list[dict[str, Any]]) -> list[dict[str, Any]]:
  80. seen: set[str] = set()
  81. out: list[dict[str, Any]] = []
  82. for article in articles:
  83. key = _article_key(article)
  84. if key in seen:
  85. continue
  86. seen.add(key)
  87. out.append(article)
  88. return out
  89. def _has_valid_entity_resolutions(resolutions: Any, entities: list[str]) -> bool:
  90. if not isinstance(resolutions, list):
  91. return False
  92. if len(resolutions) != len(entities):
  93. return False
  94. for res in resolutions:
  95. if not isinstance(res, dict):
  96. return False
  97. if not res.get("normalized") or not res.get("canonical_label"):
  98. return False
  99. return True
  100. def sanitize_cluster_payload(cluster: dict[str, Any], *, include_resolutions: bool = True) -> dict[str, Any]:
  101. """Normalize cluster payload so every stored payload is internally consistent."""
  102. out = dict(cluster)
  103. raw_articles = out.get("articles", []) or []
  104. articles = [a for a in raw_articles if isinstance(a, dict)]
  105. # Normalize article timestamps, clamping future dates to now.
  106. now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
  107. for a in articles:
  108. if "timestamp" in a:
  109. a["timestamp"] = _normalize_ts(a["timestamp"])
  110. if a["timestamp"] > now_str:
  111. a["timestamp"] = now_str
  112. out["articles"] = _dedup_articles(articles)
  113. raw_entities = out.get("entities", []) or []
  114. entities = normalize_entities(raw_entities)
  115. out["entities"] = entities
  116. # Normalize cluster-level timestamps, clamping future dates to now.
  117. now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
  118. for field in ("timestamp", "last_updated", "first_seen"):
  119. if field in out and out[field]:
  120. ts = _normalize_ts(out[field])
  121. if ts > now_str:
  122. ts = now_str
  123. out[field] = ts
  124. # Ensure timestamp is always present for the generated column index.
  125. # Prefer existing timestamp, then first_seen, then last_updated, then now.
  126. for src in ("timestamp", "first_seen", "last_updated"):
  127. if out.get(src):
  128. out.setdefault("timestamp", out[src])
  129. break
  130. if not out.get("timestamp"):
  131. out["timestamp"] = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")
  132. # Preserve enrichment metadata across sanitization so the
  133. # poller's enriched_at cache check works on DB round-trips.
  134. for _fld in ("enriched_at", "enrichment_failed_at", "enrichment_retry_count"):
  135. if _fld in cluster:
  136. out.setdefault(_fld, cluster[_fld])
  137. if not include_resolutions:
  138. return out
  139. resolutions = out.get("entityResolutions", None)
  140. if entities:
  141. if not _has_valid_entity_resolutions(resolutions, entities):
  142. out["entityResolutions"] = [resolve_entity_via_trends(e) for e in entities]
  143. else:
  144. # Keep the empty case explicit and stable.
  145. out["entityResolutions"] = []
  146. return out
  147. class SQLiteClusterStore:
  148. def __init__(self, db_path: str | Path):
  149. self.db_path = str(db_path)
  150. self._init_db()
  151. def _conn(self) -> sqlite3.Connection:
  152. return sqlite3.connect(self.db_path)
    def _init_db(self) -> None:
        """Create the database directory, schema, indexes and migrations.

        Safe to run on every start-up:
        - every CREATE uses IF NOT EXISTS;
        - every ALTER TABLE ... ADD COLUMN swallows sqlite3.OperationalError,
          which is expected when the column already exists;
        - site_config is seeded through news_mcp.site_config.seed_site_config,
          which is a no-op when the table is already populated.
        """
        # Ensure the directory holding the database file exists.
        Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
        with self._conn() as conn:
            # NOTE(review): journal_mode=WAL is persistent in the database
            # file. synchronous and busy_timeout are per-connection settings,
            # so they only affect this initialization connection.
            conn.execute("PRAGMA journal_mode=WAL")
            conn.execute("PRAGMA synchronous=NORMAL")
            conn.execute("PRAGMA busy_timeout=5000")
            # Main table: one row per cluster; payload is the JSON-encoded
            # cluster dict written by upsert_clusters.
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS clusters (
                    cluster_id TEXT PRIMARY KEY,
                    topic TEXT NOT NULL,
                    payload TEXT NOT NULL,
                    updated_at TEXT NOT NULL,
                    summary_payload TEXT,
                    summary_updated_at TEXT
                )
                """
            )
            # If the table already exists without the summary columns,
            # add them (SQLite-friendly incremental migrations).
            for col_def in [
                "summary_payload TEXT",
                "summary_updated_at TEXT",
            ]:
                col = col_def.split()[0]  # NOTE(review): `col` is never used.
                try:
                    conn.execute(f"ALTER TABLE clusters ADD COLUMN {col_def}")
                except sqlite3.OperationalError:
                    # Column already exists. Any other OperationalError is hidden too.
                    pass
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_clusters_topic ON clusters(topic)"
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_clusters_updated_at ON clusters(updated_at)"
            )
            # Generated column for indexed event-time filtering (VIRTUAL for compatibility).
            # It exposes payload.timestamp, which sanitize_cluster_payload always fills.
            # NOTE(review): generated columns need SQLite >= 3.31. On older
            # versions the ALTER's error is swallowed here, and the CREATE INDEX
            # below would presumably fail with "no such column" — confirm.
            try:
                conn.execute(
                    "ALTER TABLE clusters ADD COLUMN payload_ts "
                    "GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL"
                )
            except sqlite3.OperationalError:
                pass # column already exists
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_clusters_payload_ts ON clusters(payload_ts)"
            )
            # Junction tables for SQL-level entity/keyword search.
            # NOTE(review): ON DELETE CASCADE only fires when PRAGMA
            # foreign_keys=ON is set on the deleting connection. It is not set
            # anywhere in the visible code — confirm whether orphans are cleaned elsewhere.
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS cluster_entities (
                    cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
                    entity TEXT NOT NULL,
                    PRIMARY KEY (cluster_id, entity)
                )
                """
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_cluster_entities_entity ON cluster_entities(entity)"
            )
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS cluster_keywords (
                    cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
                    keyword TEXT NOT NULL,
                    PRIMARY KEY (cluster_id, keyword)
                )
                """
            )
            conn.execute(
                "CREATE INDEX IF NOT EXISTS idx_cluster_keywords_keyword ON cluster_keywords(keyword)"
            )
            # Seen-articles table: tracks every article_key that has been
            # clustered, so the poller can skip already-processed articles
            # entirely (no re-clustering, no re-enrichment).
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS seen_articles (
                    article_key TEXT PRIMARY KEY,
                    cluster_id TEXT NOT NULL,
                    first_seen TEXT NOT NULL,
                    url TEXT NOT NULL DEFAULT '',
                    content_hash TEXT NOT NULL DEFAULT ''
                )
                """
            )
            # Migration: add content_hash column if missing (existing DBs).
            try:
                conn.execute("ALTER TABLE seen_articles ADD COLUMN content_hash TEXT NOT NULL DEFAULT ''")
            except sqlite3.OperationalError:
                pass # column already exists
            # Legacy schema migration: an entity_metadata table lacking the
            # entity_id column is dropped (its rows are discarded) and then
            # recreated with the current schema below.
            try:
                cur = conn.execute("PRAGMA table_info(entity_metadata)")
                cols = [row[1] for row in cur.fetchall()]
                if cols and "entity_id" not in cols:
                    conn.execute("DROP TABLE entity_metadata")
            except sqlite3.OperationalError:
                pass
            # entity_id is the mid when one is known, else "local:<label>"
            # (see upsert_entity_metadata / record_entity_request).
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS entity_metadata (
                    entity_id TEXT PRIMARY KEY,
                    normalized_label TEXT NOT NULL,
                    canonical_label TEXT,
                    mid TEXT,
                    sources_json TEXT,
                    updated_at TEXT,
                    last_requested_at TEXT
                )
                """
            )
            # Partial unique index: a non-NULL mid may appear at most once.
            conn.execute(
                "CREATE UNIQUE INDEX IF NOT EXISTS idx_entity_metadata_mid ON entity_metadata(mid) WHERE mid IS NOT NULL"
            )
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS feed_state (
                    feed_key TEXT PRIMARY KEY,
                    last_hash TEXT NOT NULL,
                    last_item_count INTEGER,
                    enabled INTEGER DEFAULT 1,
                    updated_at TEXT NOT NULL
                )
                """
            )
            # Migration: older feed_state tables lack the enabled flag.
            try:
                conn.execute("ALTER TABLE feed_state ADD COLUMN enabled INTEGER DEFAULT 1")
            except sqlite3.OperationalError:
                pass
            # Simple key/value store (e.g. META_LAST_PRUNE_AT, "last_refresh_at").
            conn.execute(
                """
                CREATE TABLE IF NOT EXISTS meta (
                    key TEXT PRIMARY KEY,
                    value TEXT NOT NULL
                )
                """
            )
            # Seed site_config from .env / defaults (no-op if already populated).
            # Imported lazily here, inside the method.
            from news_mcp.site_config import seed_site_config
            seeded = seed_site_config(conn)
            if seeded:
                import logging
                logging.getLogger(__name__).info("site_config: seeded %d rows from env/defaults", seeded)
  295. def upsert_clusters(self, clusters: list[dict], topic: str) -> None:
  296. now = datetime.now(timezone.utc)
  297. with self._conn() as conn:
  298. for c in clusters:
  299. c = sanitize_cluster_payload(c)
  300. cluster_id = c["cluster_id"]
  301. payload = json.dumps(c, ensure_ascii=False)
  302. conn.execute(
  303. "INSERT INTO clusters(cluster_id, topic, payload, updated_at) VALUES(?,?,?,?) "
  304. "ON CONFLICT(cluster_id) DO UPDATE SET topic=excluded.topic, payload=excluded.payload, updated_at=excluded.updated_at",
  305. (cluster_id, topic, payload, now.isoformat()),
  306. )
  307. # Populate junction tables for SQL-level entity/keyword search.
  308. # DELETE first so re-enrichment replaces stale entries.
  309. conn.execute("DELETE FROM cluster_entities WHERE cluster_id=?", (cluster_id,))
  310. conn.execute("DELETE FROM cluster_keywords WHERE cluster_id=?", (cluster_id,))
  311. for entity in c.get("entities", []):
  312. ent_norm = str(entity).strip().lower()
  313. if ent_norm:
  314. conn.execute(
  315. "INSERT OR IGNORE INTO cluster_entities(cluster_id, entity) VALUES(?, ?)",
  316. (cluster_id, ent_norm),
  317. )
  318. for kw in c.get("keywords", []):
  319. kw_norm = str(kw).strip().lower()
  320. if kw_norm:
  321. conn.execute(
  322. "INSERT OR IGNORE INTO cluster_keywords(cluster_id, keyword) VALUES(?, ?)",
  323. (cluster_id, kw_norm),
  324. )
  325. # Record every article in seen_articles so the poller can
  326. # skip already-processed articles on future cycles.
  327. for art in c.get("articles", []):
  328. akey = _article_key(art)
  329. if akey:
  330. art_url = str(art.get("url") or "").strip()
  331. from news_mcp.article_identity import article_content_hash as _chash
  332. ahash = _chash(art)
  333. conn.execute(
  334. "INSERT INTO seen_articles(article_key, cluster_id, first_seen, url, content_hash) VALUES(?,?,?,?,?) "
  335. "ON CONFLICT(article_key) DO UPDATE SET cluster_id=excluded.cluster_id, url=excluded.url, content_hash=excluded.content_hash",
  336. (akey, cluster_id, now.isoformat(), art_url, ahash),
  337. )
  338. def upsert_cluster_summary(
  339. self,
  340. cluster_id: str,
  341. summary_payload: dict,
  342. ) -> None:
  343. now = datetime.now(timezone.utc).isoformat()
  344. with self._conn() as conn:
  345. conn.execute(
  346. "UPDATE clusters SET summary_payload=?, summary_updated_at=? WHERE cluster_id=?",
  347. (
  348. json.dumps(summary_payload, ensure_ascii=False),
  349. now,
  350. cluster_id,
  351. ),
  352. )
  353. def get_cluster_summary(self, cluster_id: str, ttl_hours: float) -> dict | None:
  354. cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours)
  355. cutoff_iso = cutoff.isoformat()
  356. with self._conn() as conn:
  357. cur = conn.execute(
  358. "SELECT summary_payload, summary_updated_at FROM clusters "
  359. "WHERE cluster_id=? AND summary_updated_at >= ?",
  360. (cluster_id, cutoff_iso),
  361. )
  362. row = cur.fetchone()
  363. if not row or not row[0]:
  364. return None
  365. return json.loads(row[0])
  366. def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
  367. """Return newest clusters by event timestamp, filtered via SQL payload_ts index."""
  368. cutoff = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).isoformat()
  369. with self._conn() as conn:
  370. cur = conn.execute(
  371. "SELECT payload FROM clusters WHERE topic=? AND payload_ts >= ? ORDER BY payload_ts DESC LIMIT ?",
  372. (topic, cutoff, int(limit)),
  373. )
  374. return [json.loads(r[0]) for r in cur.fetchall()]
  375. def get_latest_clusters_all_topics(self, ttl_hours: float, limit: int) -> list[dict]:
  376. """Return newest clusters across all topics, filtered via SQL payload_ts index."""
  377. cutoff = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).isoformat()
  378. with self._conn() as conn:
  379. cur = conn.execute(
  380. "SELECT payload FROM clusters WHERE payload_ts >= ? ORDER BY payload_ts DESC LIMIT ?",
  381. (cutoff, int(limit)),
  382. )
  383. return [json.loads(r[0]) for r in cur.fetchall()]
  384. def get_cluster_by_id(self, cluster_id: str) -> dict | None:
  385. with self._conn() as conn:
  386. cur = conn.execute(
  387. "SELECT payload FROM clusters WHERE cluster_id=?",
  388. (cluster_id,),
  389. )
  390. row = cur.fetchone()
  391. return json.loads(row[0]) if row else None
  392. def get_feed_hash(self, feed_key: str) -> str | None:
  393. with self._conn() as conn:
  394. cur = conn.execute(
  395. "SELECT last_hash FROM feed_state WHERE feed_key=?",
  396. (feed_key,),
  397. )
  398. row = cur.fetchone()
  399. return row[0] if row else None
  400. def set_feed_hash(self, feed_key: str, last_hash: str) -> None:
  401. self.set_feed_state(feed_key, last_hash, None)
  402. def set_feed_state(self, feed_key: str, last_hash: str, last_item_count: int | None = None) -> None:
  403. now = datetime.now(timezone.utc).isoformat()
  404. with self._conn() as conn:
  405. conn.execute(
  406. "INSERT INTO feed_state(feed_key, last_hash, last_item_count, updated_at) VALUES(?,?,?,?) "
  407. "ON CONFLICT(feed_key) DO UPDATE SET last_hash=excluded.last_hash, last_item_count=excluded.last_item_count, updated_at=excluded.updated_at",
  408. (feed_key, last_hash, last_item_count, now),
  409. )
  410. def get_feed_state(self, feed_key: str) -> dict | None:
  411. with self._conn() as conn:
  412. cur = conn.execute(
  413. "SELECT last_hash, updated_at FROM feed_state WHERE feed_key=?",
  414. (feed_key,),
  415. )
  416. row = cur.fetchone()
  417. if not row:
  418. return None
  419. return {"last_hash": row[0], "updated_at": row[1]}
  420. def get_all_feed_states(self) -> list[dict[str, Any]]:
  421. """All feed_state rows."""
  422. with self._conn() as conn:
  423. cur = conn.execute(
  424. "SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC"
  425. )
  426. return [
  427. {
  428. "feed_key": row[0],
  429. "last_hash": row[1],
  430. "last_item_count": row[2],
  431. "enabled": bool(row[3]),
  432. "updated_at": row[4],
  433. }
  434. for row in cur.fetchall()
  435. ]
  436. def feed_ensure_seeded(self, feed_urls: list[str]) -> None:
  437. """Insert any feed URLs not yet present in feed_state (enabled by default)."""
  438. if not feed_urls:
  439. return
  440. with self._conn() as conn:
  441. for url in feed_urls:
  442. conn.execute(
  443. "INSERT OR IGNORE INTO feed_state(feed_key, last_hash, last_item_count, enabled, updated_at) VALUES(?, '', 0, 1, '')",
  444. (url,),
  445. )
  446. def get_feed_state_list(self) -> list[dict[str, Any]]:
  447. """Return all feeds with enabled/disabled status for the dashboard."""
  448. return self.get_all_feed_states()
  449. def set_feed_enabled(self, feed_url: str, enabled: bool) -> bool:
  450. """Toggle a feed's enabled state. Returns True if the feed existed and was updated."""
  451. with self._conn() as conn:
  452. cur = conn.execute(
  453. "UPDATE feed_state SET enabled = ? WHERE feed_key = ?",
  454. (1 if enabled else 0, feed_url),
  455. )
  456. return cur.rowcount > 0
  457. def get_enabled_feed_urls(self, feed_urls: list[str]) -> list[str]:
  458. """From a list of configured feed URLs, return only those that are enabled in feed_state.
  459. URLs not yet present in feed_state are seeded as enabled.
  460. """
  461. self.feed_ensure_seeded(feed_urls)
  462. with self._conn() as conn:
  463. placeholders = ",".join("?" for _ in feed_urls) if feed_urls else ""
  464. if placeholders:
  465. cur = conn.execute(
  466. f"SELECT feed_key FROM feed_state WHERE feed_key IN ({placeholders}) AND enabled = 1",
  467. feed_urls,
  468. )
  469. else:
  470. cur = conn.execute("SELECT feed_key FROM feed_state WHERE enabled = 1")
  471. return [row[0] for row in cur.fetchall()]
  472. # ------------------------------------------------------------------ #
  473. # Seen-articles: skip already-processed articles at ingestion
  474. # ------------------------------------------------------------------ #
  475. def filter_already_seen(self, articles: list[dict]) -> tuple[list[dict], list[dict], list[dict]]:
  476. """Split articles into (new, seen_unchanged, seen_changed) based on seen_articles.
  477. Uses _article_key (URL) as identity and article_content_hash to detect
  478. in-place content updates (e.g. a stub that gets fleshed out).
  479. Returns:
  480. new_articles: never seen before → full clustering + enrichment
  481. seen_unchanged: same key, same content hash → skip entirely
  482. seen_changed: same key, different content hash → re-cluster to update
  483. the existing cluster payload (will trigger re-enrichment)
  484. """
  485. from news_mcp.article_identity import article_content_hash as _content_hash
  486. keys = [_article_key(a) for a in articles]
  487. if not keys:
  488. return [], [], []
  489. with self._conn() as conn:
  490. placeholders = ",".join("?" for _ in keys)
  491. cur = conn.execute(
  492. f"SELECT article_key, content_hash FROM seen_articles WHERE article_key IN ({placeholders})",
  493. keys,
  494. )
  495. seen_map = {row[0]: row[1] for row in cur.fetchall()}
  496. new_articles = []
  497. seen_unchanged = []
  498. seen_changed = []
  499. for art, key in zip(articles, keys):
  500. if key not in seen_map:
  501. new_articles.append(art)
  502. elif seen_map[key] == _content_hash(art):
  503. seen_unchanged.append(art)
  504. elif not seen_map[key]:
  505. # Stored hash is empty (pre-migration row). Treat as unchanged
  506. # and let the next upsert populate the real hash.
  507. seen_unchanged.append(art)
  508. else:
  509. seen_changed.append(art)
  510. return new_articles, seen_unchanged, seen_changed
  511. def get_seen_article_count(self) -> int:
  512. """Total rows in seen_articles (for diagnostics)."""
  513. with self._conn() as conn:
  514. return conn.execute("SELECT count(*) FROM seen_articles").fetchone()[0]
  515. def get_meta(self, key: str) -> str | None:
  516. with self._conn() as conn:
  517. cur = conn.execute("SELECT value FROM meta WHERE key=?", (key,))
  518. row = cur.fetchone()
  519. return row[0] if row else None
  520. def set_meta(self, key: str, value: str) -> None:
  521. with self._conn() as conn:
  522. conn.execute(
  523. "INSERT INTO meta(key, value) VALUES(?, ?) "
  524. "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
  525. (key, value),
  526. )
  527. def upsert_entity_metadata(
  528. self,
  529. normalized_label: str,
  530. canonical_label: str | None = None,
  531. mid: str | None = None,
  532. sources: list[str] | None = None,
  533. ) -> None:
  534. normalized_label = str(normalized_label or "").strip()
  535. if not normalized_label:
  536. return
  537. canonical_label = str(canonical_label).strip() if canonical_label else None
  538. mid = str(mid).strip() if mid else None
  539. entity_id = mid if mid else f"local:{normalized_label}"
  540. sources = sorted({s for s in (sources or []) if s})
  541. sources_json = json.dumps(sources, ensure_ascii=False)
  542. now = datetime.now(timezone.utc).isoformat()
  543. with self._conn() as conn:
  544. conn.execute(
  545. """
  546. INSERT INTO entity_metadata(entity_id, normalized_label, canonical_label, mid, sources_json, updated_at)
  547. VALUES(?,?,?,?,?,?)
  548. ON CONFLICT(entity_id) DO UPDATE SET
  549. canonical_label=excluded.canonical_label,
  550. mid=excluded.mid,
  551. sources_json=excluded.sources_json,
  552. updated_at=excluded.updated_at
  553. """,
  554. (entity_id, normalized_label, canonical_label, mid, sources_json, now),
  555. )
  556. def get_entity_metadata(self, normalized_label: str) -> dict[str, Any] | None:
  557. normalized_label = str(normalized_label or "").strip()
  558. if not normalized_label:
  559. return None
  560. with self._conn() as conn:
  561. cur = conn.execute(
  562. "SELECT entity_id, canonical_label, mid, sources_json, updated_at, last_requested_at "
  563. "FROM entity_metadata "
  564. "WHERE normalized_label=? "
  565. "ORDER BY CASE WHEN mid IS NOT NULL THEN 0 ELSE 1 END, "
  566. "COALESCE(last_requested_at, updated_at) DESC, updated_at DESC "
  567. "LIMIT 1",
  568. (normalized_label,),
  569. )
  570. row = cur.fetchone()
  571. if not row:
  572. return None
  573. sources = []
  574. if row[2]:
  575. try:
  576. sources = json.loads(row[2])
  577. except Exception:
  578. sources = []
  579. return {
  580. "entity_id": row[0],
  581. "normalized_label": normalized_label,
  582. "canonical_label": row[1],
  583. "mid": row[2],
  584. "sources": sources,
  585. "updated_at": row[4],
  586. "last_requested_at": row[5],
  587. }
  588. def record_entity_request(self, normalized_label: str, mid: str | None = None) -> None:
  589. normalized_label = str(normalized_label or "").strip()
  590. if not normalized_label:
  591. return
  592. mid = str(mid).strip() if mid else None
  593. entity_id = mid if mid else f"local:{normalized_label}"
  594. now = datetime.now(timezone.utc).isoformat()
  595. with self._conn() as conn:
  596. conn.execute(
  597. """
  598. INSERT INTO entity_metadata(entity_id, normalized_label, canonical_label, mid, sources_json, updated_at, last_requested_at)
  599. VALUES(?,?,?,?,?,?,?)
  600. ON CONFLICT(entity_id) DO UPDATE SET
  601. last_requested_at=excluded.last_requested_at
  602. """,
  603. (entity_id, normalized_label, None, mid, json.dumps([], ensure_ascii=False), now, now),
  604. )
  605. def get_failed_enrichment_clusters(self, max_retries: int = 3) -> list[dict]:
  606. """Return clusters whose last enrichment failed and haven't exceeded max_retries.
  607. These are candidates for re-enrichment on the next polling cycle.
  608. """
  609. with self._conn() as conn:
  610. cur = conn.execute(
  611. "SELECT payload FROM clusters "
  612. "WHERE json_extract(payload, '$.enrichment_failed_at') IS NOT NULL "
  613. "AND (json_extract(payload, '$.enrichment_retry_count') IS NULL "
  614. " OR json_extract(payload, '$.enrichment_retry_count') < ?) "
  615. "ORDER BY updated_at DESC LIMIT 500",
  616. (max_retries,),
  617. )
  618. rows = cur.fetchall()
  619. return [json.loads(r[0]) for r in rows]
  620. def prune_clusters(self, retention_days: float) -> int:
  621. retention_days = float(retention_days)
  622. if retention_days <= 0:
  623. return 0
  624. cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
  625. cutoff_iso = cutoff.isoformat()
  626. pruned_at = datetime.now(timezone.utc).isoformat()
  627. with self._conn() as conn:
  628. # Use payload_ts (event time from payload.timestamp) not updated_at
  629. # (row write time). updated_at is refreshed on every upsert, which
  630. # would keep re-ingested old articles alive forever.
  631. # Collect cluster_ids being pruned so we can clean seen_articles.
  632. pruned_ids = [
  633. row[0] for row in conn.execute(
  634. "SELECT cluster_id FROM clusters WHERE payload_ts < ?", (cutoff_iso,)
  635. ).fetchall()
  636. ]
  637. cur = conn.execute("DELETE FROM clusters WHERE payload_ts < ?", (cutoff_iso,))
  638. deleted = int(cur.rowcount or 0)
  639. # Clean up seen_articles rows pointing to pruned clusters
  640. if pruned_ids:
  641. placeholders = ",".join("?" for _ in pruned_ids)
  642. conn.execute(
  643. f"DELETE FROM seen_articles WHERE cluster_id IN ({placeholders})",
  644. pruned_ids,
  645. )
  646. conn.execute(
  647. "INSERT INTO meta(key, value) VALUES(?, ?) "
  648. "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
  649. (META_LAST_PRUNE_AT, pruned_at),
  650. )
  651. return deleted
  652. def prune_if_due(self, pruning_enabled: bool, retention_days: float, interval_hours: float = 24.0) -> dict[str, Any]:
  653. retention_days = float(retention_days)
  654. interval_hours = float(interval_hours)
  655. if (not pruning_enabled) or retention_days <= 0:
  656. return {
  657. "enabled": bool(pruning_enabled),
  658. "deleted": 0,
  659. "due": False,
  660. "retention_days": retention_days,
  661. "interval_hours": interval_hours,
  662. "last_prune_at": self.get_meta(META_LAST_PRUNE_AT),
  663. }
  664. last_prune_at = self.get_meta(META_LAST_PRUNE_AT)
  665. now = datetime.now(timezone.utc)
  666. due = True
  667. if last_prune_at:
  668. try:
  669. last_dt = datetime.fromisoformat(last_prune_at)
  670. due = now - last_dt >= timedelta(hours=max(1.0, interval_hours))
  671. except Exception:
  672. due = True
  673. if not due:
  674. return {
  675. "enabled": True,
  676. "deleted": 0,
  677. "due": False,
  678. "retention_days": retention_days,
  679. "interval_hours": interval_hours,
  680. "last_prune_at": last_prune_at,
  681. }
  682. deleted = self.prune_clusters(retention_days)
  683. last_prune_at = self.get_meta(META_LAST_PRUNE_AT)
  684. return {
  685. "enabled": True,
  686. "deleted": deleted,
  687. "due": True,
  688. "retention_days": retention_days,
  689. "interval_hours": interval_hours,
  690. "last_prune_at": last_prune_at,
  691. }
  692. def get_prune_state(self, pruning_enabled: bool, retention_days: float, interval_hours: float = 24.0) -> dict[str, Any]:
  693. return {
  694. "enabled": bool(pruning_enabled),
  695. "retention_days": float(retention_days),
  696. "interval_hours": float(interval_hours),
  697. "last_prune_at": self.get_meta(META_LAST_PRUNE_AT),
  698. }
  699. # ------------------------------------------------------------------
  700. # Dashboard query helpers
  701. # ------------------------------------------------------------------
  702. def get_dashboard_stats(self) -> dict[str, Any]:
  703. """Aggregate status numbers for the health panel."""
  704. with self._conn() as conn:
  705. total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
  706. total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
  707. cluster_entities = conn.execute(
  708. "SELECT COUNT(DISTINCT e.value) "
  709. "FROM clusters, json_each(clusters.payload, '$.entities') AS e"
  710. ).fetchone()[0]
  711. topic_counts = dict(conn.execute(
  712. "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
  713. ).fetchall())
  714. last_refresh = self.get_meta("last_refresh_at")
  715. feeds = {}
  716. for row in conn.execute(
  717. "SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC"
  718. ):
  719. feeds[row[0]] = {
  720. "last_hash": row[1], "last_item_count": row[2],
  721. "enabled": bool(row[3]), "updated_at": row[4],
  722. }
  723. # Freshness: did a refresh happen recently? (within 2x the configured interval)
  724. fresh = False
  725. if last_refresh:
  726. try:
  727. dt = datetime.fromisoformat(last_refresh.replace("Z", "+00:00"))
  728. if dt.tzinfo is None:
  729. dt = dt.replace(tzinfo=timezone.utc)
  730. age_hours = (datetime.now(timezone.utc) - dt).total_seconds() / 3600
  731. fresh = age_hours < max(1.0, NEWS_REFRESH_INTERVAL_SECONDS / 3600) * 2
  732. except Exception:
  733. pass
  734. last_prune = self.get_meta(META_LAST_PRUNE_AT)
  735. prune_state = self.get_prune_state(
  736. pruning_enabled=NEWS_PRUNING_ENABLED,
  737. retention_days=NEWS_RETENTION_DAYS,
  738. interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
  739. )
  740. return {
  741. "total_clusters": total_clusters,
  742. "total_entities": total_entities,
  743. "cluster_entities": cluster_entities,
  744. "clusters_by_topic": topic_counts,
  745. "last_refresh_at": last_refresh,
  746. "last_prune_at": last_prune,
  747. "data_fresh": fresh,
  748. "feeds": feeds,
  749. "feed_count": len(feeds),
  750. "seen_article_count": self.get_seen_article_count(),
  751. "prune_state": prune_state,
  752. }
  753. def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
  754. """Dashboard-optimized cluster detail fetch."""
  755. with self._conn() as conn:
  756. cur = conn.execute(
  757. "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,)
  758. )
  759. row = cur.fetchone()
  760. if not row:
  761. return None
  762. c = json.loads(row[0])
  763. summary = None
  764. if c.get("summary_payload"):
  765. try:
  766. summary = json.loads(c["summary_payload"])
  767. except Exception:
  768. pass
  769. return {
  770. "cluster_id": c.get("cluster_id"),
  771. "headline": c.get("headline", ""),
  772. "summary": c.get("summary", ""),
  773. "topic": c.get("topic", ""),
  774. "sentiment": c.get("sentiment", "neutral"),
  775. "sentimentScore": c.get("sentimentScore"),
  776. "importance": c.get("importance", 0),
  777. "entities": c.get("entities", []),
  778. "entityResolutions": c.get("entityResolutions", []),
  779. "keywords": c.get("keywords", []),
  780. "sources": c.get("sources", []),
  781. "timestamp": c.get("timestamp", ""),
  782. "first_seen": c.get("first_seen", ""),
  783. "last_updated": c.get("last_updated", ""),
  784. "article_count": len(c.get("articles", [])),
  785. "articles": c.get("articles", []),
  786. "summary_text": summary.get("mergedSummary", "") if summary else "",
  787. "key_facts": summary.get("keyFacts", []) if summary else [],
  788. }
  789. # ── Paginated Clusters ────────────────────────────────────────────
  790. def get_clusters_page(
  791. self,
  792. topic: str | None = None,
  793. hours: float = 24,
  794. limit: int = 20,
  795. offset: int = 0,
  796. ) -> dict[str, Any]:
  797. """Paginated cluster listing filtered by SQL payload_ts index.
  798. Returns {"clusters": [...], "total": int}.
  799. """
  800. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  801. query = "SELECT payload FROM clusters WHERE payload_ts >= ?"
  802. params: list = [cutoff]
  803. if topic and topic != "all":
  804. query += " AND topic = ?"
  805. params.append(topic)
  806. total = self._conn().execute(
  807. f"SELECT COUNT(*) FROM ({query})", params
  808. ).fetchone()[0]
  809. query += " ORDER BY payload_ts DESC LIMIT ? OFFSET ?"
  810. params.extend([limit, offset])
  811. with self._conn() as conn:
  812. rows = conn.execute(query, params).fetchall()
  813. return {
  814. "clusters": [
  815. {
  816. "cluster_id": c.get("cluster_id", ""),
  817. "headline": c.get("headline", ""),
  818. "topic": c.get("topic", ""),
  819. "sentiment": c.get("sentiment", "neutral"),
  820. "sentimentScore": c.get("sentimentScore"),
  821. "importance": c.get("importance", 0),
  822. "entities": c.get("entities", []),
  823. "sources": c.get("sources", []),
  824. "timestamp": c.get("timestamp", ""),
  825. "keywords": c.get("keywords", []),
  826. "article_count": len(c.get("articles", [])),
  827. }
  828. for c in [json.loads(r[0]) for r in rows]
  829. ],
  830. "total": total,
  831. }
  832. # ── Sentiment Series ──────────────────────────────────────────────
  833. def get_sentiment_series(
  834. self,
  835. topic: str | None = None,
  836. hours: float = 24,
  837. bucket_hours: float = 1,
  838. ) -> list[dict[str, Any]]:
  839. """Sentiment score averaged per time bucket. Filters by payload_ts SQL index."""
  840. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  841. query = "SELECT payload FROM clusters WHERE payload_ts >= ?"
  842. params: list = [cutoff]
  843. if topic and topic != "all":
  844. query += " AND topic = ?"
  845. params.append(topic)
  846. query += " ORDER BY payload_ts ASC"
  847. with self._conn() as conn:
  848. rows = conn.execute(query, params).fetchall()
  849. buckets: dict[datetime, list[float]] = {}
  850. for (payload_text,) in rows:
  851. c = json.loads(payload_text)
  852. ts_str = c.get("timestamp")
  853. score = c.get("sentimentScore")
  854. if not ts_str or score is None:
  855. continue
  856. dt = datetime.fromisoformat(str(ts_str).strip())
  857. if dt.tzinfo is None:
  858. dt = dt.replace(tzinfo=timezone.utc)
  859. dt = dt.astimezone(timezone.utc)
  860. bucket_key = dt.replace(minute=0, second=0, microsecond=0)
  861. if bucket_hours > 1:
  862. bucket_key = bucket_key.replace(
  863. hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
  864. )
  865. buckets.setdefault(bucket_key, []).append(float(score))
  866. return [
  867. {
  868. "time": bucket_key.isoformat(),
  869. "avg_sentiment": round(sum(scores) / len(scores), 3),
  870. "count": len(scores),
  871. "min": round(min(scores), 3),
  872. "max": round(max(scores), 3),
  873. }
  874. for bucket_key, scores in sorted(buckets.items())
  875. ]
  876. # ── Entity / Keyword Frequencies ──────────────────────────────────
  877. def get_entity_frequencies(
  878. self,
  879. hours: float = 24,
  880. limit: int = 30,
  881. ) -> list[dict[str, Any]]:
  882. """Top entities by mention count, using SQL junction table + payload_ts index."""
  883. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  884. with self._conn() as conn:
  885. rows = conn.execute(
  886. """\
  887. SELECT ce.entity, COUNT(*) as cnt
  888. FROM cluster_entities ce
  889. JOIN clusters c ON c.cluster_id = ce.cluster_id
  890. WHERE c.payload_ts >= ?
  891. GROUP BY ce.entity
  892. ORDER BY cnt DESC
  893. LIMIT ?
  894. """,
  895. (cutoff, limit),
  896. ).fetchall()
  897. result: list[dict[str, Any]] = []
  898. for label, count in rows:
  899. meta = self.get_entity_metadata(label)
  900. result.append({
  901. "label": label,
  902. "count": count,
  903. "canonical_label": meta["canonical_label"] if meta else label,
  904. "mid": meta["mid"] if meta else None,
  905. })
  906. return result
  907. def get_keyword_frequencies(
  908. self,
  909. hours: float = 24,
  910. limit: int = 30,
  911. ) -> list[dict[str, Any]]:
  912. """Top keywords by mention count, using SQL junction table + payload_ts index.
  913. Excludes DEFAULT_TOPICS labels (crypto, macro, regulation, ai, other).
  914. """
  915. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  916. _topic_labels = {"crypto", "macro", "regulation", "ai", "other"}
  917. with self._conn() as conn:
  918. rows = conn.execute(
  919. """\
  920. SELECT ck.keyword, COUNT(*) as cnt
  921. FROM cluster_keywords ck
  922. JOIN clusters c ON c.cluster_id = ck.cluster_id
  923. WHERE c.payload_ts >= ?
  924. GROUP BY ck.keyword
  925. ORDER BY cnt DESC
  926. LIMIT ?
  927. """,
  928. (cutoff, limit),
  929. ).fetchall()
  930. return [
  931. {"label": label, "count": count}
  932. for label, count in rows
  933. if label.lower() not in _topic_labels
  934. ]
  935. # ── Junction-Table Entity / Keyword Cluster Search ────────────────
  936. def get_clusters_by_entity(
  937. self,
  938. entity: str,
  939. hours: float = 168,
  940. limit: int = 50,
  941. offset: int = 0,
  942. ) -> dict[str, Any]:
  943. """Return clusters matching an entity, SQL-level filter via junction table.
  944. Returns {"entity": ..., "clusters": [...], "total": int, "hours": float}.
  945. """
  946. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  947. entity_norm = entity.strip().lower()
  948. with self._conn() as conn:
  949. total = conn.execute(
  950. "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c "
  951. "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
  952. "WHERE c.payload_ts >= ? AND ce.entity = ?",
  953. (cutoff, entity_norm),
  954. ).fetchone()[0]
  955. rows = conn.execute(
  956. "SELECT DISTINCT c.payload FROM clusters c "
  957. "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
  958. "WHERE c.payload_ts >= ? AND ce.entity = ? "
  959. "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?",
  960. (cutoff, entity_norm, limit, offset),
  961. ).fetchall()
  962. return {
  963. "entity": entity_norm,
  964. "clusters": [json.loads(r[0]) for r in rows],
  965. "total": total,
  966. "hours": hours,
  967. }
  968. def get_clusters_by_keyword(
  969. self,
  970. keyword: str,
  971. hours: float = 168,
  972. limit: int = 50,
  973. offset: int = 0,
  974. ) -> dict[str, Any]:
  975. """Return clusters matching a keyword, SQL-level filter via junction table."""
  976. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  977. kw_norm = keyword.strip().lower()
  978. with self._conn() as conn:
  979. total = conn.execute(
  980. "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c "
  981. "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
  982. "WHERE c.payload_ts >= ? AND ck.keyword = ?",
  983. (cutoff, kw_norm),
  984. ).fetchone()[0]
  985. rows = conn.execute(
  986. "SELECT DISTINCT c.payload FROM clusters c "
  987. "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
  988. "WHERE c.payload_ts >= ? AND ck.keyword = ? "
  989. "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?",
  990. (cutoff, kw_norm, limit, offset),
  991. ).fetchall()
  992. return {
  993. "keyword": kw_norm,
  994. "clusters": [json.loads(r[0]) for r in rows],
  995. "total": total,
  996. "hours": hours,
  997. }
  998. def get_clusters_by_entity_or_keyword(
  999. self,
  1000. query_terms: set[str],
  1001. hours: float,
  1002. limit: int,
  1003. ) -> list[dict]:
  1004. """Search clusters by matching ANY query term against entities OR keywords.
  1005. Uses SQL-level junction-table filtering — no row-limit blind spot.
  1006. Returns clusters sorted by recency.
  1007. """
  1008. terms = [q.strip().lower() for q in query_terms if q.strip()]
  1009. if not terms:
  1010. return []
  1011. cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
  1012. placeholders = ",".join("?" for _ in terms)
  1013. with self._conn() as conn:
  1014. rows = conn.execute(
  1015. f"SELECT DISTINCT c.payload FROM clusters c "
  1016. f"LEFT JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
  1017. f"LEFT JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
  1018. f"WHERE c.payload_ts >= ? "
  1019. f" AND (ce.entity IN ({placeholders}) OR ck.keyword IN ({placeholders})) "
  1020. f"ORDER BY c.payload_ts DESC LIMIT ?",
  1021. (cutoff, *terms, *terms, limit),
  1022. ).fetchall()
  1023. return [json.loads(r[0]) for r in rows]