| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669670671672673674675676677678679680681682683684685686687688689690691692693694695696697698699700701702
703704705706707708709710711712713714715716717718719720721722723724725726 |
- from __future__ import annotations
- import json
- import sqlite3
- from dataclasses import dataclass
- from datetime import datetime, timezone, timedelta
- from pathlib import Path
- from typing import Any
- from urllib.parse import urlparse
- from email.utils import parsedate_to_datetime
- from news_mcp.config import (
- NEWS_PRUNE_INTERVAL_HOURS,
- NEWS_PRUNING_ENABLED,
- NEWS_RETENTION_DAYS,
- )
- from news_mcp.entity_normalize import normalize_entities
- from news_mcp.trends_resolution import resolve_entity_via_trends
@dataclass
class ClusterRow:
    """Typed view of one row of the ``clusters`` table.

    NOTE(review): not referenced anywhere in this module's visible code —
    presumably consumed by callers elsewhere in the package; confirm before
    removing.
    """

    # Primary key of the cluster (``clusters.cluster_id``).
    cluster_id: str
    # Topic bucket the cluster was stored under.
    topic: str
    # Decoded JSON payload from the ``payload`` column.
    payload: dict
    # Row-level last-modified timestamp.
    updated_at: datetime
# Key in the ``meta`` table recording when cluster pruning last ran
# (stored as an ISO-8601 UTC timestamp; see prune_clusters / prune_if_due).
META_LAST_PRUNE_AT = "last_prune_at"
- def _article_key(article: dict[str, Any]) -> str:
- url = str(article.get("url") or "").strip()
- if not url:
- return str(article.get("title") or "")
- try:
- parsed = urlparse(url)
- parts = [p for p in parsed.path.split("/") if p]
- if parts:
- return parts[-1]
- except Exception:
- pass
- return url
def _dedup_articles(articles: list[dict[str, Any]]) -> list[dict[str, Any]]:
    """Drop articles whose dedup key was already seen, preserving order."""
    kept: list[dict[str, Any]] = []
    seen_keys: set[str] = set()
    for item in articles:
        dedup_key = _article_key(item)
        if dedup_key not in seen_keys:
            seen_keys.add(dedup_key)
            kept.append(item)
    return kept
- def _has_valid_entity_resolutions(resolutions: Any, entities: list[str]) -> bool:
- if not isinstance(resolutions, list):
- return False
- if len(resolutions) != len(entities):
- return False
- for res in resolutions:
- if not isinstance(res, dict):
- return False
- if not res.get("normalized") or not res.get("canonical_label"):
- return False
- return True
def sanitize_cluster_payload(cluster: dict[str, Any], *, include_resolutions: bool = True) -> dict[str, Any]:
    """Normalize cluster payload so every stored payload is internally consistent."""
    out = dict(cluster)

    # Articles: keep only dict entries, then drop duplicates by article key.
    articles = [item for item in (out.get("articles", []) or []) if isinstance(item, dict)]
    out["articles"] = _dedup_articles(articles)

    # Entities: always store the normalized form.
    entities = normalize_entities(out.get("entities", []) or [])
    out["entities"] = entities

    if not include_resolutions:
        return out

    if not entities:
        # Keep the empty case explicit and stable.
        out["entityResolutions"] = []
        return out

    # Re-resolve only when the existing resolutions do not line up with the
    # normalized entity list; otherwise leave them untouched.
    existing = out.get("entityResolutions", None)
    if not _has_valid_entity_resolutions(existing, entities):
        out["entityResolutions"] = [resolve_entity_via_trends(name) for name in entities]
    return out
