| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
70370470570670770870971071171271371471571671771871972072172272372472572672772872973073173273373473573673773873974074174274374474574674774874975075175275375475575675775875976076176276376476576676776876977077177277377477577677777877978078178278378478578678778878979079179279379479579679779879980080180280380480580680780880981081181281381481581681781881982082182282382482582682782882983083183283383483583683783883984084184284384484584684784884985085185285385485585685785885986086186286386486586686786886987087187287387487587687787887988088188288388488588688788888989089189289389489589689789889990090190290390490590690790890991091191291391491591691791891992092192292392492592692792892993093193293393493593693793893994094194294394494594694794894995095195295395495595695795895996096196296396496596696796896997097197297397497597697797897998098198298398498598698798898999099199299399499599699799899910001001100210031004100510061007100810091010101110121013101410151016101710181019102010211022102310241025102610271028102910301031103210331034103510361037103810391040104110421043104410451046104710481049105010511052105310541055105610571058105910601061106210631064106510661067106810691070107110721073107410751076107710781079108010811082108310841085108610871088108910901091109210931094109510961097109810991100110111021103110411051106110711081109111011111112111311141115111611171118111911201121112211231124112511261127112811291130 |
- from __future__ import annotations
- import json
- import sqlite3
- from dataclasses import dataclass
- from datetime import datetime, timezone, timedelta
- from pathlib import Path
- from typing import Any
- from email.utils import parsedate_to_datetime
- from news_mcp.article_identity import article_key
- from news_mcp.config import (
- NEWS_PRUNE_INTERVAL_HOURS,
- NEWS_PRUNING_ENABLED,
- NEWS_REFRESH_INTERVAL_SECONDS,
- NEWS_RETENTION_DAYS,
- )
- from news_mcp.entity_normalize import normalize_entities
- from news_mcp.trends_resolution import resolve_entity_via_trends
def _normalize_ts(ts: Any) -> str:
    """Normalize a timestamp of unknown format to ISO 8601 UTC.

    Accepted inputs, tried in this order:

    * ``int`` / ``float`` -- unix epoch seconds;
    * ISO 8601 text (a trailing ``Z`` is accepted);
    * RFC 2822 / HTTP-date text;
    * plain decimal text -- unix epoch seconds given as a string. This is
      tried last so that text the ISO parser understands keeps its ISO
      meaning.

    Datetimes without an offset are assumed to be UTC.

    Returns:
        ``YYYY-MM-DDTHH:MM:SS+00:00`` on success. ``""`` for ``None``, blank
        text, or a number that cannot be converted. The stripped input text,
        unchanged, when no parser recognises it -- keeping the raw value is
        preferred over losing data.
    """
    fmt = "%Y-%m-%dT%H:%M:%S+00:00"
    # Everything the parsers below are known to raise on bad or out-of-range
    # input; anything else is a genuine bug and should surface.
    parse_errors = (ValueError, TypeError, OverflowError, OSError, IndexError)

    if ts is None:
        return ""
    if isinstance(ts, (int, float)):
        try:
            return datetime.fromtimestamp(float(ts), tz=timezone.utc).strftime(fmt)
        except parse_errors:
            return ""

    text = str(ts).strip()
    if not text:
        return ""

    def _from_epoch_text(value: str) -> datetime:
        # Only plain decimal numbers count as epoch seconds; this keeps
        # float() from accepting spellings such as "nan" or "1e5".
        if not value.lstrip("+-").replace(".", "", 1).isdigit():
            raise ValueError(value)
        return datetime.fromtimestamp(float(value), tz=timezone.utc)

    parsers = (
        lambda value: datetime.fromisoformat(value.replace("Z", "+00:00")),
        parsedate_to_datetime,
        _from_epoch_text,
    )
    for parse in parsers:
        try:
            dt = parse(text)
            if dt.tzinfo is None:
                dt = dt.replace(tzinfo=timezone.utc)
            return dt.astimezone(timezone.utc).strftime(fmt)
        except parse_errors:
            continue

    # Return original if we can't parse — better than losing data
    return text
def _read_ts(ts: Any) -> float | None:
    """Convert a stored ISO 8601 timestamp into unix epoch seconds.

    Stored payload timestamps (``timestamp``, ``first_seen``,
    ``last_updated``) are written as ``YYYY-MM-DDTHH:MM:SS+00:00`` by
    ``sanitize_cluster_payload`` via ``_normalize_ts``, so
    ``datetime.fromisoformat`` is the only parser used here. Do NOT add
    RFC 2822 / ``parsedate_to_datetime`` fallbacks: a stored value this
    function cannot read points at a bug in the normalization pipeline, and
    that is where it should be fixed.

    Returns:
        Epoch seconds as a float, or ``None`` when *ts* is falsy or cannot
        be parsed. Values without an offset are taken to be UTC.
    """
    if not ts:
        return None
    try:
        moment = datetime.fromisoformat(str(ts).strip())
        aware = moment if moment.tzinfo is not None else moment.replace(tzinfo=timezone.utc)
        return aware.astimezone(timezone.utc).timestamp()
    except Exception:
        return None
@dataclass
class ClusterRow:
    """Typed record holding one cluster: id, topic, payload and update time.

    The field names match columns of the ``clusters`` table created by
    ``SQLiteClusterStore``. NOTE(review): nothing in the visible part of
    this module constructs it -- confirm where it is used.
    """

    # Unique identifier of the cluster.
    cluster_id: str
    # Topic the cluster is filed under.
    topic: str
    # Cluster payload as a dictionary.
    payload: dict
    # When the record was last updated.
    updated_at: datetime
# Key of the ``meta`` row that records when clusters were last pruned.
META_LAST_PRUNE_AT = "last_prune_at"

