from __future__ import annotations import json import logging from typing import Any, Dict, Tuple from sqlalchemy import text from sqlalchemy.engine import Engine from sqlalchemy.exc import SQLAlchemyError from app.db import get_engine from app.models import ( ActionType, TableSnippetUpsertRequest, TableSnippetUpsertResponse, ) logger = logging.getLogger(__name__) def _serialize_json(value: Any) -> Tuple[str | None, int | None]: logger.debug("Serializing JSON payload: %s", value) if value is None: return None, None if isinstance(value, str): encoded = value.encode("utf-8") return value, len(encoded) serialized = json.dumps(value, ensure_ascii=False) encoded = serialized.encode("utf-8") return serialized, len(encoded) def _prepare_table_schema(value: Any) -> str: logger.debug("Preparing table_schema payload.") if isinstance(value, str): return value return json.dumps(value, ensure_ascii=False) def _prepare_model_params(params: Dict[str, Any] | None) -> str | None: if not params: return None serialized, _ = _serialize_json(params) return serialized def _collect_common_columns(request: TableSnippetUpsertRequest) -> Dict[str, Any]: logger.debug( "Collecting common columns for table_id=%s version_ts=%s action_type=%s", request.table_id, request.version_ts, request.action_type, ) payload: Dict[str, Any] = { "table_id": request.table_id, "version_ts": request.version_ts, "action_type": request.action_type.value, "status": request.status.value, "callback_url": str(request.callback_url), "table_schema_version_id": request.table_schema_version_id, "table_schema": _prepare_table_schema(request.table_schema), "model": request.model, "model_provider": request.model_provider, } payload.update( { "ge_profiling_json": None, "ge_profiling_json_size_bytes": None, "ge_profiling_summary": None, "ge_profiling_summary_size_bytes": None, "ge_profiling_total_size_bytes": None, "ge_profiling_html_report_url": None, "ge_result_desc_json": None, "ge_result_desc_json_size_bytes": None, "snippet_json": None, "snippet_json_size_bytes": None, "snippet_alias_json": None, "snippet_alias_json_size_bytes": None, } ) payload["model_params"] = _prepare_model_params(request.model_params) if request.llm_usage is not None: llm_usage_json, _ = _serialize_json(request.llm_usage) if llm_usage_json is not None: payload["llm_usage"] = llm_usage_json if request.error_code is not None: logger.debug("Adding error_code: %s", request.error_code) payload["error_code"] = request.error_code if request.error_message is not None: logger.debug("Adding error_message: %s", request.error_message) payload["error_message"] = request.error_message if request.started_at is not None: payload["started_at"] = request.started_at if request.finished_at is not None: payload["finished_at"] = request.finished_at if request.duration_ms is not None: payload["duration_ms"] = request.duration_ms if request.result_checksum is not None: payload["result_checksum"] = request.result_checksum logger.debug("Collected common payload: %s", payload) return payload def _apply_action_payload( request: TableSnippetUpsertRequest, payload: Dict[str, Any], ) -> None: logger.debug("Applying action-specific payload for action_type=%s", request.action_type) if request.action_type == ActionType.GE_PROFILING: full_json, full_size = _serialize_json(request.ge_profiling_json) summary_json, summary_size = _serialize_json(request.ge_profiling_summary) if full_json is not None: payload["ge_profiling_json"] = full_json payload["ge_profiling_json_size_bytes"] = full_size if summary_json is not None: payload["ge_profiling_summary"] = summary_json payload["ge_profiling_summary_size_bytes"] = summary_size if request.ge_profiling_total_size_bytes is not None: payload["ge_profiling_total_size_bytes"] = request.ge_profiling_total_size_bytes elif full_size is not None or summary_size is not None: payload["ge_profiling_total_size_bytes"] = (full_size or 0) + (summary_size or 0) if request.ge_profiling_html_report_url: payload["ge_profiling_html_report_url"] = request.ge_profiling_html_report_url elif request.action_type == ActionType.GE_RESULT_DESC: full_json, full_size = _serialize_json(request.ge_result_desc_json) if full_json is not None: payload["ge_result_desc_json"] = full_json payload["ge_result_desc_json_size_bytes"] = full_size elif request.action_type == ActionType.SNIPPET: full_json, full_size = _serialize_json(request.snippet_json) if full_json is not None: payload["snippet_json"] = full_json payload["snippet_json_size_bytes"] = full_size elif request.action_type == ActionType.SNIPPET_ALIAS: full_json, full_size = _serialize_json(request.snippet_alias_json) if full_json is not None: payload["snippet_alias_json"] = full_json payload["snippet_alias_json_size_bytes"] = full_size else: logger.error("Unsupported action type encountered: %s", request.action_type) raise ValueError(f"Unsupported action type '{request.action_type}'.") logger.debug("Payload after applying action-specific data: %s", payload) def _build_insert_statement(columns: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]: logger.debug("Building insert statement for columns: %s", list(columns.keys())) column_names = list(columns.keys()) placeholders = [f":{name}" for name in column_names] update_assignments = [ f"{name}=VALUES({name})" for name in column_names if name not in {"table_id", "version_ts", "action_type"} ] update_assignments.append("updated_at=CURRENT_TIMESTAMP") sql = ( "INSERT INTO action_results ({cols}) VALUES ({vals}) " "ON DUPLICATE KEY UPDATE {updates}" ).format( cols=", ".join(column_names), vals=", ".join(placeholders), updates=", ".join(update_assignments), ) logger.debug("Generated SQL: %s", sql) return sql, columns def _execute_upsert(engine: Engine, sql: str, params: Dict[str, Any]) -> int: logger.info("Executing upsert for table_id=%s version_ts=%s action_type=%s", params.get("table_id"), params.get("version_ts"), params.get("action_type")) with engine.begin() as conn: result = conn.execute(text(sql), params) logger.info("Rows affected: %s", result.rowcount) return result.rowcount def upsert_action_result(request: TableSnippetUpsertRequest) -> TableSnippetUpsertResponse: logger.info( "Received upsert request: table_id=%s version_ts=%s action_type=%s status=%s", request.table_id, request.version_ts, request.action_type, request.status, ) logger.debug("Request payload: %s", request.model_dump()) columns = _collect_common_columns(request) _apply_action_payload(request, columns) sql, params = _build_insert_statement(columns) logger.debug("Final SQL params: %s", params) engine = get_engine() try: rowcount = _execute_upsert(engine, sql, params) except SQLAlchemyError as exc: logger.exception( "Failed to upsert action result: table_id=%s version_ts=%s action_type=%s", request.table_id, request.version_ts, request.action_type, ) raise RuntimeError(f"Database operation failed: {exc}") from exc updated = rowcount > 1 return TableSnippetUpsertResponse( table_id=request.table_id, version_ts=request.version_ts, action_type=request.action_type, status=request.status, updated=updated, )