- class SQLiteClusterStore:
- def __init__(self, db_path: str | Path):
- self.db_path = str(db_path)
- self._init_db()
- def _conn(self) -> sqlite3.Connection:
- return sqlite3.connect(self.db_path)
- def _init_db(self) -> None:
- Path(self.db_path).parent.mkdir(parents=True, exist_ok=True)
- with self._conn() as conn:
- conn.execute("PRAGMA journal_mode=WAL")
- conn.execute("PRAGMA synchronous=NORMAL")
- conn.execute("PRAGMA busy_timeout=5000")
- conn.execute(
- """
- CREATE TABLE IF NOT EXISTS clusters (
- cluster_id TEXT PRIMARY KEY,
- topic TEXT NOT NULL,
- payload TEXT NOT NULL,
- updated_at TEXT NOT NULL,
- summary_payload TEXT,
- summary_updated_at TEXT
- )
- """
- )
- # If the table already exists without the summary columns,
- # add them (SQLite-friendly incremental migrations).
- for col_def in [
- "summary_payload TEXT",
- "summary_updated_at TEXT",
- ]:
- col = col_def.split()[0]
- try:
- conn.execute(f"ALTER TABLE clusters ADD COLUMN {col_def}")
- except sqlite3.OperationalError:
- pass
- conn.execute(
- "CREATE INDEX IF NOT EXISTS idx_clusters_topic ON clusters(topic)"
- )
- conn.execute(
- "CREATE INDEX IF NOT EXISTS idx_clusters_updated_at ON clusters(updated_at)"
- )
- try:
- cur = conn.execute("PRAGMA table_info(entity_metadata)")
- cols = [row[1] for row in cur.fetchall()]
- if cols and "entity_id" not in cols:
- conn.execute("DROP TABLE entity_metadata")
- except sqlite3.OperationalError:
- pass
- conn.execute(
- """
- CREATE TABLE IF NOT EXISTS entity_metadata (
- entity_id TEXT PRIMARY KEY,
- normalized_label TEXT NOT NULL,
- canonical_label TEXT,
- mid TEXT,
- sources_json TEXT,
- updated_at TEXT,
- last_requested_at TEXT
- )
- """
- )
- conn.execute(
- "CREATE UNIQUE INDEX IF NOT EXISTS idx_entity_metadata_mid ON entity_metadata(mid) WHERE mid IS NOT NULL"
- )
- conn.execute(
- """
- CREATE TABLE IF NOT EXISTS feed_state (
- feed_key TEXT PRIMARY KEY,
- last_hash TEXT NOT NULL,
- updated_at TEXT NOT NULL
- )
- """
- )
- conn.execute(
- """
- CREATE TABLE IF NOT EXISTS meta (
- key TEXT PRIMARY KEY,
- value TEXT NOT NULL
- )
- """
- )
- def upsert_clusters(self, clusters: list[dict], topic: str) -> None:
- now = datetime.now(timezone.utc)
- with self._conn() as conn:
- for c in clusters:
- c = sanitize_cluster_payload(c)
- cluster_id = c["cluster_id"]
- payload = json.dumps(c, ensure_ascii=False)
- conn.execute(
- "INSERT INTO clusters(cluster_id, topic, payload, updated_at) VALUES(?,?,?,?) "
- "ON CONFLICT(cluster_id) DO UPDATE SET topic=excluded.topic, payload=excluded.payload, updated_at=excluded.updated_at",
- (cluster_id, topic, payload, now.isoformat()),
- )
- def upsert_cluster_summary(
- self,
- cluster_id: str,
- summary_payload: dict,
- ) -> None:
- now = datetime.now(timezone.utc).isoformat()
- with self._conn() as conn:
- conn.execute(
- "UPDATE clusters SET summary_payload=?, summary_updated_at=? WHERE cluster_id=?",
- (
- json.dumps(summary_payload, ensure_ascii=False),
- now,
- cluster_id,
- ),
- )
- def get_cluster_summary(self, cluster_id: str, ttl_hours: float) -> dict | None:
- cutoff = datetime.now(timezone.utc) - timedelta(hours=ttl_hours)
- cutoff_iso = cutoff.isoformat()
- with self._conn() as conn:
- cur = conn.execute(
- "SELECT summary_payload, summary_updated_at FROM clusters "
- "WHERE cluster_id=? AND summary_updated_at >= ?",
- (cluster_id, cutoff_iso),
- )
- row = cur.fetchone()
- if not row or not row[0]:
- return None
- return json.loads(row[0])
- def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
- """Return newest clusters by *their own* timestamp.
- Filtering/sorting by the DB row's `updated_at` can drift away from the
- actual event time in `payload.timestamp`.
- """
- cutoff = datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))
- cutoff_ts = cutoff.timestamp()
- def _parse_payload_ts(ts: Any) -> float | None:
- if not ts:
- return None
- if isinstance(ts, (int, float)):
- return float(ts)
- text = str(ts).strip()
- try:
- dt = datetime.fromisoformat(text.replace('Z', '+00:00'))
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.astimezone(timezone.utc).timestamp()
- except Exception:
- pass
- try:
- dt = parsedate_to_datetime(text)
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.astimezone(timezone.utc).timestamp()
- except Exception:
- return None
- with self._conn() as conn:
- cur = conn.execute(
- "SELECT payload FROM clusters WHERE topic=? ORDER BY updated_at DESC",
- (topic,),
- )
- candidates = [json.loads(r[0]) for r in cur.fetchall()]
- filtered: list[dict] = []
- for c in candidates:
- ts = _parse_payload_ts(c.get("timestamp"))
- if ts is None:
- continue
- if ts >= cutoff_ts:
- filtered.append(c)
- filtered.sort(key=lambda c: _parse_payload_ts(c.get("timestamp")) or 0.0, reverse=True)
- return filtered[: int(limit)]
- def get_latest_clusters_all_topics(self, ttl_hours: float, limit: int) -> list[dict]:
- cutoff = datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))
- cutoff_ts = cutoff.timestamp()
- def _parse_payload_ts(ts: Any) -> float | None:
- if not ts:
- return None
- if isinstance(ts, (int, float)):
- return float(ts)
- text = str(ts).strip()
- try:
- dt = datetime.fromisoformat(text.replace('Z', '+00:00'))
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.astimezone(timezone.utc).timestamp()
- except Exception:
- pass
- try:
- dt = parsedate_to_datetime(text)
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.astimezone(timezone.utc).timestamp()
- except Exception:
- return None
- with self._conn() as conn:
- cur = conn.execute(
- "SELECT payload FROM clusters ORDER BY updated_at DESC",
- )
- candidates = [json.loads(r[0]) for r in cur.fetchall()]
- filtered: list[dict] = []
- for c in candidates:
- ts = _parse_payload_ts(c.get("timestamp"))
- if ts is None:
- continue
- if ts >= cutoff_ts:
- filtered.append(c)
- filtered.sort(key=lambda c: _parse_payload_ts(c.get("timestamp")) or 0.0, reverse=True)
- return filtered[: int(limit)]
- def get_cluster_by_id(self, cluster_id: str) -> dict | None:
- with self._conn() as conn:
- cur = conn.execute(
- "SELECT payload FROM clusters WHERE cluster_id=?",
- (cluster_id,),
- )
- row = cur.fetchone()
- return json.loads(row[0]) if row else None
- def get_feed_hash(self, feed_key: str) -> str | None:
- with self._conn() as conn:
- cur = conn.execute(
- "SELECT last_hash FROM feed_state WHERE feed_key=?",
- (feed_key,),
- )
- row = cur.fetchone()
- return row[0] if row else None
- def set_feed_hash(self, feed_key: str, last_hash: str) -> None:
- now = datetime.now(timezone.utc).isoformat()
- with self._conn() as conn:
- conn.execute(
- "INSERT INTO feed_state(feed_key, last_hash, updated_at) VALUES(?,?,?) "
- "ON CONFLICT(feed_key) DO UPDATE SET last_hash=excluded.last_hash, updated_at=excluded.updated_at",
- (feed_key, last_hash, now),
- )
- def get_feed_state(self, feed_key: str) -> dict | None:
- with self._conn() as conn:
- cur = conn.execute(
- "SELECT last_hash, updated_at FROM feed_state WHERE feed_key=?",
- (feed_key,),
- )
- row = cur.fetchone()
- if not row:
- return None
- return {"last_hash": row[0], "updated_at": row[1]}
- def get_all_feed_states(self) -> list[dict[str, Any]]:
- """All feed_state rows.
- The live writer keys feed state as ``newsfeeds:<sha1(comma_joined_urls)>``,
- so a hardcoded literal lookup never matches when more than one feed is
- configured. Use this for surfacing health information.
- """
- with self._conn() as conn:
- cur = conn.execute(
- "SELECT feed_key, last_hash, updated_at FROM feed_state ORDER BY updated_at DESC"
- )
- return [
- {"feed_key": row[0], "last_hash": row[1], "updated_at": row[2]}
- for row in cur.fetchall()
- ]
- def get_meta(self, key: str) -> str | None:
- with self._conn() as conn:
- cur = conn.execute("SELECT value FROM meta WHERE key=?", (key,))
- row = cur.fetchone()
- return row[0] if row else None
- def set_meta(self, key: str, value: str) -> None:
- with self._conn() as conn:
- conn.execute(
- "INSERT INTO meta(key, value) VALUES(?, ?) "
- "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
- (key, value),
- )
- def upsert_entity_metadata(
- self,
- normalized_label: str,
- canonical_label: str | None = None,
- mid: str | None = None,
- sources: list[str] | None = None,
- ) -> None:
- normalized_label = str(normalized_label or "").strip()
- if not normalized_label:
- return
- canonical_label = str(canonical_label).strip() if canonical_label else None
- mid = str(mid).strip() if mid else None
- entity_id = mid if mid else f"local:{normalized_label}"
- sources = sorted({s for s in (sources or []) if s})
- sources_json = json.dumps(sources, ensure_ascii=False)
- now = datetime.now(timezone.utc).isoformat()
- with self._conn() as conn:
- conn.execute(
- """
- INSERT INTO entity_metadata(entity_id, normalized_label, canonical_label, mid, sources_json, updated_at)
- VALUES(?,?,?,?,?,?)
- ON CONFLICT(entity_id) DO UPDATE SET
- canonical_label=excluded.canonical_label,
- mid=excluded.mid,
- sources_json=excluded.sources_json,
- updated_at=excluded.updated_at
- """,
- (entity_id, normalized_label, canonical_label, mid, sources_json, now),
- )
- def get_entity_metadata(self, normalized_label: str) -> dict[str, Any] | None:
- normalized_label = str(normalized_label or "").strip()
- if not normalized_label:
- return None
- with self._conn() as conn:
- cur = conn.execute(
- "SELECT entity_id, canonical_label, mid, sources_json, updated_at, last_requested_at "
- "FROM entity_metadata "
- "WHERE normalized_label=? "
- "ORDER BY CASE WHEN mid IS NOT NULL THEN 0 ELSE 1 END, "
- "COALESCE(last_requested_at, updated_at) DESC, updated_at DESC "
- "LIMIT 1",
- (normalized_label,),
- )
- row = cur.fetchone()
- if not row:
- return None
- sources = []
- if row[2]:
- try:
- sources = json.loads(row[2])
- except Exception:
- sources = []
- return {
- "entity_id": row[0],
- "normalized_label": normalized_label,
- "canonical_label": row[1],
- "mid": row[2],
- "sources": sources,
- "updated_at": row[4],
- "last_requested_at": row[5],
- }
- def record_entity_request(self, normalized_label: str, mid: str | None = None) -> None:
- normalized_label = str(normalized_label or "").strip()
- if not normalized_label:
- return
- mid = str(mid).strip() if mid else None
- entity_id = mid if mid else f"local:{normalized_label}"
- now = datetime.now(timezone.utc).isoformat()
- with self._conn() as conn:
- conn.execute(
- """
- INSERT INTO entity_metadata(entity_id, normalized_label, canonical_label, mid, sources_json, updated_at, last_requested_at)
- VALUES(?,?,?,?,?,?,?)
- ON CONFLICT(entity_id) DO UPDATE SET
- last_requested_at=excluded.last_requested_at
- """,
- (entity_id, normalized_label, None, mid, json.dumps([], ensure_ascii=False), now, now),
- )
- def prune_clusters(self, retention_days: float) -> int:
- retention_days = float(retention_days)
- if retention_days <= 0:
- return 0
- cutoff = datetime.now(timezone.utc) - timedelta(days=retention_days)
- cutoff_iso = cutoff.isoformat()
- pruned_at = datetime.now(timezone.utc).isoformat()
- with self._conn() as conn:
- cur = conn.execute("DELETE FROM clusters WHERE updated_at < ?", (cutoff_iso,))
- deleted = int(cur.rowcount or 0)
- conn.execute(
- "INSERT INTO meta(key, value) VALUES(?, ?) "
- "ON CONFLICT(key) DO UPDATE SET value=excluded.value",
- (META_LAST_PRUNE_AT, pruned_at),
- )
- return deleted
- def prune_if_due(self, pruning_enabled: bool, retention_days: float, interval_hours: float = 24.0) -> dict[str, Any]:
- retention_days = float(retention_days)
- interval_hours = float(interval_hours)
- if (not pruning_enabled) or retention_days <= 0:
- return {
- "enabled": bool(pruning_enabled),
- "deleted": 0,
- "due": False,
- "retention_days": retention_days,
- "interval_hours": interval_hours,
- "last_prune_at": self.get_meta(META_LAST_PRUNE_AT),
- }
- last_prune_at = self.get_meta(META_LAST_PRUNE_AT)
- now = datetime.now(timezone.utc)
- due = True
- if last_prune_at:
- try:
- last_dt = datetime.fromisoformat(last_prune_at)
- due = now - last_dt >= timedelta(hours=max(1.0, interval_hours))
- except Exception:
- due = True
- if not due:
- return {
- "enabled": True,
- "deleted": 0,
- "due": False,
- "retention_days": retention_days,
- "interval_hours": interval_hours,
- "last_prune_at": last_prune_at,
- }
- deleted = self.prune_clusters(retention_days)
- last_prune_at = self.get_meta(META_LAST_PRUNE_AT)
- return {
- "enabled": True,
- "deleted": deleted,
- "due": True,
- "retention_days": retention_days,
- "interval_hours": interval_hours,
- "last_prune_at": last_prune_at,
- }
- def get_prune_state(self, pruning_enabled: bool, retention_days: float, interval_hours: float = 24.0) -> dict[str, Any]:
- return {
- "enabled": bool(pruning_enabled),
- "retention_days": float(retention_days),
- "interval_hours": float(interval_hours),
- "last_prune_at": self.get_meta(META_LAST_PRUNE_AT),
- }
- # ------------------------------------------------------------------
- # Dashboard query helpers
- # ------------------------------------------------------------------
- def get_dashboard_stats(self) -> dict[str, Any]:
- """Aggregate status numbers for the health panel."""
- with self._conn() as conn:
- total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
- total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
- topic_counts = dict(conn.execute(
- "SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
- ).fetchall())
- last_refresh = self.get_meta("last_refresh_at")
- feeds = {}
- for row in conn.execute("SELECT feed_key, last_hash, updated_at FROM feed_state"):
- feeds[row[0]] = {"last_hash": row[1], "updated_at": row[2]}
- last_prune = self.get_meta(META_LAST_PRUNE_AT)
- prune_state = self.get_prune_state(
- pruning_enabled=NEWS_PRUNING_ENABLED,
- retention_days=NEWS_RETENTION_DAYS,
- interval_hours=NEWS_PRUNE_INTERVAL_HOURS,
- )
- return {
- "total_clusters": total_clusters,
- "total_entities": total_entities,
- "clusters_by_topic": topic_counts,
- "last_refresh_at": last_refresh,
- "last_prune_at": last_prune,
- "prune_state": prune_state,
- "feeds": feeds,
- }
- def get_clusters_page(
- self,
- topic: str | None = None,
- hours: float = 24,
- limit: int = 20,
- offset: int = 0,
- ) -> list[dict[str, Any]]:
- """Paginated cluster listing for the dashboard."""
- cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
- query = "SELECT payload FROM clusters WHERE updated_at >= ?"
- params: list = [cutoff]
- if topic and topic != "all":
- query += " AND topic = ?"
- params.append(topic)
- query += " ORDER BY updated_at DESC LIMIT ? OFFSET ?"
- params.extend([limit, offset])
- with self._conn() as conn:
- cur = conn.execute(query, params)
- rows = cur.fetchall()
- clusters: list[dict[str, Any]] = []
- for (payload_text,) in rows:
- c = json.loads(payload_text)
- clusters.append({
- "cluster_id": c.get("cluster_id", ""),
- "headline": c.get("headline", ""),
- "topic": c.get("topic", ""),
- "sentiment": c.get("sentiment", "neutral"),
- "sentimentScore": c.get("sentimentScore"),
- "importance": c.get("importance", 0),
- "entities": c.get("entities", []),
- "sources": c.get("sources", []),
- "timestamp": c.get("timestamp", ""),
- "keywords": c.get("keywords", []),
- "article_count": len(c.get("articles", [])),
- })
- return clusters
- def get_sentiment_series(
- self,
- topic: str | None = None,
- hours: float = 24,
- bucket_hours: float = 1,
- ) -> list[dict[str, Any]]:
- """Sentiment score averaged per time bucket."""
- cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
- query = "SELECT payload FROM clusters WHERE updated_at >= ?"
- params: list = [cutoff]
- if topic and topic != "all":
- query += " AND topic = ?"
- params.append(topic)
- query += " ORDER BY updated_at ASC"
- with self._conn() as conn:
- cur = conn.execute(query, params)
- rows = cur.fetchall()
- def _parse_ts(ts: Any) -> datetime | None:
- if not ts:
- return None
- try:
- dt = datetime.fromisoformat(str(ts).replace("Z", "+00:00"))
- if dt.tzinfo is None:
- dt = dt.replace(tzinfo=timezone.utc)
- return dt.astimezone(timezone.utc)
- except Exception:
- return None
- buckets: dict[datetime, list[float]] = {}
- for (payload_text,) in rows:
- c = json.loads(payload_text)
- dt = _parse_ts(c.get("timestamp"))
- score = c.get("sentimentScore")
- if dt is None or score is None:
- continue
- bucket_key = dt.replace(minute=0, second=0, microsecond=0)
- if bucket_hours > 1:
- bucket_key = bucket_key.replace(
- hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
- )
- buckets.setdefault(bucket_key, []).append(float(score))
- series: list[dict[str, Any]] = []
- for bucket_key in sorted(buckets):
- scores = buckets[bucket_key]
- series.append({
- "time": bucket_key.isoformat(),
- "avg_sentiment": round(sum(scores) / len(scores), 3),
- "count": len(scores),
- "min": round(min(scores), 3),
- "max": round(max(scores), 3),
- })
- return series
- def get_entity_frequencies(
- self,
- hours: float = 24,
- limit: int = 30,
- ) -> list[dict[str, Any]]:
- """Top entities by mention count in recent clusters."""
- cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
- with self._conn() as conn:
- cur = conn.execute(
- "SELECT payload FROM clusters WHERE updated_at >= ? ORDER BY updated_at DESC LIMIT 500",
- (cutoff,),
- )
- rows = cur.fetchall()
- counter: dict[str, int] = {}
- for (payload_text,) in rows:
- c = json.loads(payload_text)
- for ent in c.get("entities", []):
- counter[ent] = counter.get(ent, 0) + 1
- sorted_entities = sorted(counter.items(), key=lambda x: -x[1])[:limit]
- result: list[dict[str, Any]] = []
- for label, count in sorted_entities:
- meta = self.get_entity_metadata(label)
- result.append({
- "label": label,
- "count": count,
- "canonical_label": meta["canonical_label"] if meta else label,
- "mid": meta["mid"] if meta else None,
- })
- return result
- def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
- """Dashboard-optimized cluster detail fetch."""
- with self._conn() as conn:
- cur = conn.execute(
- "SELECT payload FROM clusters WHERE cluster_id = ?", (cluster_id,)
- )
- row = cur.fetchone()
- if not row:
- return None
- c = json.loads(row[0])
- summary = None
- if c.get("summary_payload"):
- try:
- summary = json.loads(c["summary_payload"])
- except Exception:
- pass
- return {
- "cluster_id": c.get("cluster_id"),
- "headline": c.get("headline", ""),
- "summary": c.get("summary", ""),
- "topic": c.get("topic", ""),
- "sentiment": c.get("sentiment", "neutral"),
- "sentimentScore": c.get("sentimentScore"),
- "importance": c.get("importance", 0),
- "entities": c.get("entities", []),
- "entityResolutions": c.get("entityResolutions", []),
- "keywords": c.get("keywords", []),
- "sources": c.get("sources", []),
- "timestamp": c.get("timestamp", ""),
- "first_seen": c.get("first_seen", ""),
- "last_updated": c.get("last_updated", ""),
- "article_count": len(c.get("articles", [])),
- "articles": c.get("articles", []),
- "summary_text": summary.get("mergedSummary", "") if summary else "",
- "key_facts": summary.get("keyFacts", []) if summary else [],
- }
|