# Module-internal alias; the canonical name is article_key(article) from
# article_identity.
_article_key = article_key
def _dedup_articles(articles: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop repeated articles, keeping the first one seen for each key.

    Two articles are the same when ``_article_key`` returns the same key
    for both. The relative order of the survivors is unchanged.
    """
    first_by_key: dict[str, dict[str, Any]] = {}
    for item in articles:
        # setdefault keeps the earliest article for a key; dicts preserve
        # insertion order, so the output order follows the input.
        first_by_key.setdefault(_article_key(item), item)
    return list(first_by_key.values())
def _has_valid_entity_resolutions(resolutions: Any, entities: list[str]) -> bool:
    """Tell whether *resolutions* is a usable resolution list for *entities*.

    It is usable when it is a list with exactly one item per entity and every
    item is a dict with non-empty ``normalized`` and ``canonical_label``
    values. Only the lengths are compared; items are not matched to entities
    by name.
    """
    if not isinstance(resolutions, list) or len(resolutions) != len(entities):
        return False
    return all(
        isinstance(item, dict) and item.get("normalized") and item.get("canonical_label")
        for item in resolutions
    )
def sanitize_cluster_payload(cluster: dict[str, Any], *, include_resolutions: bool = True) -> dict[str, Any]:
    """Return a normalized copy of *cluster* that is internally consistent.

    The result is a shallow copy of *cluster* in which:

    * ``articles`` keeps only dict items -- each one copied, so the caller's
      article dicts are never modified -- with ``timestamp`` normalized and
      clamped to the current time, then de-duplicated;
    * ``entities`` is passed through ``normalize_entities``;
    * non-empty ``timestamp`` / ``last_updated`` / ``first_seen`` values are
      normalized and clamped to the current time;
    * ``timestamp`` is always set (it feeds the generated column index),
      falling back to ``first_seen``, then ``last_updated``, then the
      current time -- also when the key is present but empty;
    * ``entityResolutions`` is rebuilt with ``resolve_entity_via_trends``
      when it is missing or not valid for the entity list, and is ``[]``
      when there are no entities.

    Every other key, including the enrichment bookkeeping fields
    (``enriched_at``, ``enrichment_failed_at``, ``enrichment_retry_count``)
    that the poller's cache check relies on, is carried over unchanged by
    the copy.

    Args:
        cluster: Raw cluster payload.
        include_resolutions: When false, ``entityResolutions`` is left
            exactly as it was in *cluster* and no resolution is performed.

    Returns:
        The sanitized payload as a new dict.
    """
    now_str = datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%S+00:00")

    def _clamped(value: Any) -> str:
        # Normalized stamps share one fixed-width UTC layout, so comparing
        # the strings orders them chronologically. Text that _normalize_ts
        # could not parse comes back as-is and is compared as plain text.
        normalized = _normalize_ts(value)
        return now_str if normalized > now_str else normalized

    out = dict(cluster)

    # Copy every article before touching it: `out` is only a shallow copy,
    # so writing into the original dicts would leak into the caller's data.
    articles = [dict(a) for a in (out.get("articles") or []) if isinstance(a, dict)]
    for article in articles:
        if "timestamp" in article:
            article["timestamp"] = _clamped(article["timestamp"])
    out["articles"] = _dedup_articles(articles)

    entities = normalize_entities(out.get("entities") or [])
    out["entities"] = entities

    for field in ("timestamp", "last_updated", "first_seen"):
        if out.get(field):
            out[field] = _clamped(out[field])

    # Ensure timestamp is always present for the generated column index.
    # Prefer existing timestamp, then first_seen, then last_updated, then now.
    if not out.get("timestamp"):
        out["timestamp"] = out.get("first_seen") or out.get("last_updated") or now_str

    if not include_resolutions:
        return out

    if entities:
        if not _has_valid_entity_resolutions(out.get("entityResolutions"), entities):
            out["entityResolutions"] = [resolve_entity_via_trends(e) for e in entities]
    else:
        # Keep the empty case explicit and stable.
        out["entityResolutions"] = []
    return out
- class SQLiteClusterStore:
def __init__(self, db_path: str | Path):
    """Remember the database location and make sure the schema exists.

    Args:
        db_path: Location of the SQLite database file; stored on
            ``self.db_path`` as a string.
    """
    location = str(db_path)
    self.db_path = location
    self._init_db()
def _conn(self) -> sqlite3.Connection:
    """Open and return a new SQLite connection to ``self.db_path``."""
    connection = sqlite3.connect(self.db_path)
    return connection
def _init_db(self) -> None:
    """Create the schema if needed and apply incremental migrations.

    Safe to run on every start: tables and indexes are created with
    ``IF NOT EXISTS``, and column additions ignore the
    ``sqlite3.OperationalError`` raised when the column already exists.
    The parent directory of the database file is created first. The last
    step seeds ``site_config`` through
    ``news_mcp.site_config.seed_site_config`` and logs how many rows were
    seeded when that number is non-zero.
    """

    def add_column(conn: sqlite3.Connection, table: str, col_def: str) -> None:
        # Attempt the ALTER and ignore the failure so that re-running the
        # migration on an up-to-date database is a no-op.
        try:
            conn.execute(f"ALTER TABLE {table} ADD COLUMN {col_def}")
        except sqlite3.OperationalError:
            pass  # column already exists

    Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
    with self._conn() as conn:
        conn.execute("PRAGMA journal_mode=WAL")
        conn.execute("PRAGMA synchronous=NORMAL")
        conn.execute("PRAGMA busy_timeout=5000")

        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS clusters (
                cluster_id TEXT PRIMARY KEY,
                topic TEXT NOT NULL,
                payload TEXT NOT NULL,
                updated_at TEXT NOT NULL,
                summary_payload TEXT,
                summary_updated_at TEXT
            )
            """
        )
        # If the table already exists without the summary columns,
        # add them (SQLite-friendly incremental migrations).
        add_column(conn, "clusters", "summary_payload TEXT")
        add_column(conn, "clusters", "summary_updated_at TEXT")
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_clusters_topic ON clusters(topic)"
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_clusters_updated_at ON clusters(updated_at)"
        )

        # Generated column for indexed event-time filtering (VIRTUAL for compatibility)
        add_column(
            conn,
            "clusters",
            "payload_ts GENERATED ALWAYS AS (json_extract(payload, '$.timestamp')) VIRTUAL",
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_clusters_payload_ts ON clusters(payload_ts)"
        )

        # Junction tables for SQL-level entity/keyword search
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS cluster_entities (
                cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
                entity TEXT NOT NULL,
                PRIMARY KEY (cluster_id, entity)
            )
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_cluster_entities_entity ON cluster_entities(entity)"
        )
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS cluster_keywords (
                cluster_id TEXT NOT NULL REFERENCES clusters(cluster_id) ON DELETE CASCADE,
                keyword TEXT NOT NULL,
                PRIMARY KEY (cluster_id, keyword)
            )
            """
        )
        conn.execute(
            "CREATE INDEX IF NOT EXISTS idx_cluster_keywords_keyword ON cluster_keywords(keyword)"
        )

        # Seen-articles table: tracks every article_key that has been
        # clustered, so the poller can skip already-processed articles
        # entirely (no re-clustering, no re-enrichment).
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS seen_articles (
                article_key TEXT PRIMARY KEY,
                cluster_id TEXT NOT NULL,
                first_seen TEXT NOT NULL,
                url TEXT NOT NULL DEFAULT '',
                content_hash TEXT NOT NULL DEFAULT ''
            )
            """
        )
        # Migration: add content_hash column if missing (existing DBs)
        add_column(conn, "seen_articles", "content_hash TEXT NOT NULL DEFAULT ''")

        # An entity_metadata table that has columns but no entity_id column
        # is dropped here and recreated by the statement that follows.
        try:
            cols = [row[1] for row in conn.execute("PRAGMA table_info(entity_metadata)").fetchall()]
            if cols and "entity_id" not in cols:
                conn.execute("DROP TABLE entity_metadata")
        except sqlite3.OperationalError:
            pass
        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS entity_metadata (
                entity_id TEXT PRIMARY KEY,
                normalized_label TEXT NOT NULL,
                canonical_label TEXT,
                mid TEXT,
                sources_json TEXT,
                updated_at TEXT,
                last_requested_at TEXT
            )
            """
        )
        conn.execute(
            "CREATE UNIQUE INDEX IF NOT EXISTS idx_entity_metadata_mid ON entity_metadata(mid) WHERE mid IS NOT NULL"
        )

        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS feed_state (
                feed_key TEXT PRIMARY KEY,
                last_hash TEXT NOT NULL,
                last_item_count INTEGER,
                enabled INTEGER DEFAULT 1,
                updated_at TEXT NOT NULL
            )
            """
        )
        add_column(conn, "feed_state", "enabled INTEGER DEFAULT 1")

        conn.execute(
            """
            CREATE TABLE IF NOT EXISTS meta (
                key TEXT PRIMARY KEY,
                value TEXT NOT NULL
            )
            """
        )

        # Seed site_config from .env / defaults (no-op if already populated)
        from news_mcp.site_config import seed_site_config

        seeded = seed_site_config(conn)
        if seeded:
            import logging

            logging.getLogger(__name__).info("site_config: seeded %d rows from env/defaults", seeded)
