|
@@ -673,84 +673,116 @@ class SQLiteClusterStore:
|
|
|
return clusters
|
|
return clusters
|
|
|
|
|
|
|
|
def get_sentiment_series(
|
|
def get_sentiment_series(
|
|
|
|
|
+ self,
|
|
|
|
|
+ topic: str | None = None,
|
|
|
|
|
+ hours: float = 24,
|
|
|
|
|
+ bucket_hours: float = 1,
|
|
|
|
|
+ ) -> list[dict[str, Any]]:
|
|
|
|
|
+ """Sentiment score averaged per time bucket.
|
|
|
|
|
+
|
|
|
|
|
+ Filters by the cluster's own event timestamp (payload.timestamp),
|
|
|
|
|
+ not by updated_at which tracks row modification time.
|
|
|
|
|
+ """
|
|
|
|
|
+ cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
|
|
|
|
|
+ query = "SELECT payload FROM clusters"
|
|
|
|
|
+ params: list = []
|
|
|
|
|
+ if topic and topic != "all":
|
|
|
|
|
+ query += " WHERE topic = ?"
|
|
|
|
|
+ params.append(topic)
|
|
|
|
|
+ query += " ORDER BY updated_at ASC"
|
|
|
|
|
+
|
|
|
|
|
+ with self._conn() as conn:
|
|
|
|
|
+ cur = conn.execute(query, params)
|
|
|
|
|
+ rows = cur.fetchall()
|
|
|
|
|
+
|
|
|
|
|
+ def _parse_ts(ts: Any) -> datetime | None:
|
|
|
|
|
+ if not ts:
|
|
|
|
|
+ return None
|
|
|
|
|
+ s = str(ts).strip()
|
|
|
|
|
+ try:
|
|
|
|
|
+ dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ try:
|
|
|
|
|
+ dt = parsedate_to_datetime(s)
|
|
|
|
|
+ except Exception:
|
|
|
|
|
+ return None
|
|
|
|
|
+ if dt.tzinfo is None:
|
|
|
|
|
+ dt = dt.replace(tzinfo=timezone.utc)
|
|
|
|
|
+ return dt.astimezone(timezone.utc)
|
|
|
|
|
+
|
|
|
|
|
+ buckets: dict[datetime, list[float]] = {}
|
|
|
|
|
+ for (payload_text,) in rows:
|
|
|
|
|
+ c = json.loads(payload_text)
|
|
|
|
|
+ dt = _parse_ts(c.get("timestamp"))
|
|
|
|
|
+ score = c.get("sentimentScore")
|
|
|
|
|
+ if dt is None or score is None:
|
|
|
|
|
+ continue
|
|
|
|
|
+ if dt < cutoff.replace(tzinfo=timezone.utc):
|
|
|
|
|
+ continue
|
|
|
|
|
+ bucket_key = dt.replace(minute=0, second=0, microsecond=0)
|
|
|
|
|
+ if bucket_hours > 1:
|
|
|
|
|
+ bucket_key = bucket_key.replace(
|
|
|
|
|
+ hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
|
|
|
|
|
+ )
|
|
|
|
|
+ buckets.setdefault(bucket_key, []).append(float(score))
|
|
|
|
|
+
|
|
|
|
|
+ series: list[dict[str, Any]] = []
|
|
|
|
|
+ for bucket_key in sorted(buckets):
|
|
|
|
|
+ scores = buckets[bucket_key]
|
|
|
|
|
+ series.append({
|
|
|
|
|
+ "time": bucket_key.isoformat(),
|
|
|
|
|
+ "avg_sentiment": round(sum(scores) / len(scores), 3),
|
|
|
|
|
+ "count": len(scores),
|
|
|
|
|
+ "min": round(min(scores), 3),
|
|
|
|
|
+ "max": round(max(scores), 3),
|
|
|
|
|
+ })
|
|
|
|
|
+ return series
|
|
|
|
|
+
|
|
|
|
|
+ def get_entity_frequencies(
|
|
|
self,
|
|
self,
|
|
|
- topic: str | None = None,
|
|
|
|
|
hours: float = 24,
|
|
hours: float = 24,
|
|
|
- bucket_hours: float = 1,
|
|
|
|
|
|
|
+ limit: int = 30,
|
|
|
) -> list[dict[str, Any]]:
|
|
) -> list[dict[str, Any]]:
|
|
|
- """Sentiment score averaged per time bucket."""
|
|
|
|
|
- cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
|
|
|
|
- now = datetime.now(timezone.utc).isoformat()
|
|
|
|
|
- query = "SELECT payload FROM clusters WHERE updated_at >= ? AND updated_at <= ?"
|
|
|
|
|
- params: list = [cutoff, now]
|
|
|
|
|
- if topic and topic != "all":
|
|
|
|
|
- query += " AND topic = ?"
|
|
|
|
|
- params.append(topic)
|
|
|
|
|
- query += " ORDER BY updated_at ASC"
|
|
|
|
|
|
|
+ """Top entities by mention count in recent clusters.
|
|
|
|
|
+
|
|
|
|
|
+ Filters by the cluster's own event timestamp (payload.timestamp),
|
|
|
|
|
+ not by updated_at which tracks row modification time.
|
|
|
|
|
+ """
|
|
|
|
|
+ cutoff = datetime.now(timezone.utc) - timedelta(hours=hours)
|
|
|
|
|
+
|
|
|
|
|
+ query = "SELECT payload FROM clusters"
|
|
|
|
|
+ params: list = []
|
|
|
with self._conn() as conn:
|
|
with self._conn() as conn:
|
|
|
cur = conn.execute(query, params)
|
|
cur = conn.execute(query, params)
|
|
|
rows = cur.fetchall()
|
|
rows = cur.fetchall()
|
|
|
-
|
|
|
|
|
- def _parse_ts(ts: Any) -> datetime | None:
|
|
|
|
|
|
|
+
|
|
|
|
|
+ def _parse_ts(ts):
|
|
|
if not ts:
|
|
if not ts:
|
|
|
return None
|
|
return None
|
|
|
- s = str(ts)
|
|
|
|
|
|
|
+ s = str(ts).strip()
|
|
|
try:
|
|
try:
|
|
|
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
|
|
dt = datetime.fromisoformat(s.replace("Z", "+00:00"))
|
|
|
except Exception:
|
|
except Exception:
|
|
|
try:
|
|
try:
|
|
|
|
|
+ from email.utils import parsedate_to_datetime
|
|
|
dt = parsedate_to_datetime(s)
|
|
dt = parsedate_to_datetime(s)
|
|
|
except Exception:
|
|
except Exception:
|
|
|
return None
|
|
return None
|
|
|
if dt.tzinfo is None:
|
|
if dt.tzinfo is None:
|
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
dt = dt.replace(tzinfo=timezone.utc)
|
|
|
return dt.astimezone(timezone.utc)
|
|
return dt.astimezone(timezone.utc)
|
|
|
-
|
|
|
|
|
- buckets: dict[datetime, list[float]] = {}
|
|
|
|
|
|
|
+
|
|
|
|
|
+ counter: dict[str, int] = {}
|
|
|
for (payload_text,) in rows:
|
|
for (payload_text,) in rows:
|
|
|
c = json.loads(payload_text)
|
|
c = json.loads(payload_text)
|
|
|
dt = _parse_ts(c.get("timestamp"))
|
|
dt = _parse_ts(c.get("timestamp"))
|
|
|
- score = c.get("sentimentScore")
|
|
|
|
|
- if dt is None or score is None:
|
|
|
|
|
|
|
+ if dt is None:
|
|
|
|
|
+ continue
|
|
|
|
|
+ if dt < cutoff:
|
|
|
continue
|
|
continue
|
|
|
- bucket_key = dt.replace(minute=0, second=0, microsecond=0)
|
|
|
|
|
- if bucket_hours > 1:
|
|
|
|
|
- bucket_key = bucket_key.replace(
|
|
|
|
|
- hour=(bucket_key.hour // int(bucket_hours)) * int(bucket_hours)
|
|
|
|
|
- )
|
|
|
|
|
- buckets.setdefault(bucket_key, []).append(float(score))
|
|
|
|
|
-
|
|
|
|
|
- series: list[dict[str, Any]] = []
|
|
|
|
|
- for bucket_key in sorted(buckets):
|
|
|
|
|
- scores = buckets[bucket_key]
|
|
|
|
|
- series.append({
|
|
|
|
|
- "time": bucket_key.isoformat(),
|
|
|
|
|
- "avg_sentiment": round(sum(scores) / len(scores), 3),
|
|
|
|
|
- "count": len(scores),
|
|
|
|
|
- "min": round(min(scores), 3),
|
|
|
|
|
- "max": round(max(scores), 3),
|
|
|
|
|
- })
|
|
|
|
|
- return series
|
|
|
|
|
-
|
|
|
|
|
- def get_entity_frequencies(
|
|
|
|
|
- self,
|
|
|
|
|
- hours: float = 24,
|
|
|
|
|
- limit: int = 30,
|
|
|
|
|
- ) -> list[dict[str, Any]]:
|
|
|
|
|
- """Top entities by mention count in recent clusters."""
|
|
|
|
|
- cutoff = (datetime.now(timezone.utc) - timedelta(hours=hours)).isoformat()
|
|
|
|
|
- now = datetime.now(timezone.utc).isoformat()
|
|
|
|
|
- with self._conn() as conn:
|
|
|
|
|
- cur = conn.execute(
|
|
|
|
|
- "SELECT payload FROM clusters WHERE updated_at >= ? AND updated_at <= ? ORDER BY updated_at DESC LIMIT 500",
|
|
|
|
|
- (cutoff, now),
|
|
|
|
|
- )
|
|
|
|
|
- rows = cur.fetchall()
|
|
|
|
|
- counter: dict[str, int] = {}
|
|
|
|
|
- for (payload_text,) in rows:
|
|
|
|
|
- c = json.loads(payload_text)
|
|
|
|
|
for ent in c.get("entities", []):
|
|
for ent in c.get("entities", []):
|
|
|
counter[ent] = counter.get(ent, 0) + 1
|
|
counter[ent] = counter.get(ent, 0) + 1
|
|
|
|
|
+
|
|
|
sorted_entities = sorted(counter.items(), key=lambda x: -x[1])[:limit]
|
|
sorted_entities = sorted(counter.items(), key=lambda x: -x[1])[:limit]
|
|
|
result: list[dict[str, Any]] = []
|
|
result: list[dict[str, Any]] = []
|
|
|
for label, count in sorted_entities:
|
|
for label, count in sorted_entities:
|