|
@@ -12,6 +12,7 @@ from email.utils import parsedate_to_datetime
|
|
|
from news_mcp.config import (
|
|
from news_mcp.config import (
|
|
|
NEWS_PRUNE_INTERVAL_HOURS,
|
|
NEWS_PRUNE_INTERVAL_HOURS,
|
|
|
NEWS_PRUNING_ENABLED,
|
|
NEWS_PRUNING_ENABLED,
|
|
|
|
|
+ NEWS_REFRESH_INTERVAL_SECONDS,
|
|
|
NEWS_RETENTION_DAYS,
|
|
NEWS_RETENTION_DAYS,
|
|
|
)
|
|
)
|
|
|
from news_mcp.entity_normalize import normalize_entities
|
|
from news_mcp.entity_normalize import normalize_entities
|
|
@@ -671,13 +672,34 @@ class SQLiteClusterStore:
|
|
|
with self._conn() as conn:
|
|
with self._conn() as conn:
|
|
|
total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
|
|
total_clusters = conn.execute("SELECT COUNT(*) FROM clusters").fetchone()[0]
|
|
|
total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
|
|
total_entities = conn.execute("SELECT COUNT(*) FROM entity_metadata").fetchone()[0]
|
|
|
|
|
+ cluster_entities = conn.execute(
|
|
|
|
|
+ "SELECT COUNT(DISTINCT e.value) "
|
|
|
|
|
+ "FROM clusters, json_each(clusters.payload, '$.entities') AS e"
|
|
|
|
|
+ ).fetchone()[0]
|
|
|
topic_counts = dict(conn.execute(
|
|
topic_counts = dict(conn.execute(
|
|
|
"SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
|
|
"SELECT topic, COUNT(*) FROM clusters GROUP BY topic"
|
|
|
).fetchall())
|
|
).fetchall())
|
|
|
last_refresh = self.get_meta("last_refresh_at")
|
|
last_refresh = self.get_meta("last_refresh_at")
|
|
|
feeds = {}
|
|
feeds = {}
|
|
|
- for row in conn.execute("SELECT feed_key, last_hash, last_item_count, updated_at FROM feed_state"):
|
|
|
|
|
- feeds[row[0]] = {"last_hash": row[1], "last_item_count": row[2], "updated_at": row[3]}
|
|
|
|
|
|
|
+ for row in conn.execute(
|
|
|
|
|
+ "SELECT feed_key, last_hash, last_item_count, enabled, updated_at FROM feed_state ORDER BY updated_at DESC"
|
|
|
|
|
+ ):
|
|
|
|
|
+ feeds[row[0]] = {
|
|
|
|
|
+ "last_hash": row[1], "last_item_count": row[2],
|
|
|
|
|
+ "enabled": bool(row[3]), "updated_at": row[4],
|
|
|
|
|
+ }
|
|
|
|
|
+ # Freshness: did a refresh happen recently? (within 2x the configured interval)
|
|
|
|
|
+ fresh = False
|
|
|
|
|
+ if last_refresh:
|
|
|
|
|
+ try:
|
|
|
|
|
+ dt = datetime.fromisoformat(last_refresh.replace("Z", "+00:00"))
|
|
|
|
|
+ if dt.tzinfo is None:
|
|
|
|
|
+ dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
+ age_hours = (datetime.now(timezone.utc) - dt).total_seconds() / 3600
|
|
|
|
|
+ fresh = age_hours < max(1.0, NEWS_REFRESH_INTERVAL_SECONDS / 3600) * 2
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ pass
|
|
|
|
|
+
|
|
|
last_prune = self.get_meta(META_LAST_PRUNE_AT)
|
|
last_prune = self.get_meta(META_LAST_PRUNE_AT)
|
|
|
prune_state = self.get_prune_state(
|
|
prune_state = self.get_prune_state(
|
|
|
pruning_enabled=NEWS_PRUNING_ENABLED,
|
|
pruning_enabled=NEWS_PRUNING_ENABLED,
|
|
@@ -687,11 +709,14 @@ class SQLiteClusterStore:
|
|
|
return {
|
|
return {
|
|
|
"total_clusters": total_clusters,
|
|
"total_clusters": total_clusters,
|
|
|
"total_entities": total_entities,
|
|
"total_entities": total_entities,
|
|
|
|
|
+ "cluster_entities": cluster_entities,
|
|
|
"clusters_by_topic": topic_counts,
|
|
"clusters_by_topic": topic_counts,
|
|
|
"last_refresh_at": last_refresh,
|
|
"last_refresh_at": last_refresh,
|
|
|
"last_prune_at": last_prune,
|
|
"last_prune_at": last_prune,
|
|
|
- "prune_state": prune_state,
|
|
|
|
|
|
|
+ "data_fresh": fresh,
|
|
|
"feeds": feeds,
|
|
"feeds": feeds,
|
|
|
|
|
+ "feed_count": len(feeds),
|
|
|
|
|
+ "prune_state": prune_state,
|
|
|
}
|
|
}
|
|
|
|
|
|
|
|
def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
|
|
def get_cluster_detail(self, cluster_id: str) -> dict[str, Any] | None:
|
|
@@ -730,3 +755,269 @@ class SQLiteClusterStore:
|
|
|
"summary_text": summary.get("mergedSummary", "") if summary else "",
|
|
"summary_text": summary.get("mergedSummary", "") if summary else "",
|
|
|
"key_facts": summary.get("keyFacts", []) if summary else [],
|
|
"key_facts": summary.get("keyFacts", []) if summary else [],
|
|
|
}
|
|
}
|
|
|
|
|
+
|
|
|
|
|
+ # ── Paginated Clusters ────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ def get_clusters_page(
|
|
|
|
|
+ self,
|
|
|
|
|
+ topic: str | None = None,
|
|
|
|
|
+ hours: float = 24,
|
|
|
|
|
+ limit: int = 20,
|
|
|
|
|
+ offset: int = 0,
|
|
|
|
|
+ ) -> dict[str, Any]:
|
|
|
|
|
+ """Paginated cluster listing filtered by SQL payload_ts index.
|
|
|
|
|
+
|
|
|
|
|
+ Returns {"clusters": [...], "total": int}.
|
|
|
|
|
+ """
|
|
|
|
|
+ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
|
|
|
|
+ query = "SELECT payload FROM clusters WHERE payload_ts >= ?"
|
|
|
|
|
+ params: list = [cutoff]
|
|
|
|
|
+ if topic and topic != "all":
|
|
|
|
|
+ query += " AND topic = ?"
|
|
|
|
|
+ params.append(topic)
|
|
|
|
|
+ total = self._conn().execute(
|
|
|
|
|
+ f"SELECT COUNT(*) FROM ({query})", params
|
|
|
|
|
+ ).fetchone()[0]
|
|
|
|
|
+ query += " ORDER BY payload_ts DESC LIMIT ? OFFSET ?"
|
|
|
|
|
+ params.extend([limit, offset])
|
|
|
|
|
+
|
|
|
|
|
+ with self._conn() as conn:
|
|
|
|
|
+ rows = conn.execute(query, params).fetchall()
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ "clusters": [
|
|
|
|
|
+ {
|
|
|
|
|
+ "cluster_id": c.get("cluster_id", ""),
|
|
|
|
|
+ "headline": c.get("headline", ""),
|
|
|
|
|
+ "topic": c.get("topic", ""),
|
|
|
|
|
+ "sentiment": c.get("sentiment", "neutral"),
|
|
|
|
|
+ "sentimentScore": c.get("sentimentScore"),
|
|
|
|
|
+ "importance": c.get("importance", 0),
|
|
|
|
|
+ "entities": c.get("entities", []),
|
|
|
|
|
+ "sources": c.get("sources", []),
|
|
|
|
|
+ "timestamp": c.get("timestamp", ""),
|
|
|
|
|
+ "keywords": c.get("keywords", []),
|
|
|
|
|
+ "article_count": len(c.get("articles", [])),
|
|
|
|
|
+ }
|
|
|
|
|
+ for c in [json.loads(r[0]) for r in rows]
|
|
|
|
|
+ ],
|
|
|
|
|
+ "total": total,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ # ── Sentiment Series ──────────────────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ def get_sentiment_series(
|
|
|
|
|
+ self,
|
|
|
|
|
+ topic: str | None = None,
|
|
|
|
|
+ hours: float = 24,
|
|
|
|
|
+ bucket_hours: float = 1,
|
|
|
|
|
+ ) -> list[dict[str, Any]]:
|
|
|
|
|
+ """Sentiment score averaged per time bucket. Filters by payload_ts SQL index."""
|
|
|
|
|
+ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
|
|
|
|
+ query = "SELECT payload FROM clusters WHERE payload_ts >= ?"
|
|
|
|
|
+ params: list = [cutoff]
|
|
|
|
|
+ if topic and topic != "all":
|
|
|
|
|
+ query += " AND topic = ?"
|
|
|
|
|
+ params.append(topic)
|
|
|
|
|
+ query += " ORDER BY payload_ts ASC"
|
|
|
|
|
+
|
|
|
|
|
+ with self._conn() as conn:
|
|
|
|
|
+ rows = conn.execute(query, params).fetchall()
|
|
|
|
|
+
|
|
|
|
|
+ buckets: dict[datetime, list[float]] = {}
|
|
|
|
|
+ for (payload_text,) in rows:
|
|
|
|
|
+ c = json.loads(payload_text)
|
|
|
|
|
+ ts_str = c.get("timestamp")
|
|
|
|
|
+ score = c.get("sentimentScore")
|
|
|
|
|
+ if not ts_str or score is None:
|
|
|
|
|
+ continue
|
|
|
|
|
+ dt = datetime.fromisoformat(str(ts_str).strip())
|
|
|
|
|
+ if dt.tzinfo is None:
|
|
|
|
|
+ dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
+ dt = dt.astimezone(timezone.utc)
|
|
|
|
|
+ bucket_key = dt.replace(minute=0, second=0, microsecond=0)
|
|
|
|
|
+ if bucket_hours > 1:
|
|
|
|
|
+ bucket_key = bucket_key.replace(
|
|
|
|
|
+ hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
|
|
|
|
|
+ )
|
|
|
|
|
+ buckets.setdefault(bucket_key, []).append(float(score))
|
|
|
|
|
+
|
|
|
|
|
+ return [
|
|
|
|
|
+ {
|
|
|
|
|
+ "time": bucket_key.isoformat(),
|
|
|
|
|
+ "avg_sentiment": round(sum(scores) / len(scores), 3),
|
|
|
|
|
+ "count": len(scores),
|
|
|
|
|
+ "min": round(min(scores), 3),
|
|
|
|
|
+ "max": round(max(scores), 3),
|
|
|
|
|
+ }
|
|
|
|
|
+ for bucket_key, scores in sorted(buckets.items())
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ # ── Entity / Keyword Frequencies ──────────────────────────────────
|
|
|
|
|
+
|
|
|
|
|
+ def get_entity_frequencies(
|
|
|
|
|
+ self,
|
|
|
|
|
+ hours: float = 24,
|
|
|
|
|
+ limit: int = 30,
|
|
|
|
|
+ ) -> list[dict[str, Any]]:
|
|
|
|
|
+ """Top entities by mention count, using SQL junction table + payload_ts index."""
|
|
|
|
|
+ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
|
|
|
|
+ with self._conn() as conn:
|
|
|
|
|
+ rows = conn.execute(
|
|
|
|
|
+ """\
|
|
|
|
|
+ SELECT ce.entity, COUNT(*) as cnt
|
|
|
|
|
+ FROM cluster_entities ce
|
|
|
|
|
+ JOIN clusters c ON c.cluster_id = ce.cluster_id
|
|
|
|
|
+ WHERE c.payload_ts >= ?
|
|
|
|
|
+ GROUP BY ce.entity
|
|
|
|
|
+ ORDER BY cnt DESC
|
|
|
|
|
+ LIMIT ?
|
|
|
|
|
+ """,
|
|
|
|
|
+ (cutoff, limit),
|
|
|
|
|
+ ).fetchall()
|
|
|
|
|
+
|
|
|
|
|
+ result: list[dict[str, Any]] = []
|
|
|
|
|
+ for label, count in rows:
|
|
|
|
|
+ meta = self.get_entity_metadata(label)
|
|
|
|
|
+ result.append({
|
|
|
|
|
+ "label": label,
|
|
|
|
|
+ "count": count,
|
|
|
|
|
+ "canonical_label": meta["canonical_label"] if meta else label,
|
|
|
|
|
+ "mid": meta["mid"] if meta else None,
|
|
|
|
|
+ })
|
|
|
|
|
+ return result
|
|
|
|
|
+
|
|
|
|
|
+ def get_keyword_frequencies(
|
|
|
|
|
+ self,
|
|
|
|
|
+ hours: float = 24,
|
|
|
|
|
+ limit: int = 30,
|
|
|
|
|
+ ) -> list[dict[str, Any]]:
|
|
|
|
|
+ """Top keywords by mention count, using SQL junction table + payload_ts index.
|
|
|
|
|
+
|
|
|
|
|
+ Excludes DEFAULT_TOPICS labels (crypto, macro, regulation, ai, other).
|
|
|
|
|
+ """
|
|
|
|
|
+ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
|
|
|
|
+ _topic_labels = {"crypto", "macro", "regulation", "ai", "other"}
|
|
|
|
|
+
|
|
|
|
|
+ with self._conn() as conn:
|
|
|
|
|
+ rows = conn.execute(
|
|
|
|
|
+ """\
|
|
|
|
|
+ SELECT ck.keyword, COUNT(*) as cnt
|
|
|
|
|
+ FROM cluster_keywords ck
|
|
|
|
|
+ JOIN clusters c ON c.cluster_id = ck.cluster_id
|
|
|
|
|
+ WHERE c.payload_ts >= ?
|
|
|
|
|
+ GROUP BY ck.keyword
|
|
|
|
|
+ ORDER BY cnt DESC
|
|
|
|
|
+ LIMIT ?
|
|
|
|
|
+ """,
|
|
|
|
|
+ (cutoff, limit),
|
|
|
|
|
+ ).fetchall()
|
|
|
|
|
+
|
|
|
|
|
+ return [
|
|
|
|
|
+ {"label": label, "count": count}
|
|
|
|
|
+ for label, count in rows
|
|
|
|
|
+ if label.lower() not in _topic_labels
|
|
|
|
|
+ ]
|
|
|
|
|
+
|
|
|
|
|
+ # ── Junction-Table Entity / Keyword Cluster Search ────────────────
|
|
|
|
|
+
|
|
|
|
|
+ def get_clusters_by_entity(
|
|
|
|
|
+ self,
|
|
|
|
|
+ entity: str,
|
|
|
|
|
+ hours: float = 168,
|
|
|
|
|
+ limit: int = 50,
|
|
|
|
|
+ offset: int = 0,
|
|
|
|
|
+ ) -> dict[str, Any]:
|
|
|
|
|
+ """Return clusters matching an entity, SQL-level filter via junction table.
|
|
|
|
|
+
|
|
|
|
|
+ Returns {"entity": ..., "clusters": [...], "total": int, "hours": float}.
|
|
|
|
|
+ """
|
|
|
|
|
+ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
|
|
|
|
+ entity_norm = entity.strip().lower()
|
|
|
|
|
+
|
|
|
|
|
+ with self._conn() as conn:
|
|
|
|
|
+ total = conn.execute(
|
|
|
|
|
+ "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c "
|
|
|
|
|
+ "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
|
|
|
|
|
+ "WHERE c.payload_ts >= ? AND ce.entity = ?",
|
|
|
|
|
+ (cutoff, entity_norm),
|
|
|
|
|
+ ).fetchone()[0]
|
|
|
|
|
+
|
|
|
|
|
+ rows = conn.execute(
|
|
|
|
|
+ "SELECT DISTINCT c.payload FROM clusters c "
|
|
|
|
|
+ "JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
|
|
|
|
|
+ "WHERE c.payload_ts >= ? AND ce.entity = ? "
|
|
|
|
|
+ "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?",
|
|
|
|
|
+ (cutoff, entity_norm, limit, offset),
|
|
|
|
|
+ ).fetchall()
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ "entity": entity_norm,
|
|
|
|
|
+ "clusters": [json.loads(r[0]) for r in rows],
|
|
|
|
|
+ "total": total,
|
|
|
|
|
+ "hours": hours,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ def get_clusters_by_keyword(
|
|
|
|
|
+ self,
|
|
|
|
|
+ keyword: str,
|
|
|
|
|
+ hours: float = 168,
|
|
|
|
|
+ limit: int = 50,
|
|
|
|
|
+ offset: int = 0,
|
|
|
|
|
+ ) -> dict[str, Any]:
|
|
|
|
|
+ """Return clusters matching a keyword, SQL-level filter via junction table."""
|
|
|
|
|
+ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
|
|
|
|
+ kw_norm = keyword.strip().lower()
|
|
|
|
|
+
|
|
|
|
|
+ with self._conn() as conn:
|
|
|
|
|
+ total = conn.execute(
|
|
|
|
|
+ "SELECT COUNT(DISTINCT c.cluster_id) FROM clusters c "
|
|
|
|
|
+ "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
|
|
|
|
|
+ "WHERE c.payload_ts >= ? AND ck.keyword = ?",
|
|
|
|
|
+ (cutoff, kw_norm),
|
|
|
|
|
+ ).fetchone()[0]
|
|
|
|
|
+
|
|
|
|
|
+ rows = conn.execute(
|
|
|
|
|
+ "SELECT DISTINCT c.payload FROM clusters c "
|
|
|
|
|
+ "JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
|
|
|
|
|
+ "WHERE c.payload_ts >= ? AND ck.keyword = ? "
|
|
|
|
|
+ "ORDER BY c.payload_ts DESC LIMIT ? OFFSET ?",
|
|
|
|
|
+ (cutoff, kw_norm, limit, offset),
|
|
|
|
|
+ ).fetchall()
|
|
|
|
|
+
|
|
|
|
|
+ return {
|
|
|
|
|
+ "keyword": kw_norm,
|
|
|
|
|
+ "clusters": [json.loads(r[0]) for r in rows],
|
|
|
|
|
+ "total": total,
|
|
|
|
|
+ "hours": hours,
|
|
|
|
|
+ }
|
|
|
|
|
+
|
|
|
|
|
+ def get_clusters_by_entity_or_keyword(
|
|
|
|
|
+ self,
|
|
|
|
|
+ query_terms: set[str],
|
|
|
|
|
+ hours: float,
|
|
|
|
|
+ limit: int,
|
|
|
|
|
+ ) -> list[dict]:
|
|
|
|
|
+ """Search clusters by matching ANY query term against entities OR keywords.
|
|
|
|
|
+
|
|
|
|
|
+ Uses SQL-level junction-table filtering — no row-limit blind spot.
|
|
|
|
|
+ Returns clusters sorted by recency.
|
|
|
|
|
+ """
|
|
|
|
|
+ terms = [q.strip().lower() for q in query_terms if q.strip()]
|
|
|
|
|
+ if not terms:
|
|
|
|
|
+ return []
|
|
|
|
|
+ cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
|
|
|
|
+ placeholders = ",".join("?" for _ in terms)
|
|
|
|
|
+
|
|
|
|
|
+ with self._conn() as conn:
|
|
|
|
|
+ rows = conn.execute(
|
|
|
|
|
+ f"SELECT DISTINCT c.payload FROM clusters c "
|
|
|
|
|
+ f"LEFT JOIN cluster_entities ce ON c.cluster_id = ce.cluster_id "
|
|
|
|
|
+ f"LEFT JOIN cluster_keywords ck ON c.cluster_id = ck.cluster_id "
|
|
|
|
|
+ f"WHERE c.payload_ts >= ? "
|
|
|
|
|
+ f" AND (ce.entity IN ({placeholders}) OR ck.keyword IN ({placeholders})) "
|
|
|
|
|
+ f"ORDER BY c.payload_ts DESC LIMIT ?",
|
|
|
|
|
+ (cutoff, *terms, *terms, limit),
|
|
|
|
|
+ ).fetchall()
|
|
|
|
|
+
|
|
|
|
|
+ return [json.loads(r[0]) for r in rows]
|