def upsert_clusters(self, clusters: list[dict], topic: str) -> None:
    """Insert or replace *clusters* under *topic* and refresh their indexes.

    Each cluster is passed through ``sanitize_cluster_payload`` and stored
    as JSON. In the same transaction, per cluster:

    * its ``cluster_entities`` / ``cluster_keywords`` rows are deleted and
      rewritten from the sanitized payload (values stripped and
      lower-cased, blanks skipped), so re-enrichment replaces stale terms;
    * every article with a non-empty ``_article_key`` is recorded in
      ``seen_articles`` so the poller can skip it on later cycles. For a
      key that already exists, ``cluster_id``, ``url`` and ``content_hash``
      are updated and ``first_seen`` is kept.

    Args:
        clusters: Cluster payloads; each must contain ``cluster_id``.
        topic: Topic stored in the ``topic`` column of every row.

    Raises:
        KeyError: If a sanitized cluster has no ``cluster_id``.
    """
    # Imported once per call rather than once per article.
    from news_mcp.article_identity import article_content_hash

    stamp = datetime.now(timezone.utc).isoformat()
    with self._conn() as conn:
        for raw in clusters:
            cluster = sanitize_cluster_payload(raw)
            cluster_id = cluster["cluster_id"]
            conn.execute(
                "INSERT INTO clusters(cluster_id, topic, payload, updated_at) VALUES(?,?,?,?) "
                "ON CONFLICT(cluster_id) DO UPDATE SET topic=excluded.topic, payload=excluded.payload, updated_at=excluded.updated_at",
                (cluster_id, topic, json.dumps(cluster, ensure_ascii=False), stamp),
            )

            # Populate junction tables for SQL-level entity/keyword search.
            # DELETE first so re-enrichment replaces stale entries.
            for table, column, values in (
                ("cluster_entities", "entity", cluster.get("entities")),
                ("cluster_keywords", "keyword", cluster.get("keywords")),
            ):
                conn.execute(f"DELETE FROM {table} WHERE cluster_id=?", (cluster_id,))
                # `or []` tolerates an explicit None; the set removes the
                # duplicates that normalization can produce.
                terms = {str(value).strip().lower() for value in (values or [])}
                terms.discard("")
                conn.executemany(
                    f"INSERT OR IGNORE INTO {table}(cluster_id, {column}) VALUES(?, ?)",
                    [(cluster_id, term) for term in sorted(terms)],
                )

            # Record every article in seen_articles so the poller can
            # skip already-processed articles on future cycles.
            seen_rows = []
            for article in cluster.get("articles") or []:
                key = _article_key(article)
                if not key:
                    continue
                url = str(article.get("url") or "").strip()
                seen_rows.append((key, cluster_id, stamp, url, article_content_hash(article)))
            conn.executemany(
                "INSERT INTO seen_articles(article_key, cluster_id, first_seen, url, content_hash) VALUES(?,?,?,?,?) "
                "ON CONFLICT(article_key) DO UPDATE SET cluster_id=excluded.cluster_id, url=excluded.url, content_hash=excluded.content_hash",
                seen_rows,
            )
def upsert_cluster_summary(
    self,
    cluster_id: str,
    summary_payload: dict,
) -> None:
    """Store *summary_payload* as JSON on an existing cluster row.

    ``summary_updated_at`` is set to the current UTC time. This is a plain
    UPDATE, so nothing is written when *cluster_id* does not exist.
    """
    stamped_at = datetime.now(timezone.utc).isoformat()
    encoded = json.dumps(summary_payload, ensure_ascii=False)
    statement = "UPDATE clusters SET summary_payload=?, summary_updated_at=? WHERE cluster_id=?"
    with self._conn() as conn:
        conn.execute(statement, (encoded, stamped_at, cluster_id))
def get_cluster_summary(self, cluster_id: str, ttl_hours: float) -> dict | None:
    """Return the stored summary of a cluster if it is recent enough.

    Args:
        cluster_id: Cluster to look up.
        ttl_hours: Maximum age of the summary, in hours.

    Returns:
        The decoded summary, or ``None`` when the cluster is unknown, has no
        summary, or its summary was written more than *ttl_hours* ago.
    """
    oldest_allowed = (datetime.now(timezone.utc) - timedelta(hours=ttl_hours)).isoformat()
    lookup = (
        "SELECT summary_payload, summary_updated_at FROM clusters "
        "WHERE cluster_id=? AND summary_updated_at >= ?"
    )
    with self._conn() as conn:
        found = conn.execute(lookup, (cluster_id, oldest_allowed)).fetchone()
        if found and found[0]:
            return json.loads(found[0])
        return None
def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
    """Return the newest clusters of *topic*, ordered by event timestamp.

    Only clusters whose ``payload_ts`` lies within the last *ttl_hours* are
    considered; at most *limit* decoded payloads are returned, newest first.
    """
    window = timedelta(hours=float(ttl_hours))
    cutoff = (datetime.now(timezone.utc) - window).isoformat()
    sql = "SELECT payload FROM clusters WHERE topic=? AND payload_ts >= ? ORDER BY payload_ts DESC LIMIT ?"
    with self._conn() as conn:
        fetched = conn.execute(sql, (topic, cutoff, int(limit))).fetchall()
        return [json.loads(text) for (text,) in fetched]
def get_latest_clusters_all_topics(self, ttl_hours: float, limit: int) -> list[dict]:
    """Return the newest clusters regardless of topic, by event timestamp.

    Only clusters whose ``payload_ts`` lies within the last *ttl_hours* are
    considered; at most *limit* decoded payloads are returned, newest first.
    """
    window = timedelta(hours=float(ttl_hours))
    cutoff = (datetime.now(timezone.utc) - window).isoformat()
    sql = "SELECT payload FROM clusters WHERE payload_ts >= ? ORDER BY payload_ts DESC LIMIT ?"
    with self._conn() as conn:
        fetched = conn.execute(sql, (cutoff, int(limit))).fetchall()
        return [json.loads(text) for (text,) in fetched]
