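"""Upsert helpers for persisting action results to the ``action_results`` table."""
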
from __future__ import annotations

import json
import logging
from typing import Any, Dict, Tuple

from sqlalchemy import text
from sqlalchemy.engine import Engine
from sqlalchemy.exc import SQLAlchemyError

from app.db import get_engine
from app.models import (
    ActionType,
    TableSnippetUpsertRequest,
    TableSnippetUpsertResponse,
)

logger = logging.getLogger(__name__)


def _serialize_json(value: Any) -> Tuple[str | None, int | None]:
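    """Serialize *value* to a JSON string and report its UTF-8 size in bytes.

    Strings are passed through unchanged; ``None`` yields ``(None, None)``.
    """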
    logger.debug("Serializing JSON payload: %s", value)
    if value is None:
        return None, None
    if isinstance(value, str):
        encoded = value.encode("utf-8")
        return value, len(encoded)
    serialized = json.dumps(value, ensure_ascii=False)
    encoded = serialized.encode("utf-8")
    return serialized, len(encoded)


def _prepare_table_schema(value: Any) -> str:
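    """Return *value* as a JSON string, passing already-serialized strings through."""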
    logger.debug("Preparing table_schema payload.")
    if isinstance(value, str):
        return value
    return json.dumps(value, ensure_ascii=False)


def _collect_common_columns(request: TableSnippetUpsertRequest) -> Dict[str, Any]:
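    """Build the column/value mapping shared by every action type.

    Action-specific JSON columns default to ``None``; ``_apply_action_payload``
    later fills in the ones relevant to the request's action type.
    """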
    logger.debug(
        "Collecting common columns for table_id=%s version_ts=%s action_type=%s",
        request.table_id,
        request.version_ts,
        request.action_type,
    )
    payload: Dict[str, Any] = {
        "table_id": request.table_id,
        "version_ts": request.version_ts,
        "action_type": request.action_type.value,
        "status": request.status.value,
        "callback_url": str(request.callback_url),
        "table_schema_version_id": request.table_schema_version_id,
        "table_schema": _prepare_table_schema(request.table_schema),
    }

    payload.update(
        {
            "ge_profiling_json": None,
            "ge_profiling_json_size_bytes": None,
            "ge_profiling_summary": None,
            "ge_profiling_summary_size_bytes": None,
            "ge_profiling_total_size_bytes": None,
            "ge_profiling_html_report_url": None,
            "ge_result_desc_json": None,
            "ge_result_desc_json_size_bytes": None,
            "snippet_json": None,
            "snippet_json_size_bytes": None,
            "snippet_alias_json": None,
            "snippet_alias_json_size_bytes": None,
        }
    )

    if request.llm_usage is not None:
        llm_usage_json, _ = _serialize_json(request.llm_usage)
        if llm_usage_json is not None:
            payload["llm_usage"] = llm_usage_json

    if request.error_code is not None:
        logger.debug("Adding error_code: %s", request.error_code)
        payload["error_code"] = request.error_code
    if request.error_message is not None:
        logger.debug("Adding error_message: %s", request.error_message)
        payload["error_message"] = request.error_message
    if request.started_at is not None:
        payload["started_at"] = request.started_at
    if request.finished_at is not None:
        payload["finished_at"] = request.finished_at
    if request.duration_ms is not None:
        payload["duration_ms"] = request.duration_ms
    if request.result_checksum is not None:
        payload["result_checksum"] = request.result_checksum

    logger.debug("Collected common payload: %s", payload)
    return payload


def _apply_action_payload(
    request: TableSnippetUpsertRequest,
    payload: Dict[str, Any],
) -> None:
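    """Fill in the JSON payload columns that belong to the request's action type.

    Raises ``ValueError`` for an unsupported ``action_type``.
    """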
    logger.debug("Applying action-specific payload for action_type=%s", request.action_type)
    if request.action_type == ActionType.GE_PROFILING:
        full_json, full_size = _serialize_json(request.ge_profiling_json)
        summary_json, summary_size = _serialize_json(request.ge_profiling_summary)
        if full_json is not None:
            payload["ge_profiling_json"] = full_json
            payload["ge_profiling_json_size_bytes"] = full_size
        if summary_json is not None:
            payload["ge_profiling_summary"] = summary_json
            payload["ge_profiling_summary_size_bytes"] = summary_size
        if request.ge_profiling_total_size_bytes is not None:
            payload["ge_profiling_total_size_bytes"] = request.ge_profiling_total_size_bytes
        elif full_size is not None or summary_size is not None:
            payload["ge_profiling_total_size_bytes"] = (full_size or 0) + (summary_size or 0)
        if request.ge_profiling_html_report_url:
            payload["ge_profiling_html_report_url"] = request.ge_profiling_html_report_url
    elif request.action_type == ActionType.GE_RESULT_DESC:
        full_json, full_size = _serialize_json(request.ge_result_desc_json)
        if full_json is not None:
            payload["ge_result_desc_json"] = full_json
            payload["ge_result_desc_json_size_bytes"] = full_size
    elif request.action_type == ActionType.SNIPPET:
        full_json, full_size = _serialize_json(request.snippet_json)
        if full_json is not None:
            payload["snippet_json"] = full_json
            payload["snippet_json_size_bytes"] = full_size
    elif request.action_type == ActionType.SNIPPET_ALIAS:
        full_json, full_size = _serialize_json(request.snippet_alias_json)
        if full_json is not None:
            payload["snippet_alias_json"] = full_json
            payload["snippet_alias_json_size_bytes"] = full_size
    else:
        logger.error("Unsupported action type encountered: %s", request.action_type)
        raise ValueError(f"Unsupported action type '{request.action_type}'.")

    logger.debug("Payload after applying action-specific data: %s", payload)


def _build_insert_statement(columns: Dict[str, Any]) -> Tuple[str, Dict[str, Any]]:
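    """Build a MySQL ``INSERT ... ON DUPLICATE KEY UPDATE`` statement for the columns.

    Key columns (``table_id``, ``version_ts``, ``action_type``) are excluded
    from the update clause; ``updated_at`` is refreshed on every update.
    """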
    logger.debug("Building insert statement for columns: %s", list(columns.keys()))
    column_names = list(columns.keys())
    placeholders = [f":{name}" for name in column_names]
    update_assignments = [
        f"{name}=VALUES({name})"
        for name in column_names
        if name not in {"table_id", "version_ts", "action_type"}
    ]
    update_assignments.append("updated_at=CURRENT_TIMESTAMP")

    sql = (
        "INSERT INTO action_results ({cols}) VALUES ({vals}) "
        "ON DUPLICATE KEY UPDATE {updates}"
    ).format(
        cols=", ".join(column_names),
        vals=", ".join(placeholders),
        updates=", ".join(update_assignments),
    )
    logger.debug("Generated SQL: %s", sql)
    return sql, columns


def _execute_upsert(engine: Engine, sql: str, params: Dict[str, Any]) -> int:
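    """Run the upsert inside a transaction and return the affected row count."""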
    logger.info(
        "Executing upsert for table_id=%s version_ts=%s action_type=%s",
        params.get("table_id"),
        params.get("version_ts"),
        params.get("action_type"),
    )
    with engine.begin() as conn:
        result = conn.execute(text(sql), params)
        logger.info("Rows affected: %s", result.rowcount)
        return result.rowcount


def upsert_action_result(request: TableSnippetUpsertRequest) -> TableSnippetUpsertResponse:
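    """Insert or update an ``action_results`` row for the given request.

    Wraps any ``SQLAlchemyError`` in a ``RuntimeError`` so callers see a
    single failure type.
    """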
    logger.info(
        "Received upsert request: table_id=%s version_ts=%s action_type=%s status=%s",
        request.table_id,
        request.version_ts,
        request.action_type,
        request.status,
    )
    logger.debug("Request payload: %s", request.model_dump())
    columns = _collect_common_columns(request)
    _apply_action_payload(request, columns)

    sql, params = _build_insert_statement(columns)
    logger.debug("Final SQL params: %s", params)

    engine = get_engine()
    try:
        rowcount = _execute_upsert(engine, sql, params)
    except SQLAlchemyError as exc:
        logger.exception(
            "Failed to upsert action result: table_id=%s version_ts=%s action_type=%s",
            request.table_id,
            request.version_ts,
            request.action_type,
        )
        raise RuntimeError(f"Database operation failed: {exc}") from exc

    # MySQL reports an affected-row count of 2 when ON DUPLICATE KEY UPDATE changed an
    # existing row and 1 for a fresh insert, so rowcount > 1 means the row was updated.
    updated = rowcount > 1
    return TableSnippetUpsertResponse(
        table_id=request.table_id,
        version_ts=request.version_ts,
        action_type=request.action_type,
        status=request.status,
        updated=updated,
    )