diff --git a/app/routers/__init__.py b/app/routers/__init__.py
new file mode 100644
index 0000000..c18be12
--- /dev/null
+++ b/app/routers/__init__.py
@@ -0,0 +1,4 @@
+from .chat import router as chat_router
+from .metrics import router as metrics_router
+
+__all__ = ["chat_router", "metrics_router"]
diff --git a/app/routers/metrics.py b/app/routers/metrics.py
new file mode 100644
index 0000000..3a2f238
--- /dev/null
+++ b/app/routers/metrics.py
@@ -0,0 +1,166 @@
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Any, List, Optional
+
+from fastapi import APIRouter, HTTPException, Query
+
+from app.schemas.metrics import (
+    MetricCreate,
+    MetricResultsWriteRequest,
+    MetricRunTrigger,
+    MetricScheduleCreate,
+    MetricScheduleUpdate,
+    MetricUpdate,
+)
+from app.services import metric_store
+
+
+router = APIRouter(prefix="/api/v1", tags=["metrics"])
+
+
+@router.post("/metrics")
+def create_metric(payload: MetricCreate) -> Any:
+    """Create a metric definition."""
+    try:
+        return metric_store.create_metric(payload)
+    except Exception as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+
+@router.post("/metrics/{metric_id}")
+def update_metric(metric_id: int, payload: MetricUpdate) -> Any:
+    """Update fields of a metric definition."""
+    try:
+        return metric_store.update_metric(metric_id, payload)
+    except KeyError as exc:
+        raise HTTPException(status_code=404, detail="Metric not found") from exc
+    except Exception as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+
+@router.get("/metrics/{metric_id}")
+def get_metric(metric_id: int) -> Any:
+    """Fetch a metric definition by id."""
+    metric = metric_store.get_metric(metric_id)
+    if not metric:
+        raise HTTPException(status_code=404, detail="Metric not found")
+    return metric
+
+
+@router.get("/metrics")
+def list_metrics(
+    biz_domain: Optional[str] = None,
+    is_active: Optional[bool] = None,
+    keyword: Optional[str] = Query(None, description="Search by code/name"),
+    limit: int = Query(100, ge=1, le=500),
+    offset: int = Query(0, ge=0),
+) -> List[Any]:
+    """List metrics with optional filters."""
+    return metric_store.list_metrics(
+        biz_domain=biz_domain,
+        is_active=is_active,
+        keyword=keyword,
+        limit=limit,
+        offset=offset,
+    )
+
+
+@router.post("/metric-schedules")
+def create_schedule(payload: MetricScheduleCreate) -> Any:
+    """Create a metric schedule."""
+    try:
+        return metric_store.create_metric_schedule(payload)
+    except Exception as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+
+@router.post("/metric-schedules/{schedule_id}")
+def update_schedule(schedule_id: int, payload: MetricScheduleUpdate) -> Any:
+    """Update a metric schedule."""
+    try:
+        return metric_store.update_metric_schedule(schedule_id, payload)
+    except KeyError as exc:
+        raise HTTPException(status_code=404, detail="Schedule not found") from exc
+    except Exception as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+
+@router.get("/metrics/{metric_id}/schedules")
+def list_schedules(metric_id: int) -> List[Any]:
+    """List schedules for one metric."""
+    return metric_store.list_schedules_for_metric(metric_id=metric_id)
+
+
+@router.post("/metric-runs/trigger")
+def trigger_run(payload: MetricRunTrigger) -> Any:
+    """Insert a run record (execution handled externally)."""
+    try:
+        return metric_store.trigger_metric_run(payload)
+    except KeyError as exc:
+        raise HTTPException(status_code=404, detail=str(exc)) from exc
+    except Exception as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+
+
+@router.get("/metric-runs")
+def list_runs(
+    metric_id: Optional[int] = None,
+    status: Optional[str] = None,
+    limit: int = Query(100, ge=1, le=500),
+    offset: int = Query(0, ge=0),
+) -> List[Any]:
+    """List run records."""
+    return metric_store.list_metric_runs(
+        metric_id=metric_id, status=status, limit=limit, offset=offset
+    )
+
+
+@router.get("/metric-runs/{run_id}")
+def get_run(run_id: int) -> Any:
+    """Fetch run details."""
+    run = metric_store.get_metric_run(run_id)
+    if not run:
+        raise HTTPException(status_code=404, detail="Run not found")
+    return run
+
+
+@router.post("/metric-results/{metric_id}")
+def write_results(metric_id: int, payload: MetricResultsWriteRequest) -> Any:
+    # Align path metric_id with payload to avoid mismatch.
+    if payload.metric_id != metric_id:
+        raise HTTPException(status_code=400, detail="metric_id in path/body mismatch")
+    try:
+        inserted = metric_store.write_metric_results(payload)
+    except KeyError as exc:
+        raise HTTPException(status_code=404, detail=str(exc)) from exc
+    except Exception as exc:
+        raise HTTPException(status_code=400, detail=str(exc)) from exc
+    return {"metric_id": metric_id, "inserted": inserted}
+
+
+@router.get("/metric-results")
+def query_results(
+    metric_id: int,
+    stat_from: Optional[datetime] = None,
+    stat_to: Optional[datetime] = None,
+    limit: int = Query(200, ge=1, le=1000),
+    offset: int = Query(0, ge=0),
+) -> List[Any]:
+    """Query metric results by time range."""
+    return metric_store.query_metric_results(
+        metric_id=metric_id,
+        stat_from=stat_from,
+        stat_to=stat_to,
+        limit=limit,
+        offset=offset,
+    )
+
+
+@router.get("/metric-results/latest")
+def latest_result(metric_id: int) -> Any:
+    """Fetch the latest metric result."""
+    result = metric_store.latest_metric_result(metric_id)
+    if not result:
+        raise HTTPException(status_code=404, detail="Metric result not found")
+    return result
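Not included in this changeset: the application wiring that mounts these routers. A minimal sketch of what that looks like follows; the `app/main.py` location and the `create_app` factory are assumptions, while `chat_router` and `metrics_router` come from `app/routers/__init__.py` above.

```python
# Hypothetical app/main.py -- wiring sketch, not part of this diff.
from fastapi import FastAPI

from app.routers import chat_router, metrics_router


def create_app() -> FastAPI:
    app = FastAPI(title="metrics-service")
    # metrics_router already carries the /api/v1 prefix (see above),
    # so no extra prefix is passed here.
    app.include_router(chat_router)
    app.include_router(metrics_router)
    return app


app = create_app()
```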
diff --git a/app/schemas/metrics.py b/app/schemas/metrics.py
new file mode 100644
index 0000000..8c5dc44
--- /dev/null
+++ b/app/schemas/metrics.py
@@ -0,0 +1,99 @@
+from __future__ import annotations
+
+from datetime import datetime
+from typing import Any, List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class MetricCreate(BaseModel):
+    """Create a metric definition with business and technical metadata."""
+    metric_code: str = Field(..., description="Internal metric code, unique.")
+    metric_name: str = Field(..., description="Display name.")
+    metric_aliases: Optional[List[str]] = Field(None, description="Optional alias list.")
+    biz_domain: str = Field(..., description="Business domain identifier.")
+    biz_desc: Optional[str] = Field(None, description="Business definition.")
+    chat_turn_id: Optional[int] = Field(None, description="Source chat turn ID.")
+    tech_desc: Optional[str] = Field(None, description="Technical definition.")
+    formula_expr: Optional[str] = Field(None, description="Formula expression text.")
+    base_sql: str = Field(..., description="Canonical SQL used to compute the metric.")
+    time_grain: str = Field(..., description="DAY/HOUR/WEEK/MONTH etc.")
+    dim_binding: List[str] = Field(..., description="Dimension columns bound to the metric.")
+    update_strategy: str = Field(..., description="FULL/INCR/REALTIME.")
+    schedule_id: Optional[int] = Field(None, description="Linked schedule id if any.")
+    schedule_type: Optional[int] = Field(None, description="Scheduler type identifier.")
+    is_active: bool = Field(True, description="Whether the metric is enabled.")
+    created_by: Optional[int] = Field(None, description="Creator user id.")
+    updated_by: Optional[int] = Field(None, description="Updater user id.")
+
+
+class MetricUpdate(BaseModel):
+    """Partial update for an existing metric definition."""
+    metric_name: Optional[str] = None
+    metric_aliases: Optional[List[str]] = None
+    biz_domain: Optional[str] = None
+    biz_desc: Optional[str] = None
+    tech_desc: Optional[str] = None
+    formula_expr: Optional[str] = None
+    base_sql: Optional[str] = None
+    time_grain: Optional[str] = None
+    dim_binding: Optional[List[str]] = None
+    update_strategy: Optional[str] = None
+    schedule_id: Optional[int] = None
+    schedule_type: Optional[int] = None
+    is_active: Optional[bool] = None
+    updated_by: Optional[int] = None
+
+
+class MetricScheduleCreate(BaseModel):
+    """Create a cron-based schedule for a metric."""
+    metric_id: int
+    cron_expr: str
+    enabled: bool = True
+    priority: int = 10
+    backfill_allowed: bool = True
+    max_runtime_sec: Optional[int] = None
+    retry_times: int = 0
+    owner_team: Optional[str] = None
+    owner_user_id: Optional[int] = None
+
+
+class MetricScheduleUpdate(BaseModel):
+    """Update fields of an existing metric schedule."""
+    cron_expr: Optional[str] = None
+    enabled: Optional[bool] = None
+    priority: Optional[int] = None
+    backfill_allowed: Optional[bool] = None
+    max_runtime_sec: Optional[int] = None
+    retry_times: Optional[int] = None
+    owner_team: Optional[str] = None
+    owner_user_id: Optional[int] = None
+
+
+class MetricRunTrigger(BaseModel):
+    """Trigger a metric run, optionally linking to a chat turn or schedule."""
+    metric_id: int
+    schedule_id: Optional[int] = None
+    source_turn_id: Optional[int] = None
+    data_time_from: Optional[datetime] = None
+    data_time_to: Optional[datetime] = None
+    metric_version: Optional[int] = None
+    base_sql_snapshot: Optional[str] = None
+    triggered_by: str = Field("API", description="SCHEDULER/MANUAL/API/QA_TURN")
+    triggered_at: Optional[datetime] = None
+
+
+class MetricResultItem(BaseModel):
+    """Single metric result row to be persisted."""
+    stat_time: datetime
+    metric_value: float
+    metric_version: Optional[int] = None
+    extra_dims: Optional[dict[str, Any]] = None
+    load_time: Optional[datetime] = None
+    data_version: Optional[int] = None
+
+
+class MetricResultsWriteRequest(BaseModel):
+    """Batch write request for metric results."""
+    metric_id: int
+    results: List[MetricResultItem]
metric is enabled.") + created_by: Optional[int] = Field(None, description="Creator user id.") + updated_by: Optional[int] = Field(None, description="Updater user id.") + + +class MetricUpdate(BaseModel): + """Partial update for an existing metric definition.""" + metric_name: Optional[str] = None + metric_aliases: Optional[List[str]] = None + biz_domain: Optional[str] = None + biz_desc: Optional[str] = None + tech_desc: Optional[str] = None + formula_expr: Optional[str] = None + base_sql: Optional[str] = None + time_grain: Optional[str] = None + dim_binding: Optional[List[str]] = None + update_strategy: Optional[str] = None + schedule_id: Optional[int] = None + schedule_type: Optional[int] = None + is_active: Optional[bool] = None + updated_by: Optional[int] = None + + +class MetricScheduleCreate(BaseModel): + """Create a cron-based schedule for a metric.""" + metric_id: int + cron_expr: str + enabled: bool = True + priority: int = 10 + backfill_allowed: bool = True + max_runtime_sec: Optional[int] = None + retry_times: int = 0 + owner_team: Optional[str] = None + owner_user_id: Optional[int] = None + + +class MetricScheduleUpdate(BaseModel): + """Update fields of an existing metric schedule.""" + cron_expr: Optional[str] = None + enabled: Optional[bool] = None + priority: Optional[int] = None + backfill_allowed: Optional[bool] = None + max_runtime_sec: Optional[int] = None + retry_times: Optional[int] = None + owner_team: Optional[str] = None + owner_user_id: Optional[int] = None + + +class MetricRunTrigger(BaseModel): + """Trigger a metric run, optionally linking to a chat turn or schedule.""" + metric_id: int + schedule_id: Optional[int] = None + source_turn_id: Optional[int] = None + data_time_from: Optional[datetime] = None + data_time_to: Optional[datetime] = None + metric_version: Optional[int] = None + base_sql_snapshot: Optional[str] = None + triggered_by: str = Field("API", description="SCHEDULER/MANUAL/API/QA_TURN") + triggered_at: Optional[datetime] = None + + +class MetricResultItem(BaseModel): + """Single metric result row to be persisted.""" + stat_time: datetime + metric_value: float + metric_version: Optional[int] = None + extra_dims: Optional[dict[str, Any]] = None + load_time: Optional[datetime] = None + data_version: Optional[int] = None + + +class MetricResultsWriteRequest(BaseModel): + """Batch write request for metric results.""" + metric_id: int + results: List[MetricResultItem] diff --git a/app/services/__init__.py b/app/services/__init__.py index 849697d..a96fdcb 100644 --- a/app/services/__init__.py +++ b/app/services/__init__.py @@ -1,3 +1,4 @@ from .gateway import LLMGateway +from .rag_client import RagAPIClient -__all__ = ["LLMGateway"] +__all__ = ["LLMGateway", "RagAPIClient"] diff --git a/app/services/metric_store.py b/app/services/metric_store.py new file mode 100644 index 0000000..4015881 --- /dev/null +++ b/app/services/metric_store.py @@ -0,0 +1,842 @@ +from __future__ import annotations + +import hashlib +import json +import logging +from datetime import datetime +from typing import Any, Dict, Iterable, List, Optional +from uuid import uuid4 + +from sqlalchemy import text +from sqlalchemy.engine import Row + +from app.db import get_engine +from app.schemas.chat import ( + ChatSessionCreate, + ChatSessionUpdate, + ChatTurnCreate, + ChatTurnRetrievalItem, +) +from app.schemas.metrics import ( + MetricCreate, + MetricResultItem, + MetricResultsWriteRequest, + MetricRunTrigger, + MetricScheduleCreate, + MetricScheduleUpdate, + MetricUpdate, +) + + +logger 
+
+
+# Chat sessions & turns
+def create_chat_session(payload: ChatSessionCreate) -> Dict[str, Any]:
+    """Create a chat session row with optional external UUID."""
+    engine = get_engine()
+    session_uuid = payload.session_uuid or str(uuid4())
+    params = {
+        "user_id": payload.user_id,
+        "session_uuid": session_uuid,
+        "end_time": payload.end_time,
+        "status": payload.status or "OPEN",
+        "ext_context": _json_dump(payload.ext_context),
+    }
+    with engine.begin() as conn:
+        result = conn.execute(
+            text(
+                """
+                INSERT INTO chat_session (user_id, session_uuid, end_time, status, ext_context)
+                VALUES (:user_id, :session_uuid, :end_time, :status, :ext_context)
+                """
+            ),
+            params,
+        )
+        session_id = result.lastrowid
+        row = conn.execute(
+            text("SELECT * FROM chat_session WHERE id=:id"), {"id": session_id}
+        ).first()
+    if not row:
+        raise RuntimeError("Failed to create chat session.")
+    data = _row_to_dict(row)
+    _parse_json_fields(data, ["ext_context"])
+    return data
+
+
+def update_chat_session(session_id: int, payload: ChatSessionUpdate) -> Dict[str, Any]:
+    """Patch selected chat session fields."""
+    updates = {}
+    if payload.status is not None:
+        updates["status"] = payload.status
+    if payload.end_time is not None:
+        updates["end_time"] = payload.end_time
+    if payload.last_turn_id is not None:
+        updates["last_turn_id"] = payload.last_turn_id
+    if payload.ext_context is not None:
+        updates["ext_context"] = _json_dump(payload.ext_context)
+
+    if not updates:
+        current = get_chat_session(session_id)
+        if not current:
+            raise KeyError(f"Session {session_id} not found.")
+        return current
+
+    set_clause = ", ".join(f"{key}=:{key}" for key in updates.keys())
+    params = dict(updates)
+    params["id"] = session_id
+
+    engine = get_engine()
+    with engine.begin() as conn:
+        conn.execute(
+            text(f"UPDATE chat_session SET {set_clause} WHERE id=:id"),
+            params,
+        )
+        row = conn.execute(
+            text("SELECT * FROM chat_session WHERE id=:id"), {"id": session_id}
+        ).first()
+    if not row:
+        raise KeyError(f"Session {session_id} not found.")
+    data = _row_to_dict(row)
+    _parse_json_fields(data, ["ext_context"])
+    return data
+
+
+def close_chat_session(session_id: int) -> Dict[str, Any]:
+    """Mark a chat session as CLOSED with end_time."""
+    now = datetime.utcnow()
+    return update_chat_session(
+        session_id,
+        ChatSessionUpdate(status="CLOSED", end_time=now),
+    )
+
+
+def get_chat_session(session_id: int) -> Optional[Dict[str, Any]]:
+    engine = get_engine()
+    with engine.begin() as conn:
+        row = conn.execute(
+            text("SELECT * FROM chat_session WHERE id=:id"), {"id": session_id}
+        ).first()
+    if not row:
+        return None
+    data = _row_to_dict(row)
+    _parse_json_fields(data, ["ext_context"])
+    return data
+
+
+def list_chat_sessions(
+    *,
+    user_id: Optional[int] = None,
+    status: Optional[str] = None,
+    start_from: Optional[datetime] = None,
+    start_to: Optional[datetime] = None,
+    limit: int = 50,
+    offset: int = 0,
+) -> List[Dict[str, Any]]:
+    """List chat sessions with optional filters and pagination."""
+    conditions = []
+    params: Dict[str, Any] = {"limit": limit, "offset": offset}
+    if user_id is not None:
+        conditions.append("user_id=:user_id")
+        params["user_id"] = user_id
+    if status is not None:
+        conditions.append("status=:status")
+        params["status"] = status
+    if start_from is not None:
+        conditions.append("created_at>=:start_from")
+        params["start_from"] = start_from
+    if start_to is not None:
+        conditions.append("created_at<=:start_to")
+        params["start_to"] = start_to
+
+    where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
+    engine = get_engine()
+    with engine.begin() as conn:
+        rows = conn.execute(
+            text(
+                f"SELECT * FROM chat_session {where_clause} "
+                "ORDER BY created_at DESC LIMIT :limit OFFSET :offset"
+            ),
+            params,
+        ).fetchall()
+    results: List[Dict[str, Any]] = []
+    for row in rows:
+        data = _row_to_dict(row)
+        _parse_json_fields(data, ["ext_context"])
+        results.append(data)
+    return results
+
+
+def _next_turn_no(conn, session_id: int) -> int:
+    row = conn.execute(
+        text("SELECT COALESCE(MAX(turn_no), 0) + 1 AS next_no FROM chat_turn WHERE session_id=:sid"),
+        {"sid": session_id},
+    ).first()
+    if not row:
+        return 1
+    return int(row._mapping["next_no"])
+
+
+def create_chat_turn(session_id: int, payload: ChatTurnCreate) -> Dict[str, Any]:
+    """Insert a chat turn and auto-increment turn number within the session."""
+    engine = get_engine()
+    params = {
+        "session_id": session_id,
+        "user_id": payload.user_id,
+        "user_query": payload.user_query,
+        "intent": payload.intent,
+        "ast_json": _json_dump(payload.ast_json),
+        "generated_sql": payload.generated_sql,
+        "sql_status": payload.sql_status,
+        "error_msg": payload.error_msg,
+        "main_metric_ids": _json_dump(payload.main_metric_ids),
+        "created_metric_ids": _json_dump(payload.created_metric_ids),
+        "end_time": payload.end_time,
+    }
+    with engine.begin() as conn:
+        turn_no = _next_turn_no(conn, session_id)
+        params["turn_no"] = turn_no
+        result = conn.execute(
+            text(
+                """
+                INSERT INTO chat_turn (
+                    session_id, turn_no, user_id,
+                    user_query, intent, ast_json,
+                    generated_sql, sql_status, error_msg,
+                    main_metric_ids, created_metric_ids,
+                    end_time
+                )
+                VALUES (
+                    :session_id, :turn_no, :user_id,
+                    :user_query, :intent, :ast_json,
+                    :generated_sql, :sql_status, :error_msg,
+                    :main_metric_ids, :created_metric_ids,
+                    :end_time
+                )
+                """
+            ),
+            params,
+        )
+        turn_id = result.lastrowid
+        row = conn.execute(
+            text("SELECT * FROM chat_turn WHERE id=:id"), {"id": turn_id}
+        ).first()
+    if not row:
+        raise RuntimeError("Failed to create chat turn.")
+    data = _row_to_dict(row)
+    _parse_json_fields(data, ["ast_json", "main_metric_ids", "created_metric_ids"])
+    return data
+
+
+def get_chat_turn(turn_id: int) -> Optional[Dict[str, Any]]:
+    engine = get_engine()
+    with engine.begin() as conn:
+        row = conn.execute(
+            text("SELECT * FROM chat_turn WHERE id=:id"), {"id": turn_id}
+        ).first()
+    if not row:
+        return None
+    data = _row_to_dict(row)
+    _parse_json_fields(data, ["ast_json", "main_metric_ids", "created_metric_ids"])
+    return data
+
+
+def list_chat_turns(session_id: int) -> List[Dict[str, Any]]:
+    engine = get_engine()
+    with engine.begin() as conn:
+        rows = conn.execute(
+            text(
+                "SELECT * FROM chat_turn WHERE session_id=:session_id ORDER BY turn_no ASC"
+            ),
+            {"session_id": session_id},
+        ).fetchall()
+    results: List[Dict[str, Any]] = []
+    for row in rows:
+        data = _row_to_dict(row)
+        _parse_json_fields(data, ["ast_json", "main_metric_ids", "created_metric_ids"])
+        results.append(data)
+    return results
+
+
+def create_retrievals(turn_id: int, retrievals: List[ChatTurnRetrievalItem]) -> int:
+    """Batch insert retrieval records for a turn."""
+    if not retrievals:
+        return 0
+    engine = get_engine()
+    params_list = []
+    for item in retrievals:
+        params_list.append(
+            {
+                "turn_id": turn_id,
+                "item_type": item.item_type,
+                "item_id": item.item_id,
+                "item_extra": _json_dump(item.item_extra),
+                "similarity_score": item.similarity_score,
+                "rank_no": item.rank_no,
+                "used_in_reasoning": 1 if item.used_in_reasoning else 0,
+                "used_in_sql": 1 if item.used_in_sql else 0,
+            }
+        )
+    with engine.begin() as conn:
+        conn.execute(
+            text(
+                """
+                INSERT INTO chat_turn_retrieval (
+                    turn_id, item_type, item_id, item_extra,
+                    similarity_score, rank_no, used_in_reasoning, used_in_sql
+                )
+                VALUES (
+                    :turn_id, :item_type, :item_id, :item_extra,
+                    :similarity_score, :rank_no, :used_in_reasoning, :used_in_sql
+                )
+                """
+            ),
+            params_list,
+        )
+    return len(retrievals)
+
+
+def list_retrievals(turn_id: int) -> List[Dict[str, Any]]:
+    engine = get_engine()
+    with engine.begin() as conn:
+        rows = conn.execute(
+            text(
+                "SELECT * FROM chat_turn_retrieval WHERE turn_id=:turn_id ORDER BY created_at ASC, rank_no ASC"
+            ),
+            {"turn_id": turn_id},
+        ).fetchall()
+    results: List[Dict[str, Any]] = []
+    for row in rows:
+        data = _row_to_dict(row)
+        _parse_json_fields(data, ["item_extra"])
+        data["used_in_reasoning"] = bool(data.get("used_in_reasoning"))
+        data["used_in_sql"] = bool(data.get("used_in_sql"))
+        results.append(data)
+    return results
+
+
+# Metric registry
+def _metric_sql_hash(sql_text: str) -> str:
+    """Compute a stable hash to detect SQL definition changes."""
+    return hashlib.md5(sql_text.encode("utf-8")).hexdigest()
+
+
+def create_metric(payload: MetricCreate) -> Dict[str, Any]:
+    """Insert a new metric definition; version starts at 1."""
+    engine = get_engine()
+    now = datetime.utcnow()
+    sql_hash = _metric_sql_hash(payload.base_sql)
+    params = {
+        "metric_code": payload.metric_code,
+        "metric_name": payload.metric_name,
+        "metric_aliases": _json_dump(payload.metric_aliases),
+        "biz_domain": payload.biz_domain,
+        "biz_desc": payload.biz_desc,
+        "chat_turn_id": payload.chat_turn_id,
+        "tech_desc": payload.tech_desc,
+        "formula_expr": payload.formula_expr,
+        "base_sql": payload.base_sql,
+        "time_grain": payload.time_grain,
+        "dim_binding": _json_dump(payload.dim_binding),
+        "update_strategy": payload.update_strategy,
+        "schedule_id": payload.schedule_id,
+        "schedule_type": payload.schedule_type,
+        "version": 1,
+        "is_active": 1 if payload.is_active else 0,
+        "sql_hash": sql_hash,
+        "created_by": payload.created_by,
+        "updated_by": payload.updated_by,
+        "created_at": now,
+        "updated_at": now,
+    }
+    with engine.begin() as conn:
+        result = conn.execute(
+            text(
+                """
+                INSERT INTO metric_def (
+                    metric_code, metric_name, metric_aliases, biz_domain, biz_desc,
+                    chat_turn_id, tech_desc, formula_expr, base_sql,
+                    time_grain, dim_binding, update_strategy,
+                    schedule_id, schedule_type, version, is_active,
+                    sql_hash, created_by, updated_by, created_at, updated_at
+                )
+                VALUES (
+                    :metric_code, :metric_name, :metric_aliases, :biz_domain, :biz_desc,
+                    :chat_turn_id, :tech_desc, :formula_expr, :base_sql,
+                    :time_grain, :dim_binding, :update_strategy,
+                    :schedule_id, :schedule_type, :version, :is_active,
+                    :sql_hash, :created_by, :updated_by, :created_at, :updated_at
+                )
+                """
+            ),
+            params,
+        )
+        metric_id = result.lastrowid
+        row = conn.execute(
+            text("SELECT * FROM metric_def WHERE id=:id"), {"id": metric_id}
+        ).first()
+    if not row:
+        raise RuntimeError("Failed to create metric definition.")
+    data = _row_to_dict(row)
+    _parse_json_fields(data, ["metric_aliases", "dim_binding"])
+    data["is_active"] = bool(data.get("is_active"))
+    return data
+
+
+def update_metric(metric_id: int, payload: MetricUpdate) -> Dict[str, Any]:
+    """Update mutable fields of a metric definition and refresh sql_hash when needed."""
+    updates: Dict[str, Any] = {}
+    for field in (
+        "metric_name",
+        "biz_domain",
+        "biz_desc",
+        "tech_desc",
+        "formula_expr",
+        "base_sql",
+        "time_grain",
+        "update_strategy",
+        "schedule_id",
+        "schedule_type",
+        "updated_by",
+    ):
+        value = getattr(payload, field)
+        if value is not None:
+            updates[field] = value
+
+    if payload.metric_aliases is not None:
+        updates["metric_aliases"] = _json_dump(payload.metric_aliases)
+    if payload.dim_binding is not None:
+        updates["dim_binding"] = _json_dump(payload.dim_binding)
+    if payload.is_active is not None:
+        updates["is_active"] = 1 if payload.is_active else 0
+    if payload.base_sql is not None:
+        updates["sql_hash"] = _metric_sql_hash(payload.base_sql)
+
+    if not updates:
+        current = get_metric(metric_id)
+        if not current:
+            raise KeyError(f"Metric {metric_id} not found.")
+        return current
+
+    updates["updated_at"] = datetime.utcnow()
+    set_clause = ", ".join(f"{key}=:{key}" for key in updates.keys())
+    params = dict(updates)
+    params["id"] = metric_id
+
+    engine = get_engine()
+    with engine.begin() as conn:
+        conn.execute(
+            text(f"UPDATE metric_def SET {set_clause} WHERE id=:id"),
+            params,
+        )
+        row = conn.execute(
+            text("SELECT * FROM metric_def WHERE id=:id"), {"id": metric_id}
+        ).first()
+    if not row:
+        raise KeyError(f"Metric {metric_id} not found.")
+    data = _row_to_dict(row)
+    _parse_json_fields(data, ["metric_aliases", "dim_binding"])
+    data["is_active"] = bool(data.get("is_active"))
+    return data
+
+
+def get_metric(metric_id: int) -> Optional[Dict[str, Any]]:
+    engine = get_engine()
+    with engine.begin() as conn:
+        row = conn.execute(
+            text("SELECT * FROM metric_def WHERE id=:id"), {"id": metric_id}
+        ).first()
+    if not row:
+        return None
+    data = _row_to_dict(row)
+    _parse_json_fields(data, ["metric_aliases", "dim_binding"])
+    data["is_active"] = bool(data.get("is_active"))
+    return data
+
+
+def list_metrics(
+    *,
+    biz_domain: Optional[str] = None,
+    is_active: Optional[bool] = None,
+    keyword: Optional[str] = None,
+    limit: int = 100,
+    offset: int = 0,
+) -> List[Dict[str, Any]]:
+    """List metric definitions with simple filters and pagination."""
+    conditions = []
+    params: Dict[str, Any] = {"limit": limit, "offset": offset}
+    if biz_domain:
+        conditions.append("biz_domain=:biz_domain")
+        params["biz_domain"] = biz_domain
+    if is_active is not None:
+        conditions.append("is_active=:is_active")
+        params["is_active"] = 1 if is_active else 0
+    if keyword:
+        conditions.append("(metric_code LIKE :kw OR metric_name LIKE :kw)")
+        params["kw"] = f"%{keyword}%"
+
+    where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
+    engine = get_engine()
+    with engine.begin() as conn:
+        rows = conn.execute(
+            text(
+                f"SELECT * FROM metric_def {where_clause} "
+                "ORDER BY updated_at DESC LIMIT :limit OFFSET :offset"
+            ),
+            params,
+        ).fetchall()
+    results: List[Dict[str, Any]] = []
+    for row in rows:
+        data = _row_to_dict(row)
+        _parse_json_fields(data, ["metric_aliases", "dim_binding"])
+        data["is_active"] = bool(data.get("is_active"))
+        results.append(data)
+    return results
+
+
+# Metric schedules
+def create_metric_schedule(payload: MetricScheduleCreate) -> Dict[str, Any]:
+    """Create a schedule record for a metric."""
+    engine = get_engine()
+    params = {
+        "metric_id": payload.metric_id,
+        "cron_expr": payload.cron_expr,
+        "enabled": 1 if payload.enabled else 0,
+        "priority": payload.priority,
+        "backfill_allowed": 1 if payload.backfill_allowed else 0,
+        "max_runtime_sec": payload.max_runtime_sec,
+        "retry_times": payload.retry_times,
+        "owner_team": payload.owner_team,
+        "owner_user_id": payload.owner_user_id,
+    }
+    with engine.begin() as conn:
+        result = conn.execute(
+            text(
+                """
+                INSERT INTO metric_schedule (
+                    metric_id, cron_expr, enabled, priority,
+                    backfill_allowed, max_runtime_sec, retry_times,
+                    owner_team, owner_user_id
+                ) VALUES (
+                    :metric_id, :cron_expr, :enabled, :priority,
+                    :backfill_allowed, :max_runtime_sec, :retry_times,
+                    :owner_team, :owner_user_id
+                )
+                """
+            ),
+            params,
+        )
+        schedule_id = result.lastrowid
+        row = conn.execute(
+            text("SELECT * FROM metric_schedule WHERE id=:id"), {"id": schedule_id}
+        ).first()
+    if not row:
+        raise RuntimeError("Failed to create metric schedule.")
+    data = _row_to_dict(row)
+    data["enabled"] = bool(data.get("enabled"))
+    data["backfill_allowed"] = bool(data.get("backfill_allowed"))
+    return data
+
+
+def update_metric_schedule(schedule_id: int, payload: MetricScheduleUpdate) -> Dict[str, Any]:
+    updates: Dict[str, Any] = {}
+    for field in (
+        "cron_expr",
+        "priority",
+        "max_runtime_sec",
+        "retry_times",
+        "owner_team",
+        "owner_user_id",
+    ):
+        value = getattr(payload, field)
+        if value is not None:
+            updates[field] = value
+    if payload.enabled is not None:
+        updates["enabled"] = 1 if payload.enabled else 0
+    if payload.backfill_allowed is not None:
+        updates["backfill_allowed"] = 1 if payload.backfill_allowed else 0
+
+    if not updates:
+        current = list_schedules_for_metric(schedule_id=schedule_id)
+        if current:
+            return current[0]
+        raise KeyError(f"Schedule {schedule_id} not found.")
+
+    set_clause = ", ".join(f"{key}=:{key}" for key in updates.keys())
+    params = dict(updates)
+    params["id"] = schedule_id
+
+    engine = get_engine()
+    with engine.begin() as conn:
+        conn.execute(
+            text(f"UPDATE metric_schedule SET {set_clause} WHERE id=:id"),
+            params,
+        )
+        row = conn.execute(
+            text("SELECT * FROM metric_schedule WHERE id=:id"), {"id": schedule_id}
+        ).first()
+    if not row:
+        raise KeyError(f"Schedule {schedule_id} not found.")
+    data = _row_to_dict(row)
+    data["enabled"] = bool(data.get("enabled"))
+    data["backfill_allowed"] = bool(data.get("backfill_allowed"))
+    return data
+
+
+def list_schedules_for_metric(metric_id: Optional[int] = None, schedule_id: Optional[int] = None) -> List[Dict[str, Any]]:
+    conditions = []
+    params: Dict[str, Any] = {}
+    if metric_id is not None:
+        conditions.append("metric_id=:metric_id")
+        params["metric_id"] = metric_id
+    if schedule_id is not None:
+        conditions.append("id=:id")
+        params["id"] = schedule_id
+    where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
+    engine = get_engine()
+    with engine.begin() as conn:
+        rows = conn.execute(
+            text(f"SELECT * FROM metric_schedule {where_clause} ORDER BY id DESC"),
+            params,
+        ).fetchall()
+    results: List[Dict[str, Any]] = []
+    for row in rows:
+        data = _row_to_dict(row)
+        data["enabled"] = bool(data.get("enabled"))
+        data["backfill_allowed"] = bool(data.get("backfill_allowed"))
+        results.append(data)
+    return results
+
+
+# Metric runs
+def trigger_metric_run(payload: MetricRunTrigger) -> Dict[str, Any]:
+    """Create a metric_job_run entry; execution is orchestrated elsewhere."""
+    metric = get_metric(payload.metric_id)
+    if not metric:
+        raise KeyError(f"Metric {payload.metric_id} not found.")
+    metric_version = payload.metric_version or metric.get("version", 1)
+    base_sql_snapshot = payload.base_sql_snapshot or metric.get("base_sql")
+    triggered_at = payload.triggered_at or datetime.utcnow()
+
+    params = {
+        "metric_id": payload.metric_id,
+        "schedule_id": payload.schedule_id,
+        "source_turn_id": payload.source_turn_id,
+        "data_time_from": payload.data_time_from,
+        "data_time_to": payload.data_time_to,
+        "metric_version": metric_version,
+        "base_sql_snapshot": base_sql_snapshot,
+        "status": "RUNNING",
+        "error_msg": None,
+        "affected_rows": None,
+        "runtime_ms": None,
+        "triggered_by": payload.triggered_by,
+        "triggered_at": triggered_at,
+        "started_at": None,
+        "finished_at": None,
+    }
+    engine = get_engine()
+    with engine.begin() as conn:
+        result = conn.execute(
+            text(
+                """
+                INSERT INTO metric_job_run (
+                    metric_id, schedule_id, source_turn_id,
+                    data_time_from, data_time_to, metric_version,
+                    base_sql_snapshot, status, error_msg,
+                    affected_rows, runtime_ms,
+                    triggered_by, triggered_at, started_at, finished_at
+                ) VALUES (
+                    :metric_id, :schedule_id, :source_turn_id,
+                    :data_time_from, :data_time_to, :metric_version,
+                    :base_sql_snapshot, :status, :error_msg,
+                    :affected_rows, :runtime_ms,
+                    :triggered_by, :triggered_at, :started_at, :finished_at
+                )
+                """
+            ),
+            params,
+        )
+        run_id = result.lastrowid
+        row = conn.execute(
+            text("SELECT * FROM metric_job_run WHERE id=:id"), {"id": run_id}
+        ).first()
+    if not row:
+        raise RuntimeError("Failed to create metric job run.")
+    return _row_to_dict(row)
+
+
+def get_metric_run(run_id: int) -> Optional[Dict[str, Any]]:
+    engine = get_engine()
+    with engine.begin() as conn:
+        row = conn.execute(
+            text("SELECT * FROM metric_job_run WHERE id=:id"), {"id": run_id}
+        ).first()
+    if not row:
+        return None
+    return _row_to_dict(row)
+
+
+def list_metric_runs(
+    *,
+    metric_id: Optional[int] = None,
+    status: Optional[str] = None,
+    limit: int = 100,
+    offset: int = 0,
+) -> List[Dict[str, Any]]:
+    conditions = []
+    params: Dict[str, Any] = {"limit": limit, "offset": offset}
+    if metric_id is not None:
+        conditions.append("metric_id=:metric_id")
+        params["metric_id"] = metric_id
+    if status is not None:
+        conditions.append("status=:status")
+        params["status"] = status
+    where_clause = "WHERE " + " AND ".join(conditions) if conditions else ""
+    engine = get_engine()
+    with engine.begin() as conn:
+        rows = conn.execute(
+            text(
+                f"SELECT * FROM metric_job_run {where_clause} "
+                "ORDER BY triggered_at DESC LIMIT :limit OFFSET :offset"
+            ),
+            params,
+        ).fetchall()
+    return [_row_to_dict(row) for row in rows]
+
+
+# Metric results
+def write_metric_results(payload: MetricResultsWriteRequest) -> int:
+    """Bulk insert metric_result rows for a metric/version."""
+    metric = get_metric(payload.metric_id)
+    if not metric:
+        raise KeyError(f"Metric {payload.metric_id} not found.")
+    default_version = metric.get("version", 1)
+    now = datetime.utcnow()
+    rows: List[Dict[str, Any]] = []
+    for item in payload.results:
+        rows.append(
+            {
+                "metric_id": payload.metric_id,
+                "metric_version": item.metric_version or default_version,
+                "stat_time": item.stat_time,
+                "extra_dims": _json_dump(item.extra_dims),
+                "metric_value": item.metric_value,
+                "load_time": item.load_time or now,
+                "data_version": item.data_version,
+            }
+        )
+    if not rows:
+        return 0
+    engine = get_engine()
+    with engine.begin() as conn:
+        conn.execute(
+            text(
+                """
+                INSERT INTO metric_result (
+                    metric_id, metric_version, stat_time,
+                    extra_dims, metric_value, load_time, data_version
+                ) VALUES (
+                    :metric_id, :metric_version, :stat_time,
+                    :extra_dims, :metric_value, :load_time, :data_version
+                )
+                """
+            ),
+            rows,
+        )
+    return len(rows)
+
+
+def query_metric_results(
+    *,
+    metric_id: int,
+    stat_from: Optional[datetime] = None,
+    stat_to: Optional[datetime] = None,
+    limit: int = 200,
+    offset: int = 0,
+) -> List[Dict[str, Any]]:
+    conditions = ["metric_id=:metric_id"]
+    params: Dict[str, Any] = {
+        "metric_id": metric_id,
+        "limit": limit,
+        "offset": offset,
+    }
+    if stat_from is not None:
+        conditions.append("stat_time>=:stat_from")
+        params["stat_from"] = stat_from
+    if stat_to is not None:
+        conditions.append("stat_time<=:stat_to")
+        params["stat_to"] = stat_to
+
+    where_clause = "WHERE " + " AND ".join(conditions)
+    engine = get_engine()
+    with engine.begin() as conn:
+        rows = conn.execute(
+            text(
+                f"SELECT * FROM metric_result {where_clause} "
+                "ORDER BY stat_time DESC LIMIT :limit OFFSET :offset"
+            ),
+            params,
+        ).fetchall()
+    results: List[Dict[str, Any]] = []
+    for row in rows:
+        data = _row_to_dict(row)
+        _parse_json_fields(data, ["extra_dims"])
+        results.append(data)
+    return results
+
+
+def latest_metric_result(metric_id: int) -> Optional[Dict[str, Any]]:
+    engine = get_engine()
+    with engine.begin() as conn:
+        row = conn.execute(
+            text(
+                """
+                SELECT * FROM metric_result
+                WHERE metric_id=:metric_id
+                ORDER BY stat_time DESC
+                LIMIT 1
+                """
+            ),
+            {"metric_id": metric_id},
+        ).first()
+    if not row:
+        return None
+    data = _row_to_dict(row)
+    _parse_json_fields(data, ["extra_dims"])
+    return data
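End to end, the store covers definition → run → results. A console sketch using only functions defined above; the field values are illustrative, and it assumes `app.db.get_engine` points at a database with the tables from `file/tableschema/metrics.sql`.

```python
# Sketch: exercising the service layer directly (values illustrative).
from datetime import datetime

from app.schemas.metrics import (
    MetricCreate,
    MetricResultItem,
    MetricResultsWriteRequest,
    MetricRunTrigger,
)
from app.services import metric_store

# 1. Register a metric definition (version starts at 1, sql_hash is derived).
metric = metric_store.create_metric(
    MetricCreate(
        metric_code="order_cnt",
        metric_name="Order count",
        biz_domain="order",
        base_sql="select count(*) as order_cnt from orders",
        time_grain="DAY",
        dim_binding=["dt"],
        update_strategy="FULL",
    )
)

# 2. Record a run; actual SQL execution happens outside this service.
run = metric_store.trigger_metric_run(
    MetricRunTrigger(metric_id=metric["id"], triggered_by="MANUAL")
)

# 3. Persist one result row, stamped with the run id as data_version.
inserted = metric_store.write_metric_results(
    MetricResultsWriteRequest(
        metric_id=metric["id"],
        results=[
            MetricResultItem(
                stat_time=datetime(2024, 5, 1),
                metric_value=123.45,
                data_version=run["id"],
            )
        ],
    )
)
assert inserted == 1
print(metric_store.latest_metric_result(metric["id"]))
```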
"2024-05-02T00:00:00Z" + }' + +# 列出运行 +curl "/api/v1/metric-runs?metric_id={metric_id}" + +# 获取单次运行 +curl "/api/v1/metric-runs/{run_id}" + +# 写入指标结果 +curl -X POST "/api/v1/metric-results/{metric_id}" \ + -H "Content-Type: application/json" \ + -d '{ + "metric_id": {metric_id}, + "results": [ + {"stat_time":"2024-05-01T00:00:00Z","metric_value":123.45,"data_version":"{run_id}"}, + {"stat_time":"2024-05-02T00:00:00Z","metric_value":234.56,"data_version":"{run_id}"} + ] + }' + +# 查询指标结果 +curl "/api/v1/metric-results?metric_id={metric_id}" + +# 查询最新结果 +curl "/api/v1/metric-results/latest?metric_id={metric_id}" diff --git a/file/tableschema/metrics.sql b/file/tableschema/metrics.sql new file mode 100644 index 0000000..4a16732 --- /dev/null +++ b/file/tableschema/metrics.sql @@ -0,0 +1,155 @@ +CREATE TABLE metric_def ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + + metric_code VARCHAR(64) NOT NULL, -- 内部编码:order_cnt_delivery + metric_name VARCHAR(128) NOT NULL, -- 中文名:外送订单数 + metric_aliases JSON NULL, -- 别名列表 + + biz_domain VARCHAR(64) NOT NULL, -- 通过table tag获取,支持人工配置 + biz_desc TEXT NULL, -- 业务口径描述 + + chat_turn_id BIGINT NULL, -- 来自哪轮会话 + + tech_desc TEXT NULL, -- 技术口径描述 + formula_expr TEXT NULL, -- 公式描述:"sum(pay_amount)" + base_sql MEDIUMTEXT NOT NULL, -- 标准计算SQL(逻辑SQL/snippet) + + time_grain VARCHAR(32) NOT NULL, -- DAY/HOUR/WEEK/MONTH + dim_binding JSON NOT NULL, -- 维度绑定,如 ["dt","store_id","channel"] + + update_strategy VARCHAR(32) NOT NULL, -- FULL/INCR/REALTIME + schedule_id BIGINT NULL, -- 调度ID + schedule_type INT NULL, -- 调度类型,默认调度cron + + version INT NOT NULL DEFAULT 1, + is_active TINYINT(1) NOT NULL DEFAULT 1, + + sql_hash VARCHAR(64) NULL, -- base_sql hash 用于版本比较 + created_by BIGINT NULL, + updated_by BIGINT NULL, + + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + + UNIQUE KEY uk_metric_code (metric_code), + KEY idx_domain_active (biz_domain, is_active), + KEY idx_update_strategy (update_strategy), + KEY idx_name (metric_name) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + + +CREATE TABLE metric_schedule ( + id BIGINT AUTO_INCREMENT PRIMARY KEY, + metric_id BIGINT NOT NULL, -- 关联 metric_def.id + + cron_expr VARCHAR(64) NOT NULL, -- 调度表达式 + enabled TINYINT(1) NOT NULL DEFAULT 1, -- 是否启用 + priority INT NOT NULL DEFAULT 10, -- 优先级 + + backfill_allowed TINYINT(1) NOT NULL DEFAULT 1, -- 是否允许补数 + max_runtime_sec INT NULL, -- 最大运行时长(秒) + retry_times INT NOT NULL DEFAULT 0, -- 失败重试次数 + + owner_team VARCHAR(64) NULL, + owner_user_id BIGINT NULL, + + created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP, + updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP, + + KEY idx_metric_enabled (metric_id, enabled), + KEY idx_owner (owner_team, owner_user_id) +) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4; + + +CREATE TABLE metric_job_run ( + id BIGINT AUTO_INCREMENT, + + metric_id BIGINT NOT NULL, -- metric_def.id + schedule_id BIGINT NULL, -- metric_schedule.id,手动触发则可为空 + source_turn_id BIGINT NULL, -- 若本次运行由某次问答触发,关联 qa_turn.id + + data_time_from DATETIME NULL, -- 指标统计时间窗口起 + data_time_to DATETIME NULL, -- 指标统计时间窗口止 + + metric_version INT NOT NULL, -- 执行时使用的指标版本 + base_sql_snapshot MEDIUMTEXT NOT NULL, -- 本次执行使用的SQL快照 + + status VARCHAR(32) NOT NULL, -- RUNNING/SUCCESS/FAILED/SKIPPED + error_msg TEXT NULL, + + affected_rows BIGINT NULL, -- 写入行数 + runtime_ms BIGINT NULL, -- 执行耗时 + + triggered_by VARCHAR(32) NOT NULL, -- SCHEDULER/MANUAL/API/QA_TURN + triggered_at DATETIME NOT NULL, + 
diff --git a/file/tableschema/metrics.sql b/file/tableschema/metrics.sql
new file mode 100644
index 0000000..4a16732
--- /dev/null
+++ b/file/tableschema/metrics.sql
@@ -0,0 +1,155 @@
+CREATE TABLE metric_def (
+    id BIGINT AUTO_INCREMENT PRIMARY KEY,
+
+    metric_code VARCHAR(64) NOT NULL, -- internal code, e.g. order_cnt_delivery
+    metric_name VARCHAR(128) NOT NULL, -- display name, e.g. "delivery order count"
+    metric_aliases JSON NULL, -- list of aliases
+
+    biz_domain VARCHAR(64) NOT NULL, -- derived from table tags; manual configuration supported
+    biz_desc TEXT NULL, -- business definition
+
+    chat_turn_id BIGINT NULL, -- chat turn this metric originated from
+
+    tech_desc TEXT NULL, -- technical definition
+    formula_expr TEXT NULL, -- formula expression, e.g. "sum(pay_amount)"
+    base_sql MEDIUMTEXT NOT NULL, -- canonical computation SQL (logical SQL / snippet)
+
+    time_grain VARCHAR(32) NOT NULL, -- DAY/HOUR/WEEK/MONTH
+    dim_binding JSON NOT NULL, -- bound dimensions, e.g. ["dt","store_id","channel"]
+
+    update_strategy VARCHAR(32) NOT NULL, -- FULL/INCR/REALTIME
+    schedule_id BIGINT NULL, -- schedule id
+    schedule_type INT NULL, -- scheduler type; defaults to cron scheduling
+
+    version INT NOT NULL DEFAULT 1,
+    is_active TINYINT(1) NOT NULL DEFAULT 1,
+
+    sql_hash VARCHAR(64) NULL, -- hash of base_sql, used for version comparison
+    created_by BIGINT NULL,
+    updated_by BIGINT NULL,
+
+    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+
+    UNIQUE KEY uk_metric_code (metric_code),
+    KEY idx_domain_active (biz_domain, is_active),
+    KEY idx_update_strategy (update_strategy),
+    KEY idx_name (metric_name)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+
+CREATE TABLE metric_schedule (
+    id BIGINT AUTO_INCREMENT PRIMARY KEY,
+    metric_id BIGINT NOT NULL, -- references metric_def.id
+
+    cron_expr VARCHAR(64) NOT NULL, -- cron expression
+    enabled TINYINT(1) NOT NULL DEFAULT 1, -- enabled flag
+    priority INT NOT NULL DEFAULT 10, -- priority
+
+    backfill_allowed TINYINT(1) NOT NULL DEFAULT 1, -- whether backfill is allowed
+    max_runtime_sec INT NULL, -- max runtime in seconds
+    retry_times INT NOT NULL DEFAULT 0, -- retry count on failure
+
+    owner_team VARCHAR(64) NULL,
+    owner_user_id BIGINT NULL,
+
+    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+
+    KEY idx_metric_enabled (metric_id, enabled),
+    KEY idx_owner (owner_team, owner_user_id)
+) ENGINE=InnoDB DEFAULT CHARSET=utf8mb4;
+
+
+CREATE TABLE metric_job_run (
+    id BIGINT AUTO_INCREMENT,
+
+    metric_id BIGINT NOT NULL, -- metric_def.id
+    schedule_id BIGINT NULL, -- metric_schedule.id; NULL for manual triggers
+    source_turn_id BIGINT NULL, -- if triggered by a Q&A turn, references chat_turn.id
+
+    data_time_from DATETIME NULL, -- stat window start
+    data_time_to DATETIME NULL, -- stat window end
+
+    metric_version INT NOT NULL, -- metric version used for this run
+    base_sql_snapshot MEDIUMTEXT NOT NULL, -- SQL snapshot used for this run
+
+    status VARCHAR(32) NOT NULL, -- RUNNING/SUCCESS/FAILED/SKIPPED
+    error_msg TEXT NULL,
+
+    affected_rows BIGINT NULL, -- rows written
+    runtime_ms BIGINT NULL, -- execution time in ms
+
+    triggered_by VARCHAR(32) NOT NULL, -- SCHEDULER/MANUAL/API/QA_TURN
+    triggered_at DATETIME NOT NULL,
+    started_at DATETIME NULL,
+    finished_at DATETIME NULL,
+
+    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    -- composite primary key: RANGE COLUMNS partitioning requires created_at in the key
+    PRIMARY KEY (id, created_at),
+    KEY idx_metric_time (metric_id, data_time_from, data_time_to),
+    KEY idx_status_time (status, triggered_at),
+    KEY idx_schedule (schedule_id),
+    KEY idx_source_turn (source_turn_id)
+)
+ENGINE=InnoDB
+DEFAULT CHARSET=utf8mb4
+PARTITION BY RANGE COLUMNS(created_at) (
+    -- historical partitions (adjust to actual needs)
+    PARTITION p202511 VALUES LESS THAN ('2025-12-01'),
+    PARTITION p202512 VALUES LESS THAN ('2026-01-01'),
+    -- monthly partitions for 2026
+    PARTITION p202601 VALUES LESS THAN ('2026-02-01'),
+    PARTITION p202602 VALUES LESS THAN ('2026-03-01'),
+    PARTITION p202603 VALUES LESS THAN ('2026-04-01'),
+    PARTITION p202604 VALUES LESS THAN ('2026-05-01'),
+    PARTITION p202605 VALUES LESS THAN ('2026-06-01'),
+    PARTITION p202606 VALUES LESS THAN ('2026-07-01'),
+    -- ... further months can be pre-created ...
+
+    -- catch-all partition for future data; prevents inserts from failing
+    PARTITION p_future VALUES LESS THAN (MAXVALUE)
+);
+
+
+CREATE TABLE metric_result (
+    id BIGINT AUTO_INCREMENT,
+
+    metric_id BIGINT NOT NULL, -- metric_def.id
+    metric_version INT NOT NULL, -- metric_def.version
+    stat_time DATETIME NOT NULL, -- timestamp aligned to time_grain
+
+    extra_dims JSON NULL, -- additional dimensions, stored as JSON
+
+    metric_value DECIMAL(32,8) NOT NULL, -- metric result value
+
+    load_time DATETIME NOT NULL, -- load timestamp
+    data_version BIGINT NULL, -- data version or job_run id
+
+    created_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP,
+    updated_at DATETIME NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
+    -- composite primary key: RANGE COLUMNS partitioning requires created_at in the key
+    PRIMARY KEY (id, created_at),
+    KEY idx_metric_time (metric_id, stat_time),
+    KEY idx_load_time (load_time)
+)
+ENGINE=InnoDB
+DEFAULT CHARSET=utf8mb4
+PARTITION BY RANGE COLUMNS(created_at) (
+    -- historical partitions (adjust to actual needs)
+    PARTITION p202511 VALUES LESS THAN ('2025-12-01'),
+    PARTITION p202512 VALUES LESS THAN ('2026-01-01'),
+    -- monthly partitions for 2026
+    PARTITION p202601 VALUES LESS THAN ('2026-02-01'),
+    PARTITION p202602 VALUES LESS THAN ('2026-03-01'),
+    PARTITION p202603 VALUES LESS THAN ('2026-04-01'),
+    PARTITION p202604 VALUES LESS THAN ('2026-05-01'),
+    PARTITION p202605 VALUES LESS THAN ('2026-06-01'),
+    PARTITION p202606 VALUES LESS THAN ('2026-07-01'),
+    -- ... further months can be pre-created ...
+
+    -- catch-all partition for future data; prevents inserts from failing
+    PARTITION p_future VALUES LESS THAN (MAXVALUE)
+);
\ No newline at end of file
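Because both partitioned tables end in a MAXVALUE catch-all, future months cannot be added with `ADD PARTITION`; they have to be split out of `p_future` with `REORGANIZE PARTITION`. A maintenance sketch — the month boundary is illustrative, and the same statements apply to `metric_result`:

```sql
-- Sketch: roll a new month out of the catch-all partition.
ALTER TABLE metric_job_run
REORGANIZE PARTITION p_future INTO (
    PARTITION p202607 VALUES LESS THAN ('2026-08-01'),
    PARTITION p_future VALUES LESS THAN (MAXVALUE)
);

-- Dropping an expired partition discards its rows without a table scan.
ALTER TABLE metric_job_run DROP PARTITION p202511;
```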
diff --git a/test/test_rag_client.py b/test/test_rag_client.py
new file mode 100644
index 0000000..249b413
--- /dev/null
+++ b/test/test_rag_client.py
@@ -0,0 +1,91 @@
+from __future__ import annotations
+
+import json
+
+import httpx
+import pytest
+
+from app.exceptions import ProviderAPICallError
+from app.schemas.rag import RagDeleteRequest, RagItemPayload, RagRetrieveRequest
+from app.services.rag_client import RagAPIClient
+
+
+@pytest.mark.asyncio
+async def test_add_sends_payload_and_headers() -> None:
+    rag_client = RagAPIClient(base_url="http://rag.test", auth_token="secret-token")
+
+    def handler(request: httpx.Request) -> httpx.Response:
+        assert request.method == "POST"
+        assert str(request.url) == "http://rag.test/rag/add"
+        assert request.headers["Authorization"] == "Bearer secret-token"
+        payload = json.loads(request.content.decode())
+        assert payload == {
+            "id": 1,
+            "workspaceId": 2,
+            "name": "demo",
+            "embeddingData": "vector",
+            "type": "METRIC",
+        }
+        return httpx.Response(200, json={"ok": True, "echo": payload})
+
+    transport = httpx.MockTransport(handler)
+    async with httpx.AsyncClient(transport=transport) as client:
+        result = await rag_client.add(
+            client,
+            RagItemPayload(id=1, workspaceId=2, name="demo", embeddingData="vector", type="METRIC"),
+        )
+    assert result["ok"] is True
+    assert result["echo"]["name"] == "demo"
+
+
+@pytest.mark.asyncio
+async def test_add_batch_serializes_list() -> None:
+    rag_client = RagAPIClient(base_url="http://rag.test", auth_token=None)
+
+    def handler(request: httpx.Request) -> httpx.Response:
+        payload = json.loads(request.content.decode())
+        assert request.url.path == "/rag/addBatch"
+        assert isinstance(payload, list) and len(payload) == 2
+        return httpx.Response(200, json={"received": len(payload)})
+
+    items = [
+        RagItemPayload(id=1, workspaceId=2, name="a", embeddingData="vec-a", type="METRIC"),
+        RagItemPayload(id=2, workspaceId=2, name="b", embeddingData="vec-b", type="METRIC"),
+    ]
+    transport = httpx.MockTransport(handler)
+    async with httpx.AsyncClient(transport=transport) as client:
+        result = await rag_client.add_batch(client, items)
+    assert result == {"received": 2}
+
+
+@pytest.mark.asyncio
+async def test_http_error_raises_provider_error() -> None:
+    rag_client = RagAPIClient(base_url="http://rag.test")
+
+    def handler(request: httpx.Request) -> httpx.Response:
+        return httpx.Response(500, text="boom")
+
+    transport = httpx.MockTransport(handler)
+    async with httpx.AsyncClient(transport=transport) as client:
+        with pytest.raises(ProviderAPICallError) as excinfo:
+            await rag_client.delete(client, RagDeleteRequest(id=1, type="METRIC"))
+
+    err = excinfo.value
+    assert err.status_code == 500
+    assert "boom" in (err.response_text or "")
+
+
+@pytest.mark.asyncio
+async def test_non_json_response_returns_raw_text() -> None:
+    rag_client = RagAPIClient(base_url="http://rag.test")
+
+    def handler(request: httpx.Request) -> httpx.Response:
+        return httpx.Response(200, text="plain-text-body")
+
+    transport = httpx.MockTransport(handler)
+    async with httpx.AsyncClient(transport=transport) as client:
+        result = await rag_client.retrieve(
+            client, RagRetrieveRequest(query="foo", num=1, workspaceId=1, type="METRIC")
+        )
+    assert result == {"raw": "plain-text-body"}
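The metrics endpoints themselves ship without tests in this diff. A starting point in the same pytest style — the file name is hypothetical, and the store is stubbed with `monkeypatch` so no database is needed:

```python
# Sketch: test/test_metrics_router.py -- not part of this diff.
from __future__ import annotations

from fastapi import FastAPI
from fastapi.testclient import TestClient

from app.routers import metrics_router
from app.services import metric_store


def test_get_metric_returns_404_for_unknown_id(monkeypatch) -> None:
    # Stub the store lookup so the router is tested in isolation.
    monkeypatch.setattr(metric_store, "get_metric", lambda metric_id: None)

    app = FastAPI()
    app.include_router(metrics_router)
    client = TestClient(app)

    resp = client.get("/api/v1/metrics/123")
    assert resp.status_code == 404
    assert resp.json()["detail"] == "Metric not found"
```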