def get_cluster_by_id(self, cluster_id: str) -> dict | None:
    """Return the decoded payload of *cluster_id*, or ``None`` if unknown."""
    with self._conn() as conn:
        found = conn.execute(
            "SELECT payload FROM clusters WHERE cluster_id=?",
            (cluster_id,),
        ).fetchone()
        if not found:
            return None
        return json.loads(found[0])
def get_feed_hash(self, feed_key: str) -> str | None:
    """Return the ``last_hash`` stored for *feed_key*, or ``None`` if the feed is unknown."""
    with self._conn() as conn:
        found = conn.execute(
            "SELECT last_hash FROM feed_state WHERE feed_key=?",
            (feed_key,),
        ).fetchone()
        if not found:
            return None
        return found[0]
def set_feed_hash(self, feed_key: str, last_hash: str) -> None:
    """Record *last_hash* for *feed_key* via ``set_feed_state``, with no item count."""
    self.set_feed_state(feed_key, last_hash, last_item_count=None)
def set_feed_state(self, feed_key: str, last_hash: str, last_item_count: int | None = None) -> None:
    """Insert or update the state row of a feed.

    ``last_hash``, ``last_item_count`` and ``updated_at`` (current UTC time)
    are overwritten on an existing row; its ``enabled`` flag is not touched.
    """
    stamped_at = datetime.now(timezone.utc).isoformat()
    statement = (
        "INSERT INTO feed_state(feed_key, last_hash, last_item_count, updated_at) VALUES(?,?,?,?) "
        "ON CONFLICT(feed_key) DO UPDATE SET last_hash=excluded.last_hash, last_item_count=excluded.last_item_count, updated_at=excluded.updated_at"
    )
    with self._conn() as conn:
        conn.execute(statement, (feed_key, last_hash, last_item_count, stamped_at))
def get_feed_state(self, feed_key: str) -> dict | None:
    """Return ``{"last_hash", "updated_at"}`` for *feed_key*, or ``None`` if unknown."""
    with self._conn() as conn:
        found = conn.execute(
            "SELECT last_hash, updated_at FROM feed_state WHERE feed_key=?",
            (feed_key,),
        ).fetchone()
    if found is None:
        return None
    last_hash, updated_at = found
    return {"last_hash": last_hash, "updated_at": updated_at}
def get_all_feed_states(self) -> list[dict[str, Any]]:
    """Return every ``feed_state`` row as a dict, most recently updated first.

    ``enabled`` is converted to a bool; the other columns are returned as
    stored.
    """
    columns = ("feed_key", "last_hash", "last_item_count", "enabled", "updated_at")
    with self._conn() as conn:
        records = conn.execute(
            "SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC"
        ).fetchall()
    states = []
    for record in records:
        state = dict(zip(columns, record))
        state["enabled"] = bool(state["enabled"])
        states.append(state)
    return states
def feed_ensure_seeded(self, feed_urls: list[str]) -> None:
    """Add a ``feed_state`` row for every URL that does not have one yet.

    New rows are enabled, with an empty hash, an item count of 0 and an
    empty ``updated_at``. Existing rows are left untouched.
    """
    if not feed_urls:
        return
    statement = "INSERT OR IGNORE INTO feed_state(feed_key, last_hash, last_item_count, enabled, updated_at) VALUES(?, '', 0, 1, '')"
    with self._conn() as conn:
        conn.executemany(statement, [(feed_url,) for feed_url in feed_urls])
def get_feed_state_list(self) -> list[dict[str, Any]]:
    """Dashboard-facing alias of ``get_all_feed_states``: every feed with its enabled flag."""
    feed_states = self.get_all_feed_states()
    return feed_states
def set_feed_enabled(self, feed_url: str, enabled: bool) -> bool:
    """Enable or disable a feed.

    Returns:
        ``True`` when a row for *feed_url* existed and was updated,
        ``False`` when no such feed is known.
    """
    flag = 1 if enabled else 0
    with self._conn() as conn:
        outcome = conn.execute(
            "UPDATE feed_state SET enabled = ? WHERE feed_key = ?",
            (flag, feed_url),
        )
        touched = outcome.rowcount
    return touched > 0
def get_enabled_feed_urls(self, feed_urls: list[str]) -> list[str]:
    """Return the subset of *feed_urls* that is enabled in ``feed_state``.

    URLs that have no ``feed_state`` row yet are seeded first (as enabled).
    When *feed_urls* is empty, every enabled feed in the table is returned.
    """
    self.feed_ensure_seeded(feed_urls)

    sql = "SELECT feed_key FROM feed_state WHERE enabled = 1"
    args: Any = ()
    if feed_urls:
        marks = ",".join("?" * len(feed_urls))
        sql = f"SELECT feed_key FROM feed_state WHERE feed_key IN ({marks}) AND enabled = 1"
        args = feed_urls

    with self._conn() as conn:
        return [feed_key for (feed_key,) in conn.execute(sql, args).fetchall()]
- # ------------------------------------------------------------------ #
- # Seen-articles: skip already-processed articles at ingestion
- # ------------------------------------------------------------------ #
def filter_already_seen(self, articles: list[dict]) -> tuple[list[dict], list[dict], list[dict]]:
    """Split *articles* into (new, seen_unchanged, seen_changed).

    Identity is ``_article_key`` (URL); ``article_content_hash`` detects
    in-place content updates (e.g. a stub that gets fleshed out).

    Returns:
        A tuple of three lists, each preserving the input order:

        * new_articles: key not in ``seen_articles`` → full clustering +
          enrichment;
        * seen_unchanged: same key and same content hash → skip entirely.
          Rows whose stored hash is empty (written before the hash column
          existed) are also placed here; the next upsert stores the real
          hash;
        * seen_changed: same key, different content hash → re-cluster to
          update the existing cluster payload (will trigger re-enrichment).
    """
    from news_mcp.article_identity import article_content_hash as _content_hash

    keys = [_article_key(a) for a in articles]
    if not keys:
        return [], [], []

    # SQLite caps the number of bound parameters per statement (999 in
    # builds older than 3.32), so a large poll is looked up in batches.
    batch_size = 500
    seen_map: dict[Any, str] = {}
    with self._conn() as conn:
        for start in range(0, len(keys), batch_size):
            batch = keys[start:start + batch_size]
            placeholders = ",".join("?" for _ in batch)
            cur = conn.execute(
                f"SELECT article_key, content_hash FROM seen_articles WHERE article_key IN ({placeholders})",
                batch,
            )
            seen_map.update((row[0], row[1]) for row in cur.fetchall())

    new_articles: list[dict] = []
    seen_unchanged: list[dict] = []
    seen_changed: list[dict] = []
    for art, key in zip(articles, keys):
        if key not in seen_map:
            new_articles.append(art)
            continue
        stored_hash = seen_map[key]
        if stored_hash == _content_hash(art) or not stored_hash:
            # An empty stored hash is a pre-migration row: treat it as
            # unchanged and let the next upsert populate the real hash.
            seen_unchanged.append(art)
        else:
            seen_changed.append(art)
    return new_articles, seen_unchanged, seen_changed
