|
@@ -55,6 +55,27 @@ def _normalize_ts(ts: Any) -> str:
|
|
|
return text
|
|
return text
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+def _read_ts(ts: Any) -> float | None:
|
|
|
|
|
+ """Parse a stored, already-normalized ISO 8601 UTC timestamp to a unix float.
|
|
|
|
|
+
|
|
|
|
|
+ All payload.timestamp / payload.first_seen / payload.last_updated values
|
|
|
|
|
+ are guaranteed YYYY-MM-DDTHH:MM:SS+00:00 at write time (enforced by
|
|
|
|
|
+ sanitize_cluster_payload → _normalize_ts). Only datetime.fromisoformat is
|
|
|
|
|
+ needed here. Do NOT add RFC 2822 / parsedate_to_datetime fallbacks — if
|
|
|
|
|
+ this function can't parse a stored timestamp it means the normalization
|
|
|
|
|
+ pipeline has a bug that should be fixed there, not papered over here.
|
|
|
|
|
+ """
|
|
|
|
|
+ if not ts:
|
|
|
|
|
+ return None
|
|
|
|
|
+ try:
|
|
|
|
|
+ dt = datetime.fromisoformat(str(ts).strip())
|
|
|
|
|
+ if dt.tzinfo is None:
|
|
|
|
|
+ dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
+ return dt.astimezone(timezone.utc).timestamp()
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ return None
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
@dataclass
|
|
@dataclass
|
|
|
class ClusterRow:
|
|
class ClusterRow:
|
|
|
cluster_id: str
|
|
cluster_id: str
|
|
@@ -278,35 +299,12 @@ class SQLiteClusterStore:
|
|
|
return json.loads(row[0])
|
|
return json.loads(row[0])
|
|
|
|
|
|
|
|
def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
|
|
def get_latest_clusters(self, topic: str, ttl_hours: float, limit: int) -> list[dict]:
|
|
|
- """Return newest clusters by *their own* timestamp.
|
|
|
|
|
|
|
+ """Return newest clusters by their own event timestamp (payload.timestamp).
|
|
|
|
|
|
|
|
- Filtering/sorting by the DB row's `updated_at` can drift away from the
|
|
|
|
|
- actual event time in `payload.timestamp`.
|
|
|
|
|
|
|
+ payload.timestamp is guaranteed ISO 8601 UTC — use _read_ts, not raw
|
|
|
|
|
+ JSON parsing with RFC 2822 fallbacks.
|
|
|
"""
|
|
"""
|
|
|
-
|
|
|
|
|
- cutoff = datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))
|
|
|
|
|
- cutoff_ts = cutoff.timestamp()
|
|
|
|
|
-
|
|
|
|
|
- def _parse_payload_ts(ts: Any) -> float | None:
|
|
|
|
|
- if not ts:
|
|
|
|
|
- return None
|
|
|
|
|
- if isinstance(ts, (int, float)):
|
|
|
|
|
- return float(ts)
|
|
|
|
|
- text = str(ts).strip()
|
|
|
|
|
- try:
|
|
|
|
|
- dt = datetime.fromisoformat(text.replace('Z', '+00:00'))
|
|
|
|
|
- if dt.tzinfo is None:
|
|
|
|
|
- dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
- return dt.astimezone(timezone.utc).timestamp()
|
|
|
|
|
- except Exception:
|
|
|
|
|
- pass
|
|
|
|
|
- try:
|
|
|
|
|
- dt = parsedate_to_datetime(text)
|
|
|
|
|
- if dt.tzinfo is None:
|
|
|
|
|
- dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
- return dt.astimezone(timezone.utc).timestamp()
|
|
|
|
|
- except Exception:
|
|
|
|
|
- return None
|
|
|
|
|
|
|
+ cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).timestamp()
|
|
|
|
|
|
|
|
with self._conn() as conn:
|
|
with self._conn() as conn:
|
|
|
cur = conn.execute(
|
|
cur = conn.execute(
|
|
@@ -315,57 +313,24 @@ class SQLiteClusterStore:
|
|
|
)
|
|
)
|
|
|
candidates = [json.loads(r[0]) for r in cur.fetchall()]
|
|
candidates = [json.loads(r[0]) for r in cur.fetchall()]
|
|
|
|
|
|
|
|
- filtered: list[dict] = []
|
|
|
|
|
- for c in candidates:
|
|
|
|
|
- ts = _parse_payload_ts(c.get("timestamp"))
|
|
|
|
|
- if ts is None:
|
|
|
|
|
- continue
|
|
|
|
|
- if ts >= cutoff_ts:
|
|
|
|
|
- filtered.append(c)
|
|
|
|
|
-
|
|
|
|
|
- filtered.sort(key=lambda c: _parse_payload_ts(c.get("timestamp")) or 0.0, reverse=True)
|
|
|
|
|
|
|
+ filtered = [c for c in candidates if (_read_ts(c.get("timestamp")) or 0.0) >= cutoff_ts]
|
|
|
|
|
+ filtered.sort(key=lambda c: _read_ts(c.get("timestamp")) or 0.0, reverse=True)
|
|
|
return filtered[: int(limit)]
|
|
return filtered[: int(limit)]
|
|
|
|
|
|
|
|
def get_latest_clusters_all_topics(self, ttl_hours: float, limit: int) -> list[dict]:
|
|
def get_latest_clusters_all_topics(self, ttl_hours: float, limit: int) -> list[dict]:
|
|
|
- cutoff = datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))
|
|
|
|
|
- cutoff_ts = cutoff.timestamp()
|
|
|
|
|
|
|
+ """Return newest clusters across all topics by event timestamp.
|
|
|
|
|
|
|
|
- def _parse_payload_ts(ts: Any) -> float | None:
|
|
|
|
|
- if not ts:
|
|
|
|
|
- return None
|
|
|
|
|
- if isinstance(ts, (int, float)):
|
|
|
|
|
- return float(ts)
|
|
|
|
|
- text = str(ts).strip()
|
|
|
|
|
- try:
|
|
|
|
|
- dt = datetime.fromisoformat(text.replace('Z', '+00:00'))
|
|
|
|
|
- if dt.tzinfo is None:
|
|
|
|
|
- dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
- return dt.astimezone(timezone.utc).timestamp()
|
|
|
|
|
- except Exception:
|
|
|
|
|
- pass
|
|
|
|
|
- try:
|
|
|
|
|
- dt = parsedate_to_datetime(text)
|
|
|
|
|
- if dt.tzinfo is None:
|
|
|
|
|
- dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
- return dt.astimezone(timezone.utc).timestamp()
|
|
|
|
|
- except Exception:
|
|
|
|
|
- return None
|
|
|
|
|
|
|
+ payload.timestamp is guaranteed ISO 8601 UTC — use _read_ts, not raw
|
|
|
|
|
+ JSON parsing with RFC 2822 fallbacks.
|
|
|
|
|
+ """
|
|
|
|
|
+ cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=float(ttl_hours))).timestamp()
|
|
|
|
|
|
|
|
with self._conn() as conn:
|
|
with self._conn() as conn:
|
|
|
- cur = conn.execute(
|
|
|
|
|
- "SELECT payload FROM clusters ORDER BY updated_at DESC",
|
|
|
|
|
- )
|
|
|
|
|
|
|
+ cur = conn.execute("SELECT payload FROM clusters ORDER BY updated_at DESC")
|
|
|
candidates = [json.loads(r[0]) for r in cur.fetchall()]
|
|
candidates = [json.loads(r[0]) for r in cur.fetchall()]
|
|
|
|
|
|
|
|
- filtered: list[dict] = []
|
|
|
|
|
- for c in candidates:
|
|
|
|
|
- ts = _parse_payload_ts(c.get("timestamp"))
|
|
|
|
|
- if ts is None:
|
|
|
|
|
- continue
|
|
|
|
|
- if ts >= cutoff_ts:
|
|
|
|
|
- filtered.append(c)
|
|
|
|
|
-
|
|
|
|
|
- filtered.sort(key=lambda c: _parse_payload_ts(c.get("timestamp")) or 0.0, reverse=True)
|
|
|
|
|
|
|
+ filtered = [c for c in candidates if (_read_ts(c.get("timestamp")) or 0.0) >= cutoff_ts]
|
|
|
|
|
+ filtered.sort(key=lambda c: _read_ts(c.get("timestamp")) or 0.0, reverse=True)
|
|
|
return filtered[: int(limit)]
|
|
return filtered[: int(limit)]
|
|
|
|
|
|
|
|
def get_cluster_by_id(self, cluster_id: str) -> dict | None:
|
|
def get_cluster_by_id(self, cluster_id: str) -> dict | None:
|
|
@@ -687,23 +652,29 @@ class SQLiteClusterStore:
|
|
|
limit: int = 20,
|
|
limit: int = 20,
|
|
|
offset: int = 0,
|
|
offset: int = 0,
|
|
|
) -> list[dict[str, Any]]:
|
|
) -> list[dict[str, Any]]:
|
|
|
- """Paginated cluster listing for the dashboard."""
|
|
|
|
|
- cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
|
|
|
|
- now = datetime.now(timezone.utc).isoformat()
|
|
|
|
|
- query = "SELECT payload FROM clusters WHERE updated_at >= ? AND updated_at <= ?"
|
|
|
|
|
- params: list = [cutoff, now]
|
|
|
|
|
|
|
+ """Paginated cluster listing filtered by payload.timestamp (event time).
|
|
|
|
|
+
|
|
|
|
|
+ payload.timestamp is guaranteed ISO 8601 UTC — filtered and sorted
|
|
|
|
|
+ using _read_ts, not updated_at (row modification time).
|
|
|
|
|
+ """
|
|
|
|
|
+ cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
|
|
|
|
|
+
|
|
|
|
|
+ query = "SELECT payload FROM clusters"
|
|
|
|
|
+ params: list = []
|
|
|
if topic and topic != "all":
|
|
if topic and topic != "all":
|
|
|
- query += " AND topic = ?"
|
|
|
|
|
|
|
+ query += " WHERE topic = ?"
|
|
|
params.append(topic)
|
|
params.append(topic)
|
|
|
- query += " ORDER BY updated_at DESC LIMIT ? OFFSET ?"
|
|
|
|
|
- params.extend([limit, offset])
|
|
|
|
|
|
|
+
|
|
|
with self._conn() as conn:
|
|
with self._conn() as conn:
|
|
|
- cur = conn.execute(query, params)
|
|
|
|
|
- rows = cur.fetchall()
|
|
|
|
|
- clusters: list[dict[str, Any]] = []
|
|
|
|
|
- for (payload_text,) in rows:
|
|
|
|
|
- c = json.loads(payload_text)
|
|
|
|
|
- clusters.append({
|
|
|
|
|
|
|
+ rows = conn.execute(query, params).fetchall()
|
|
|
|
|
+
|
|
|
|
|
+ filtered = [json.loads(r[0]) for r in rows]
|
|
|
|
|
+ filtered = [c for c in filtered if (_read_ts(c.get("timestamp")) or 0.0) >= cutoff_ts]
|
|
|
|
|
+ filtered.sort(key=lambda c: _read_ts(c.get("timestamp")) or 0.0, reverse=True)
|
|
|
|
|
+ page = filtered[offset:offset + limit]
|
|
|
|
|
+
|
|
|
|
|
+ return [
|
|
|
|
|
+ {
|
|
|
"cluster_id": c.get("cluster_id", ""),
|
|
"cluster_id": c.get("cluster_id", ""),
|
|
|
"headline": c.get("headline", ""),
|
|
"headline": c.get("headline", ""),
|
|
|
"topic": c.get("topic", ""),
|
|
"topic": c.get("topic", ""),
|
|
@@ -715,8 +686,9 @@ class SQLiteClusterStore:
|
|
|
"timestamp": c.get("timestamp", ""),
|
|
"timestamp": c.get("timestamp", ""),
|
|
|
"keywords": c.get("keywords", []),
|
|
"keywords": c.get("keywords", []),
|
|
|
"article_count": len(c.get("articles", [])),
|
|
"article_count": len(c.get("articles", [])),
|
|
|
- })
|
|
|
|
|
- return clusters
|
|
|
|
|
|
|
+ }
|
|
|
|
|
+ for c in page
|
|
|
|
|
+ ]
|
|
|
|
|
|
|
|
def get_sentiment_series(
|
|
def get_sentiment_series(
|
|
|
self,
|
|
self,
|
|
@@ -726,45 +698,28 @@ class SQLiteClusterStore:
|
|
|
) -> list[dict[str, Any]]:
|
|
) -> list[dict[str, Any]]:
|
|
|
"""Sentiment score averaged per time bucket.
|
|
"""Sentiment score averaged per time bucket.
|
|
|
|
|
|
|
|
- Filters by the cluster's own event timestamp (payload.timestamp),
|
|
|
|
|
- not by updated_at which tracks row modification time.
|
|
|
|
|
|
|
+ Filters by payload.timestamp (event time, ISO 8601 UTC guaranteed).
|
|
|
"""
|
|
"""
|
|
|
- cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
|
|
|
|
|
|
|
+ cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
|
|
|
query = "SELECT payload FROM clusters"
|
|
query = "SELECT payload FROM clusters"
|
|
|
params: list = []
|
|
params: list = []
|
|
|
if topic and topic != "all":
|
|
if topic and topic != "all":
|
|
|
query += " WHERE topic = ?"
|
|
query += " WHERE topic = ?"
|
|
|
params.append(topic)
|
|
params.append(topic)
|
|
|
- query += " ORDER BY updated_at ASC"
|
|
|
|
|
|
|
|
|
|
with self._conn() as conn:
|
|
with self._conn() as conn:
|
|
|
- cur = conn.execute(query, params)
|
|
|
|
|
- rows = cur.fetchall()
|
|
|
|
|
-
|
|
|
|
|
- def _parse_ts(ts: Any) -> datetime | None:
|
|
|
|
|
- if not ts:
|
|
|
|
|
- return None
|
|
|
|
|
- s = str(ts).strip()
|
|
|
|
|
- try:
|
|
|
|
|
- dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
|
|
|
|
|
- except Exception:
|
|
|
|
|
- try:
|
|
|
|
|
- dt = parsedate_to_datetime(s)
|
|
|
|
|
- except Exception:
|
|
|
|
|
- return None
|
|
|
|
|
- if dt.tzinfo is None:
|
|
|
|
|
- dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
- return dt.astimezone(timezone.utc)
|
|
|
|
|
|
|
+ rows = conn.execute(query, params).fetchall()
|
|
|
|
|
|
|
|
buckets: dict[datetime, list[float]] = {}
|
|
buckets: dict[datetime, list[float]] = {}
|
|
|
for (payload_text,) in rows:
|
|
for (payload_text,) in rows:
|
|
|
c = json.loads(payload_text)
|
|
c = json.loads(payload_text)
|
|
|
- dt = _parse_ts(c.get("timestamp"))
|
|
|
|
|
|
|
+ ts = _read_ts(c.get("timestamp"))
|
|
|
score = c.get("sentimentScore")
|
|
score = c.get("sentimentScore")
|
|
|
- if dt is None or score is None:
|
|
|
|
|
|
|
+ if ts is None or score is None:
|
|
|
continue
|
|
continue
|
|
|
- if dt < cutoff.replace(tzinfo=timezone.utc):
|
|
|
|
|
|
|
+ if ts < cutoff_ts:
|
|
|
continue
|
|
continue
|
|
|
|
|
+ dt = datetime.fromtimestamp(ts, tz=timezone.utc)
|
|
|
bucket_key = dt.replace(minute=0, second=0, microsecond=0)
|
|
bucket_key = dt.replace(minute=0, second=0, microsecond=0)
|
|
|
if bucket_hours > 1:
|
|
if bucket_hours > 1:
|
|
|
bucket_key = bucket_key.replace(
|
|
bucket_key = bucket_key.replace(
|
|
@@ -772,66 +727,38 @@ class SQLiteClusterStore:
|
|
|
)
|
|
)
|
|
|
buckets.setdefault(bucket_key, []).append(float(score))
|
|
buckets.setdefault(bucket_key, []).append(float(score))
|
|
|
|
|
|
|
|
- series: list[dict[str, Any]] = []
|
|
|
|
|
- for bucket_key in sorted(buckets):
|
|
|
|
|
- scores = buckets[bucket_key]
|
|
|
|
|
- series.append({
|
|
|
|
|
|
|
+ return [
|
|
|
|
|
+ {
|
|
|
"time": bucket_key.isoformat(),
|
|
"time": bucket_key.isoformat(),
|
|
|
"avg_sentiment": round(sum(scores) / len(scores), 3),
|
|
"avg_sentiment": round(sum(scores) / len(scores), 3),
|
|
|
"count": len(scores),
|
|
"count": len(scores),
|
|
|
"min": round(min(scores), 3),
|
|
"min": round(min(scores), 3),
|
|
|
"max": round(max(scores), 3),
|
|
"max": round(max(scores), 3),
|
|
|
- })
|
|
|
|
|
- return series
|
|
|
|
|
|
|
+ }
|
|
|
|
|
+ for bucket_key, scores in sorted(buckets.items())
|
|
|
|
|
+ ]
|
|
|
|
|
|
|
|
def get_entity_frequencies(
|
|
def get_entity_frequencies(
|
|
|
self,
|
|
self,
|
|
|
hours: float = 24,
|
|
hours: float = 24,
|
|
|
limit: int = 30,
|
|
limit: int = 30,
|
|
|
) -> list[dict[str, Any]]:
|
|
) -> list[dict[str, Any]]:
|
|
|
- """Top entities by mention count in recent clusters.
|
|
|
|
|
|
|
+ """Top entities by mention count filtered by payload.timestamp (ISO 8601 UTC guaranteed)."""
|
|
|
|
|
+ cutoff_ts = (datetime.now(timezone.utc) - timedelta(hours=hours)).timestamp()
|
|
|
|
|
|
|
|
- Filters by the cluster's own event timestamp (payload.timestamp),
|
|
|
|
|
- not by updated_at which tracks row modification time.
|
|
|
|
|
- """
|
|
|
|
|
- cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
|
|
|
|
|
-
|
|
|
|
|
- query = "SELECT payload FROM clusters"
|
|
|
|
|
- params: list = []
|
|
|
|
|
with self._conn() as conn:
|
|
with self._conn() as conn:
|
|
|
- cur = conn.execute(query, params)
|
|
|
|
|
- rows = cur.fetchall()
|
|
|
|
|
-
|
|
|
|
|
- def _parse_ts(ts):
|
|
|
|
|
- if not ts:
|
|
|
|
|
- return None
|
|
|
|
|
- s = str(ts).strip()
|
|
|
|
|
- try:
|
|
|
|
|
- dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
|
|
|
|
|
- except Exception:
|
|
|
|
|
- try:
|
|
|
|
|
- from email.utils import parsedate_to_datetime
|
|
|
|
|
- dt = parsedate_to_datetime(s)
|
|
|
|
|
- except Exception:
|
|
|
|
|
- return None
|
|
|
|
|
- if dt.tzinfo is None:
|
|
|
|
|
- dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
- return dt.astimezone(timezone.utc)
|
|
|
|
|
-
|
|
|
|
|
|
|
+ rows = conn.execute("SELECT payload FROM clusters").fetchall()
|
|
|
|
|
+
|
|
|
counter: dict[str, int] = {}
|
|
counter: dict[str, int] = {}
|
|
|
for (payload_text,) in rows:
|
|
for (payload_text,) in rows:
|
|
|
c = json.loads(payload_text)
|
|
c = json.loads(payload_text)
|
|
|
- dt = _parse_ts(c.get("timestamp"))
|
|
|
|
|
- if dt is None:
|
|
|
|
|
- continue
|
|
|
|
|
- if dt < cutoff:
|
|
|
|
|
|
|
+ if (_read_ts(c.get("timestamp")) or 0.0) < cutoff_ts:
|
|
|
continue
|
|
continue
|
|
|
for ent in c.get("entities", []):
|
|
for ent in c.get("entities", []):
|
|
|
counter[ent] = counter.get(ent, 0) + 1
|
|
counter[ent] = counter.get(ent, 0) + 1
|
|
|
-
|
|
|
|
|
- sorted_entities = sorted(counter.items(), key=lambda x: -x[1])[:limit]
|
|
|
|
|
|
|
+
|
|
|
result: list[dict[str, Any]] = []
|
|
result: list[dict[str, Any]] = []
|
|
|
- for label, count in sorted_entities:
|
|
|
|
|
|
|
+ for label, count in sorted(counter.items(), key=lambda x: -x[1])[:limit]:
|
|
|
meta = self.get_entity_metadata(label)
|
|
meta = self.get_entity_metadata(label)
|
|
|
result.append({
|
|
result.append({
|
|
|
"label": label,
|
|
"label": label,
|