diff --git a/.env b/.env index 43dd8e4..2c3e988 100644 --- a/.env +++ b/.env @@ -17,7 +17,7 @@ DEFAULT_IMPORT_MODEL=deepseek:deepseek-chat IMPORT_GATEWAY_BASE_URL=http://localhost:8000 # HTTP client configuration -HTTP_CLIENT_TIMEOUT=30 +HTTP_CLIENT_TIMEOUT=60 HTTP_CLIENT_TRUST_ENV=false # HTTP_CLIENT_PROXY= diff --git a/app/models.py b/app/models.py index fbcf096..f5f9280 100644 --- a/app/models.py +++ b/app/models.py @@ -77,8 +77,8 @@ class DataImportAnalysisRequest(BaseModel): description="Ordered list of table headers associated with the data.", ) - llm_model: str = Field( + llm_model: Optional[str] = Field( - ..., - description="Model identifier. Accepts 'provider:model' format or plain model name.", + None, + description="Model identifier. Accepts 'provider:model_name' format or custom model alias.", ) temperature: Optional[float] = Field( None, @@ -138,6 +138,21 @@ class DataImportAnalysisJobAck(BaseModel): status: str = Field("accepted", description="Processing status acknowledgement.") +class ActionType(str, Enum): + GE_PROFILING = "ge_profiling" + GE_RESULT_DESC = "ge_result_desc" + SNIPPET = "snippet" + SNIPPET_ALIAS = "snippet_alias" + + +class ActionStatus(str, Enum): + PENDING = "pending" + RUNNING = "running" + SUCCESS = "success" + FAILED = "failed" + PARTIAL = "partial" + + class TableProfilingJobRequest(BaseModel): table_id: str = Field(..., description="Unique identifier for the table to profile.") version_ts: str = Field( @@ -149,6 +164,10 @@ class TableProfilingJobRequest(BaseModel): ..., description="Callback endpoint invoked after each pipeline action completes.", ) + llm_model: Optional[str] = Field( + None, + description="Default LLM model spec applied to prompt-based actions when overrides are omitted.", + ) table_schema: Optional[Any] = Field( None, description="Schema structure snapshot for the current table version.", @@ -196,10 +215,7 @@ class TableProfilingJobRequest(BaseModel): "user_configurable", description="Profiler implementation identifier. 
Currently supports 'user_configurable' or 'data_assistant'.", ) - llm_model: Optional[str] = Field( - None, - description="Default LLM model spec applied to prompt-based actions when overrides are omitted.", - ) + result_desc_model: Optional[str] = Field( None, description="LLM model override used for GE result description (action 2).", @@ -222,3 +238,56 @@ class TableProfilingJobAck(BaseModel): table_id: str = Field(..., description="Echo of the table identifier.") version_ts: str = Field(..., description="Echo of the profiling version timestamp (yyyyMMddHHmmss).") status: str = Field("accepted", description="Processing acknowledgement status.") + + +class TableSnippetUpsertRequest(BaseModel): + table_id: int = Field(..., ge=1, description="Unique identifier for the table.") + version_ts: int = Field( + ..., + ge=0, + description="Version timestamp aligned with the pipeline (yyyyMMddHHmmss as integer).", + ) + action_type: ActionType = Field(..., description="Pipeline action type for this record.") + status: ActionStatus = Field( + ActionStatus.SUCCESS, description="Execution status for the action." 
+ ) + callback_url: HttpUrl = Field(..., description="Callback URL associated with the action run.") + table_schema_version_id: int = Field(..., ge=0, description="Identifier for the schema snapshot.") + table_schema: Any = Field(..., description="Schema snapshot payload for the table.") + result_json: Optional[Any] = Field( + None, + description="Primary result payload for the action (e.g., profiling output, snippet array).", + ) + result_summary_json: Optional[Any] = Field( + None, + description="Optional summary payload (e.g., profiling summary) for the action.", + ) + html_report_url: Optional[str] = Field( + None, + description="Optional HTML report URL generated by the action.", + ) + error_code: Optional[str] = Field(None, description="Optional error code when status indicates a failure.") + error_message: Optional[str] = Field(None, description="Optional error message when status indicates a failure.") + started_at: Optional[datetime] = Field( + None, description="Timestamp when the action started executing." + ) + finished_at: Optional[datetime] = Field( + None, description="Timestamp when the action finished executing." 
+ ) + duration_ms: Optional[int] = Field( + None, + ge=0, + description="Optional execution duration in milliseconds.", + ) + result_checksum: Optional[str] = Field( + None, + description="Optional checksum for the result payload (e.g., MD5).", + ) + + +class TableSnippetUpsertResponse(BaseModel): + table_id: int + version_ts: int + action_type: ActionType + status: ActionStatus + updated: bool diff --git a/app/services/import_analysis.py b/app/services/import_analysis.py index 8b9a3a0..0aef6fc 100644 --- a/app/services/import_analysis.py +++ b/app/services/import_analysis.py @@ -42,7 +42,7 @@ def _env_float(name: str, default: float) -> float: return default -IMPORT_CHAT_TIMEOUT_SECONDS = _env_float("IMPORT_CHAT_TIMEOUT_SECONDS", 90.0) +IMPORT_CHAT_TIMEOUT_SECONDS = _env_float("IMPORT_CHAT_TIMEOUT_SECONDS", 120.0) SUPPORTED_IMPORT_MODELS = get_supported_import_models() @@ -298,7 +298,7 @@ def parse_llm_analysis_json(llm_response: LLMResponse) -> Dict[str, Any]: try: return json.loads(json_payload) except json.JSONDecodeError as exc: - preview = json_payload[:2000] + preview = json_payload[:10000] logger.error("Failed to parse JSON from LLM response content: %s", preview, exc_info=True) raise ProviderAPICallError("LLM response JSON could not be parsed.") from exc diff --git a/ge_v1.py b/ge_v1.py index e1165fa..872bab2 100644 --- a/ge_v1.py +++ b/ge_v1.py @@ -121,7 +121,7 @@ def clean_value(value: Any) -> Any: if isinstance(value, (np.generic,)): return value.item() if isinstance(value, pd.Timestamp): - return value.isoformat() + return str(value) if pd.isna(value): return None return value