def get_seen_article_count(self) -> int:
    """Return the number of rows in ``seen_articles`` (for diagnostics)."""
    with self._conn() as conn:
        (row_count,) = conn.execute("SELECT count(*) FROM seen_articles").fetchone()
        return row_count
def get_meta(self, key: str) -> str | None:
    """Return the value stored under *key* in ``meta``, or ``None`` if absent."""
    with self._conn() as conn:
        found = conn.execute("SELECT value FROM meta WHERE key=?", (key,)).fetchone()
        if not found:
            return None
        return found[0]
def set_meta(self, key: str, value: str) -> None:
    """Store *value* under *key* in ``meta``, replacing any previous value."""
    statement = (
        "INSERT INTO meta(key, value) VALUES(?, ?) "
        "ON CONFLICT(key) DO UPDATE SET value=excluded.value"
    )
    with self._conn() as conn:
        conn.execute(statement, (key, value))
def upsert_entity_metadata(
    self,
    normalized_label: str,
    canonical_label: str | None = None,
    mid: str | None = None,
    sources: list[str] | None = None,
) -> None:
    """Insert or update the metadata row of an entity.

    The row id is *mid* when one is given, otherwise
    ``local:<normalized_label>``. On an existing row ``canonical_label``,
    ``mid``, ``sources_json`` and ``updated_at`` are overwritten;
    ``normalized_label`` is only written on insert. Nothing happens when
    *normalized_label* is blank.

    Args:
        normalized_label: Normalized entity label (whitespace is stripped).
        canonical_label: Optional display label.
        mid: Optional identifier used as the row id when present.
        sources: Source names; falsy items are dropped and the rest stored
            as a sorted, de-duplicated JSON list.
    """
    label = str(normalized_label or "").strip()
    if not label:
        return

    canonical = str(canonical_label).strip() if canonical_label else None
    mid_value = str(mid).strip() if mid else None
    entity_id = mid_value or f"local:{label}"
    unique_sources = sorted({name for name in (sources or []) if name})
    encoded_sources = json.dumps(unique_sources, ensure_ascii=False)
    stamped_at = datetime.now(timezone.utc).isoformat()

    row = (entity_id, label, canonical, mid_value, encoded_sources, stamped_at)
    with self._conn() as conn:
        conn.execute(
            """
            INSERT INTO entity_metadata(entity_id, normalized_label, canonical_label, mid, sources_json, updated_at)
            VALUES(?,?,?,?,?,?)
            ON CONFLICT(entity_id) DO UPDATE SET
            canonical_label=excluded.canonical_label,
            mid=excluded.mid,
            sources_json=excluded.sources_json,
            updated_at=excluded.updated_at
            """,
            row,
        )
def get_entity_metadata(self, normalized_label: str) -> dict[str, Any] | None:
    """Return the best metadata row stored for *normalized_label*.

    When several rows share the label, rows that have a ``mid`` win, then
    the most recently requested (or, failing that, updated) row.

    Returns:
        A dict with ``entity_id``, ``normalized_label``, ``canonical_label``,
        ``mid``, ``sources``, ``updated_at`` and ``last_requested_at``, or
        ``None`` when the label is blank or unknown. ``sources`` is the
        decoded ``sources_json`` column, or ``[]`` when that column is empty
        or not valid JSON.
    """
    normalized_label = str(normalized_label or "").strip()
    if not normalized_label:
        return None
    with self._conn() as conn:
        row = conn.execute(
            "SELECT entity_id, canonical_label, mid, sources_json, updated_at, last_requested_at "
            "FROM entity_metadata "
            "WHERE normalized_label=? "
            "ORDER BY CASE WHEN mid IS NOT NULL THEN 0 ELSE 1 END, "
            "COALESCE(last_requested_at, updated_at) DESC, updated_at DESC "
            "LIMIT 1",
            (normalized_label,),
        ).fetchone()
    if not row:
        return None

    # Unpack by name so each column cannot be confused with its neighbour;
    # sources come from sources_json, not from mid.
    entity_id, canonical_label, mid, sources_json, updated_at, last_requested_at = row
    sources = []
    if sources_json:
        try:
            sources = json.loads(sources_json)
        except (TypeError, ValueError):
            # Unreadable stored JSON is treated the same as "no sources".
            sources = []
    return {
        "entity_id": entity_id,
        "normalized_label": normalized_label,
        "canonical_label": canonical_label,
        "mid": mid,
        "sources": sources,
        "updated_at": updated_at,
        "last_requested_at": last_requested_at,
    }
def record_entity_request(self, normalized_label: str, mid: str | None = None) -> None:
    """Note that an entity was just requested.

    The row id is *mid* when given, otherwise ``local:<normalized_label>``.
    An existing row only gets ``last_requested_at`` refreshed; a missing row
    is created with no canonical label, an empty source list, and both
    ``updated_at`` and ``last_requested_at`` set to the current UTC time.
    Nothing happens when *normalized_label* is blank.
    """
    label = str(normalized_label or "").strip()
    if not label:
        return

    mid_value = str(mid).strip() if mid else None
    entity_id = mid_value or f"local:{label}"
    requested_at = datetime.now(timezone.utc).isoformat()
    no_sources = json.dumps([], ensure_ascii=False)

    row = (entity_id, label, None, mid_value, no_sources, requested_at, requested_at)
    with self._conn() as conn:
        conn.execute(
            """
            INSERT INTO entity_metadata(entity_id, normalized_label, canonical_label, mid, sources_json, updated_at, last_requested_at)
            VALUES(?,?,?,?,?,?,?)
            ON CONFLICT(entity_id) DO UPDATE SET
            last_requested_at=excluded.last_requested_at
            """,
            row,
        )
def get_failed_enrichment_clusters(self, max_retries: int = 3) -> list[dict]:
    """Return clusters whose enrichment failed and may still be retried.

    A cluster qualifies when its payload has ``enrichment_failed_at`` set
    and ``enrichment_retry_count`` is missing or below *max_retries*. At
    most 500 payloads are returned, most recently updated first. These are
    the candidates for re-enrichment on the next polling cycle.
    """
    query = (
        "SELECT payload FROM clusters "
        "WHERE json_extract(payload, '$.enrichment_failed_at') IS NOT NULL "
        "AND (json_extract(payload, '$.enrichment_retry_count') IS NULL "
        " OR json_extract(payload, '$.enrichment_retry_count') < ?) "
        "ORDER BY updated_at DESC LIMIT 500"
    )
    with self._conn() as conn:
        candidates = conn.execute(query, (max_retries,)).fetchall()
    return [json.loads(text) for (text,) in candidates]
def prune_clusters(self, retention_days: float) -> int:
    """Delete clusters older than *retention_days* and everything tied to them.

    Age is judged by ``payload_ts`` (event time from ``payload.timestamp``),
    not ``updated_at`` (row write time): ``updated_at`` is refreshed on every
    upsert, which would keep re-ingested old articles alive forever.

    Rows in ``seen_articles``, ``cluster_entities`` and ``cluster_keywords``
    that point at a pruned cluster are removed explicitly. The junction
    tables declare ``ON DELETE CASCADE``, but SQLite only honours that on a
    connection with foreign keys enabled, so the cleanup does not rely on
    it. The time of the prune is recorded in ``meta`` under
    ``META_LAST_PRUNE_AT``.

    Args:
        retention_days: Maximum age in days; values <= 0 disable pruning.

    Returns:
        Number of clusters deleted (0 when pruning is disabled; in that case
        the prune time is not recorded either).
    """
    retention_days = float(retention_days)
    if retention_days <= 0:
        return 0

    now = datetime.now(timezone.utc)
    cutoff_iso = (now - timedelta(days=retention_days)).isoformat()
    pruned_at = now.isoformat()

    # Dependents are selected with a subquery instead of a bound list of
    # ids, so the statement stays within SQLite's parameter limit no matter
    # how many clusters expire at once.
    expired = "SELECT cluster_id FROM clusters WHERE payload_ts < ?"
    with self._conn() as conn:
        # Dependents go first: once the clusters are deleted the subquery
        # would no longer find them.
        for table in ("seen_articles", "cluster_entities", "cluster_keywords"):
            conn.execute(
                f"DELETE FROM {table} WHERE cluster_id IN ({expired})",
                (cutoff_iso,),
            )
        cur = conn.execute("DELETE FROM clusters WHERE payload_ts < ?", (cutoff_iso,))
        deleted = int(cur.rowcount or 0)
        conn.execute(
            "INSERT INTO meta(key, value) VALUES(?, ?) "
            "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
            (META_LAST_PRUNE_AT, pruned_at),
        )
    return deleted
def prune_if_due(self, pruning_enabled: bool, retention_days: float, interval_hours: float = 24.0) -> dict[str, Any]:
    """Run ``prune_clusters`` when pruning is enabled and the interval has elapsed.

    A prune is due when none has been recorded, when the recorded time
    cannot be interpreted, or when at least ``max(1.0, interval_hours)``
    hours have passed since it.

    Returns:
        A report with ``enabled``, ``deleted``, ``due``, ``retention_days``,
        ``interval_hours`` and ``last_prune_at``. ``due`` is ``False`` both
        when pruning is switched off (or *retention_days* <= 0) and when the
        interval has not elapsed yet.
    """
    retention_days = float(retention_days)
    interval_hours = float(interval_hours)

    def report(enabled: bool, deleted: int, due: bool) -> dict[str, Any]:
        # The recorded prune time is read when the report is built, so a
        # prune that has just run is reflected in it.
        return {
            "enabled": enabled,
            "deleted": deleted,
            "due": due,
            "retention_days": retention_days,
            "interval_hours": interval_hours,
            "last_prune_at": self.get_meta(META_LAST_PRUNE_AT),
        }

    if (not pruning_enabled) or retention_days <= 0:
        return report(bool(pruning_enabled), 0, False)

    recorded = self.get_meta(META_LAST_PRUNE_AT)
    now = datetime.now(timezone.utc)
    due = True
    if recorded:
        try:
            elapsed = now - datetime.fromisoformat(recorded)
            due = elapsed >= timedelta(hours=max(1.0, interval_hours))
        except Exception:
            # An unreadable stamp must not block pruning forever.
            due = True

    if not due:
        return {
            "enabled": True,
            "deleted": 0,
            "due": False,
            "retention_days": retention_days,
            "interval_hours": interval_hours,
            "last_prune_at": recorded,
        }

    return report(True, self.prune_clusters(retention_days), True)
def get_prune_state(self, pruning_enabled: bool, retention_days: float, interval_hours: float = 24.0) -> dict[str, Any]:
    """Describe the pruning configuration together with the last recorded prune time.

    Nothing is pruned; the arguments are echoed back (coerced to ``bool`` /
    ``float``) next to ``last_prune_at`` read from ``meta``.
    """
    state: dict[str, Any] = {}
    state["enabled"] = bool(pruning_enabled)
    state["retention_days"] = float(retention_days)
    state["interval_hours"] = float(interval_hours)
    state["last_prune_at"] = self.get_meta(META_LAST_PRUNE_AT)
    return state
- # ------------------------------------------------------------------
- # Dashboard query helpers
- # ------------------------------------------------------------------
def get_dashboard_stats(self) -> dict[str, Any]:
    """Collect the aggregate numbers shown in the dashboard health panel.

    Returns:
        A dict with cluster and entity totals, the number of distinct
        entities found in cluster payloads, per-topic cluster counts, the
        last refresh and prune times, a ``data_fresh`` flag, the per-feed
        state keyed by feed, the feed count, the seen-article count and the
        prune configuration.
    """
    with self._conn() as conn:

        def scalar(sql: str) -> Any:
            return conn.execute(sql).fetchone()[0]

        total_clusters = scalar("SELECT COUNT(*) FROM clusters")
        total_entities = scalar("SELECT COUNT(*) FROM entity_metadata")
        cluster_entities = scalar(
            "SELECT COUNT(DISTINCT e.value) "
            "FROM clusters, json_each(clusters.payload, '$.entities') AS e"
        )
        topic_counts = {
            topic: count
            for topic, count in conn.execute(
                "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
            ).fetchall()
        }
        last_refresh = self.get_meta("last_refresh_at")
        feeds = {
            feed_key: {
                "last_hash": last_hash, "last_item_count": item_count,
                "enabled": bool(enabled), "updated_at": updated_at,
            }
            for feed_key, last_hash, item_count, enabled, updated_at in conn.execute(
                "SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC"
            )
        }

    # Freshness: did a refresh happen recently? (within 2x the configured
    # interval, which is treated as at least one hour)
    fresh = False
    if last_refresh:
        try:
            refreshed = datetime.fromisoformat(last_refresh.replace("Z", "+00:00"))
            if refreshed.tzinfo is None:
                refreshed = refreshed.replace(tzinfo=timezone.utc)
            age_hours = (datetime.now(timezone.utc) - refreshed).total_seconds() / 3600
            fresh = age_hours < max(1.0, NEWS_REFRESH_INTERVAL_SECONDS / 3600) * 2
        except Exception:
            # An unreadable refresh stamp simply counts as "not fresh".
            pass

    last_prune = self.get_meta(META_LAST_PRUNE_AT)
    prune_state = self.get_prune_state(
        pruning_enabled=NEWS_PRUNING_ENABLED,
        retention_days=NEWS_RETENTION_DAYS,
        interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
    )

    stats: dict[str, Any] = {
        "total_clusters": total_clusters,
        "total_entities": total_entities,
        "cluster_entities": cluster_entities,
        "clusters_by_topic": topic_counts,
        "last_refresh_at": last_refresh,
        "last_prune_at": last_prune,
        "data_fresh": fresh,
        "feeds": feeds,
        "feed_count": len(feeds),
        "seen_article_count": self.get_seen_article_count(),
        "prune_state": prune_state,
    }
    return stats
def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
    """Return the dashboard view of one cluster, including its stored summary.

    The summary is read from the ``summary_payload`` column, which is where
    ``upsert_cluster_summary`` writes it. A ``summary_payload`` entry inside
    the payload JSON is used only when the column is empty.

    Returns:
        ``None`` when the cluster is unknown. Otherwise a dict of the
        payload fields shown on the dashboard (missing fields get neutral
        defaults), plus ``article_count``, and ``summary_text`` /
        ``key_facts`` taken from the summary's ``mergedSummary`` /
        ``keyFacts`` (``""`` / ``[]`` when there is no readable summary).
    """
    with self._conn() as conn:
        row = conn.execute(
            "SELECT payload, summary_payload FROM clusters WHERE cluster_id = ?", (cluster_id,)
        ).fetchone()
    if not row:
        return None

    c = json.loads(row[0])

    raw_summary = row[1] or c.get("summary_payload")
    summary = None
    if isinstance(raw_summary, dict):
        summary = raw_summary
    elif raw_summary:
        try:
            summary = json.loads(raw_summary)
        except (TypeError, ValueError):
            summary = None
    if not isinstance(summary, dict):
        # Anything other than a JSON object cannot provide the fields read
        # below; treat it as "no summary".
        summary = None

    return {
        "cluster_id": c.get("cluster_id"),
        "headline": c.get("headline", ""),
        "summary": c.get("summary", ""),
        "topic": c.get("topic", ""),
        "sentiment": c.get("sentiment", "neutral"),
        "sentimentScore": c.get("sentimentScore"),
        "importance": c.get("importance", 0),
        "entities": c.get("entities", []),
        "entityResolutions": c.get("entityResolutions", []),
        "keywords": c.get("keywords", []),
        "sources": c.get("sources", []),
        "timestamp": c.get("timestamp", ""),
        "first_seen": c.get("first_seen", ""),
        "last_updated": c.get("last_updated", ""),
        "article_count": len(c.get("articles", [])),
        "articles": c.get("articles", []),
        "summary_text": summary.get("mergedSummary", "") if summary else "",
        "key_facts": summary.get("keyFacts", []) if summary else [],
    }
- # ── Paginated Clusters ────────────────────────────────────────────
def get_clusters_page(
    self,
    topic: str | None = None,
    hours: float = 24,
    limit: int = 20,
    offset: int = 0,
) -> dict[str, Any]:
    """Return one page of recent clusters plus the total number of matches.

    Args:
        topic: Restrict to this topic; ``None``, ``""`` and ``"all"`` mean
            every topic.
        hours: Only clusters whose ``payload_ts`` lies within the last
            *hours* hours are considered.
        limit: Maximum number of clusters on the page.
        offset: Number of matching clusters to skip (newest first).

    Returns:
        ``{"clusters": [...], "total": int}`` where ``clusters`` holds the
        listing fields of each payload (missing fields get neutral
        defaults) and ``total`` counts all matches, ignoring *limit* and
        *offset*.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    where = "payload_ts >= ?"
    params: list = [cutoff]
    if topic and topic != "all":
        where += " AND topic = ?"
        params.append(topic)

    # Both queries share one managed connection; the count no longer opens
    # a second connection that nothing manages.
    with self._conn() as conn:
        total = conn.execute(
            f"SELECT COUNT(*) FROM clusters WHERE {where}", params
        ).fetchone()[0]
        rows = conn.execute(
            f"SELECT payload FROM clusters WHERE {where} ORDER BY payload_ts DESC LIMIT ? OFFSET ?",
            [*params, limit, offset],
        ).fetchall()

    clusters = []
    for (payload_text,) in rows:
        c = json.loads(payload_text)
        clusters.append(
            {
                "cluster_id": c.get("cluster_id", ""),
                "headline": c.get("headline", ""),
                "topic": c.get("topic", ""),
                "sentiment": c.get("sentiment", "neutral"),
                "sentimentScore": c.get("sentimentScore"),
                "importance": c.get("importance", 0),
                "entities": c.get("entities", []),
                "sources": c.get("sources", []),
                "timestamp": c.get("timestamp", ""),
                "keywords": c.get("keywords", []),
                "article_count": len(c.get("articles", [])),
            }
        )
    return {"clusters": clusters, "total": total}
- # ── Sentiment Series ──────────────────────────────────────────────
- def get_sentiment_series(
- self,
- topic: str | None = None,
- hours: float = 24,
- bucket_hours: float = 1,
- ) -> list[dict[str, Any]]:
- """Sentiment score averaged per time bucket. Filters by payload_ts SQL index."""
- cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
- query = "SELECT payload FROM clusters WHERE payload_ts >= ?"
- params: list = [cutoff]
- if topic and topic != "all":
- query += " AND topic = ?"
- params.append(topic)
- query += " ORDER BY payload_ts ASC"
- with self._conn() as conn:
- rows = conn.execute(query, params).fetchall()
- buckets: dict[datetime, list[float]] = {}
- for (payload_text,) in rows:
- c = json.loads(payload_text)
- ts_str = c.get("timestamp")
- score = c.get("sentimentScore")
- if not ts_str or score is None:
- continue
- dt = datetime.fromisoformat(str(ts_str).strip())
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- dt = dt.astimezone(timezone.utc)
- bucket_key = dt.replace(minute=0, second=0, microsecond=0)
- if bucket_hours > 1:
- bucket_key = bucket_key.replace(
- hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
- )
- buckets.setdefault(bucket_key, []).append(float(score))
- return [
- {
- "time": bucket_key.isoformat(),
- "avg_sentiment": round(sum(scores) / len(scores), 3),
- "count": len(scores),
- "min": round(min(scores), 3),
- "max": round(max(scores), 3),
- }
- for bucket_key, scores in sorted(buckets.items())
- ]
- # ── Entity / Keyword Frequencies ──────────────────────────────────
def get_entity_frequencies(
    self,
    hours: float = 24,
    limit: int = 30,
) -> list[dict[str, Any]]:
    """Rank entities by how many recent clusters mention them.

    Counts rows of the ``cluster_entities`` junction table joined to clusters
    whose ``payload_ts`` is at or after ``now(UTC) - hours``, keeps the
    ``limit`` most frequent, and decorates each with whatever
    ``self.get_entity_metadata`` returns for the label.

    Returns:
        Dicts with ``label``, ``count``, ``canonical_label`` (the label
        itself when no metadata is available) and ``mid`` (``None`` when no
        metadata is available), most frequent first.
    """
    window_start = datetime.now(timezone.utc) - timedelta(hours=hours)
    with self._conn() as conn:
        cursor = conn.execute(
            """\
            SELECT ce.entity, COUNT(*) as cnt
            FROM cluster_entities ce
            JOIN clusters c ON c.cluster_id = ce.cluster_id
            WHERE c.payload_ts >= ?
            GROUP BY ce.entity
            ORDER BY cnt DESC
            LIMIT ?
            """,
            (window_start.isoformat(), limit),
        )
        counted = cursor.fetchall()

    def describe(label: Any, count: Any) -> dict[str, Any]:
        # Start from the no-metadata defaults, then overlay known metadata.
        entry: dict[str, Any] = {
            "label": label,
            "count": count,
            "canonical_label": label,
            "mid": None,
        }
        meta = self.get_entity_metadata(label)
        if meta:
            entry["canonical_label"] = meta["canonical_label"]
            entry["mid"] = meta["mid"]
        return entry

    return [describe(label, count) for label, count in counted]
def get_keyword_frequencies(
    self,
    hours: float = 24,
    limit: int = 30,
) -> list[dict[str, Any]]:
    """Rank keywords by how many recent clusters carry them.

    Counts rows of the ``cluster_keywords`` junction table joined to clusters
    whose ``payload_ts`` is at or after ``now(UTC) - hours``. Keywords equal
    (case-insensitively) to one of the topic labels ``crypto``, ``macro``,
    ``regulation``, ``ai`` or ``other`` are excluded.

    Args:
        hours: Size of the look-back window in hours.
        limit: Maximum number of keywords returned, applied after the topic
            labels have been excluded.

    Returns:
        ``{"label": keyword, "count": n}`` dicts, most frequent first.
    """
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    topic_labels = ("crypto", "macro", "regulation", "ai", "other")
    label_marks = ",".join("?" for _ in topic_labels)
    # Exclude the topic labels inside the query: filtering after LIMIT would
    # let them consume slots and return fewer than `limit` keywords.
    sql = (
        "SELECT ck.keyword, COUNT(*) AS cnt "
        "FROM cluster_keywords ck "
        "JOIN clusters c ON c.cluster_id = ck.cluster_id "
        "WHERE c.payload_ts >= ? "
        f"AND LOWER(ck.keyword) NOT IN ({label_marks}) "
        "GROUP BY ck.keyword "
        "ORDER BY cnt DESC "
        "LIMIT ?"
    )
    with self._conn() as conn:
        rows = conn.execute(sql, (cutoff, *topic_labels, limit)).fetchall()
    return [{"label": label, "count": count} for label, count in rows]
- # ── Junction-Table Entity / Keyword Cluster Search ────────────────
def get_clusters_by_entity(
    self,
    entity: str,
    hours: float = 168,
    limit: int = 50,
    offset: int = 0,
) -> dict[str, Any]:
    """Page through recent clusters linked to one entity.

    The entity is stripped and lower-cased, then matched exactly against
    ``cluster_entities.entity`` for clusters whose ``payload_ts`` is at or
    after ``now(UTC) - hours``. Results are newest first.

    Returns:
        ``{"entity": <normalised entity>, "clusters": [<decoded payloads>],
        "total": <distinct matching cluster ids>, "hours": hours}``.
    """
    needle = entity.strip().lower()
    since = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()

    # The count and the page share the same FROM/WHERE scope.
    scope = (
        "FROM clusters c "
        "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
        "WHERE c.payload_ts >= ? AND ce.entity = ?"
    )
    count_sql = "SELECT COUNT(DISTINCT c.cluster_id) " + scope
    page_sql = (
        "SELECT DISTINCT c.payload "
        + scope
        + " ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?"
    )

    with self._conn() as conn:
        (total,) = conn.execute(count_sql, (since, needle)).fetchone()
        payload_rows = conn.execute(
            page_sql, (since, needle, limit, offset)
        ).fetchall()

    clusters = [json.loads(payload) for (payload,) in payload_rows]
    return {
        "entity": needle,
        "clusters": clusters,
        "total": total,
        "hours": hours,
    }
def get_clusters_by_keyword(
    self,
    keyword: str,
    hours: float = 168,
    limit: int = 50,
    offset: int = 0,
) -> dict[str, Any]:
    """Page through recent clusters tagged with one keyword.

    The keyword is stripped and lower-cased, then matched exactly against
    ``cluster_keywords.keyword`` for clusters whose ``payload_ts`` is at or
    after ``now(UTC) - hours``. Results are newest first.

    Returns:
        ``{"keyword": <normalised keyword>, "clusters": [<decoded payloads>],
        "total": <distinct matching cluster ids>, "hours": hours}``.
    """
    wanted = keyword.strip().lower()
    earliest = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    match_args = (earliest, wanted)

    with self._conn() as conn:
        count_row = conn.execute(
            "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c "
            "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
            "WHERE c.payload_ts >= ? AND ck.keyword = ?",
            match_args,
        ).fetchone()
        page_rows = conn.execute(
            "SELECT DISTINCT c.payload FROM clusters c "
            "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
            "WHERE c.payload_ts >= ? AND ck.keyword = ? "
            "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?",
            match_args + (limit, offset),
        ).fetchall()

    decoded = []
    for page_row in page_rows:
        decoded.append(json.loads(page_row[0]))
    return {
        "keyword": wanted,
        "clusters": decoded,
        "total": count_row[0],
        "hours": hours,
    }
def get_clusters_by_entity_or_keyword(
    self,
    query_terms: set[str],
    hours: float,
    limit: int,
) -> list[dict]:
    """Find recent clusters where any term matches an entity or a keyword.

    Terms are stripped and lower-cased (blank ones dropped), then matched
    exactly against ``cluster_entities.entity`` and
    ``cluster_keywords.keyword``. Only clusters whose ``payload_ts`` is at or
    after ``now(UTC) - hours`` are considered.

    Args:
        query_terms: Candidate terms; a match on any one is enough.
        hours: Size of the look-back window in hours.
        limit: Maximum number of clusters returned.

    Returns:
        Decoded cluster payloads, newest first; ``[]`` when no usable term
        remains after normalisation.
    """
    # Normalising can collapse distinct inputs ("BTC", " btc "), so de-dupe.
    terms = sorted({q.strip().lower() for q in query_terms if q.strip()})
    if not terms:
        return []
    cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
    placeholders = ",".join("?" for _ in terms)
    # Resolve matching cluster ids from each junction table separately rather
    # than joining both onto clusters, which would build an entities x
    # keywords cross product per cluster only for DISTINCT to collapse it.
    # DISTINCT is kept so identical payloads are still returned once.
    sql = (
        "SELECT DISTINCT c.payload FROM clusters c "
        "WHERE c.payload_ts >= ? "
        "AND c.cluster_id IN ("
        f"SELECT cluster_id FROM cluster_entities WHERE entity IN ({placeholders}) "
        "UNION "
        f"SELECT cluster_id FROM cluster_keywords WHERE keyword IN ({placeholders})"
        ") "
        "ORDER BY c.payload_ts DESC LIMIT ?"
    )
    with self._conn() as conn:
        rows = conn.execute(sql, (cutoff, *terms, *terms, limit)).fetchall()
    return [json.loads(r[0]) for r in rows]
|