From daf951d81f0ac9fca647fdda0bb5f4b3d4ac69ea Mon Sep 17 00:00:00 2001 From: zhaoawd Date: Wed, 10 Dec 2025 00:20:14 +0800 Subject: [PATCH] =?UTF-8?q?=E6=81=A2=E5=A4=8DGE=E7=89=88=E6=9C=AC=E4=B8=BA?= =?UTF-8?q?0.18=EF=BC=8C=E7=94=9F=E6=88=90SNIPPET=E5=90=8E=E8=87=AA?= =?UTF-8?q?=E5=8A=A8=E7=94=9F=E6=88=90rag=5Ftext=E6=B5=81=E7=A8=8B?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/main.py | 37 ++++++++++++++++++++++++++++++++- app/models.py | 29 +++++++++++++++++++++++++- app/services/table_profiling.py | 14 ++++++------- app/services/table_snippet.py | 15 ++++++++++++- app/settings.py | 2 +- pyproject.toml | 16 +++++++++++++- 6 files changed, 100 insertions(+), 13 deletions(-) diff --git a/app/main.py b/app/main.py index 1eee626..085e561 100644 --- a/app/main.py +++ b/app/main.py @@ -24,6 +24,8 @@ from app.models import ( LLMResponse, TableProfilingJobAck, TableProfilingJobRequest, + TableSnippetRagIngestRequest, + TableSnippetRagIngestResponse, TableSnippetUpsertRequest, TableSnippetUpsertResponse, ) @@ -252,6 +254,7 @@ def create_app() -> FastAPI: ) raise HTTPException(status_code=500, detail=str(exc)) from exc else: + # After snippet_alias is stored, automatically trigger RAG ingest when configured. 
if ( payload.action_type == ActionType.SNIPPET_ALIAS and payload.status == ActionStatus.SUCCESS @@ -267,14 +270,46 @@ def create_app() -> FastAPI: ) except Exception: logger.exception( - "Failed to ingest snippet RAG artifacts", + "Failed to ingest snippet RAG artifacts after snippet_alias upsert", extra={ "table_id": payload.table_id, "version_ts": payload.version_ts, + "workspace_id": payload.rag_workspace_id, }, ) + return response + @application.post( + "/v1/table/snippet/rag_ingest", + response_model=TableSnippetRagIngestResponse, + summary="Merge snippet+alias results from action_results and ingest into RAG.", + ) + async def ingest_snippet_rag( + payload: TableSnippetRagIngestRequest, + client: httpx.AsyncClient = Depends(get_http_client), + ) -> TableSnippetRagIngestResponse: + try: + rag_item_ids = await ingest_snippet_rag_from_db( + table_id=payload.table_id, + version_ts=payload.version_ts, + workspace_id=payload.workspace_id, + rag_item_type=payload.rag_item_type or "SNIPPET", + client=client, + ) + except Exception as exc: + logger.exception( + "Failed to ingest snippet RAG artifacts", + extra={ + "table_id": payload.table_id, + "version_ts": payload.version_ts, + "workspace_id": payload.workspace_id, + }, + ) + raise HTTPException(status_code=500, detail=str(exc)) from exc + + return TableSnippetRagIngestResponse(rag_item_ids=rag_item_ids) + @application.post("/__mock__/import-callback") async def mock_import_callback(payload: dict[str, Any]) -> dict[str, str]: logger.info("Received import analysis callback: %s", payload) diff --git a/app/models.py b/app/models.py index 7405597..879a6b0 100644 --- a/app/models.py +++ b/app/models.py @@ -232,6 +232,15 @@ class TableProfilingJobRequest(BaseModel): None, description="Miscellaneous execution flags applied across pipeline steps.", ) + workspace_id: Optional[int] = Field( + None, + ge=0, + description="Optional workspace identifier forwarded to snippet_alias callback for RAG ingestion.", + ) + 
rag_item_type: Optional[str] = Field( + "SNIPPET", + description="Optional RAG item type forwarded to snippet_alias callback.", + ) class TableProfilingJobAck(BaseModel): @@ -247,7 +256,7 @@ class TableSnippetUpsertRequest(BaseModel): ge=0, description="Version timestamp aligned with the pipeline (yyyyMMddHHmmss as integer).", ) - rag_workspace_id: Optional[int] = Field( + workspace_id: Optional[int] = Field( None, ge=0, description="Optional workspace identifier for RAG ingestion; when provided and action_type=snippet_alias " @@ -329,6 +338,24 @@ class TableSnippetUpsertRequest(BaseModel): ge=0, description="Optional execution duration in milliseconds.", ) + + +class TableSnippetRagIngestRequest(BaseModel): + table_id: int = Field(..., ge=1, description="Unique identifier for the table.") + version_ts: int = Field( + ..., + ge=0, + description="Version timestamp aligned with the pipeline (yyyyMMddHHmmss as integer).", + ) + workspace_id: int = Field(..., ge=0, description="Workspace id used when pushing snippets to RAG.") + rag_item_type: Optional[str] = Field( + "SNIPPET", + description="Optional RAG item type used when pushing snippets to RAG. 
Defaults to 'SNIPPET'.", + ) + + +class TableSnippetRagIngestResponse(BaseModel): + rag_item_ids: List[int] = Field(..., description="List of ingested rag_item_ids.") result_checksum: Optional[str] = Field( None, description="Optional checksum for the result payload (e.g., MD5).", diff --git a/app/services/table_profiling.py b/app/services/table_profiling.py index 26143d5..e43e9a0 100644 --- a/app/services/table_profiling.py +++ b/app/services/table_profiling.py @@ -24,7 +24,6 @@ from app.services import LLMGateway from app.settings import DEFAULT_IMPORT_MODEL from app.services.import_analysis import ( IMPORT_GATEWAY_BASE_URL, - build_import_gateway_headers, resolve_provider_from_model, ) from app.utils.llm_usage import extract_usage as extract_llm_usage @@ -533,7 +532,6 @@ async def _call_chat_completions( temperature: float = 0.2, timeout_seconds: Optional[float] = None, ) -> Any: - # Normalize model spec to provider+model and issue the unified chat call. provider, model_name = resolve_provider_from_model(model_spec) payload = { "provider": provider.value, @@ -547,17 +545,16 @@ async def _call_chat_completions( payload_size_bytes = len(json.dumps(payload, ensure_ascii=False).encode("utf-8")) url = f"{IMPORT_GATEWAY_BASE_URL.rstrip('/')}/v1/chat/completions" - headers = build_import_gateway_headers() try: + # Log the full request, including the payload logger.info( - "Calling chat completions API %s with model=%s payload_size=%sB", + "Calling chat completions API %s with model %s and size %s and payload %s", url, model_name, payload_size_bytes, + payload, ) - response = await client.post( - url, json=payload, timeout=timeout_seconds, headers=headers - ) + response = await client.post(url, json=payload, timeout=timeout_seconds) response.raise_for_status() except httpx.HTTPError as exc: @@ -706,7 +703,6 @@ async def _run_action_with_callback( input_payload: Any = None, model_spec: Optional[str] = None, ) -> Any: - # Execute a pipeline action and always emit a callback capturing 
success/failure. if input_payload is not None: logger.info( "Pipeline action %s input: %s", @@ -789,6 +785,8 @@ async def process_table_profiling_job( "table_schema_version_id": request.table_schema_version_id, "llm_model": request.llm_model, "llm_timeout_seconds": timeout_seconds, + "workspace_id": request.workspace_id, + "rag_item_type": request.rag_item_type, } logging_request_payload = _profiling_request_for_log(request) diff --git a/app/services/table_snippet.py b/app/services/table_snippet.py index 70db2d3..00376a4 100644 --- a/app/services/table_snippet.py +++ b/app/services/table_snippet.py @@ -459,6 +459,18 @@ def _stable_rag_item_id(table_id: int, version_ts: int, snippet_id: str) -> int: return int(digest[:16], 16) % 9_000_000_000_000_000_000 +def _to_serializable(value: Any) -> Any: + if value is None or isinstance(value, (str, int, float, bool)): + return value + if isinstance(value, datetime): + return value.isoformat() + if isinstance(value, dict): + return {k: _to_serializable(v) for k, v in value.items()} + if isinstance(value, list): + return [_to_serializable(v) for v in value] + return str(value) + + def _build_rag_text(snippet: Dict[str, Any]) -> str: # Deterministic text concatenation for embedding input. 
parts: List[str] = [] @@ -512,7 +524,8 @@ def _prepare_rag_payloads( continue rag_item_id = _stable_rag_item_id(table_id, version_ts, snippet_id) rag_text = _build_rag_text(snippet) - merged_json = json.dumps(snippet, ensure_ascii=False) + serializable_snippet = _to_serializable(snippet) + merged_json = json.dumps(serializable_snippet, ensure_ascii=False) updated_at_raw = snippet.get("updated_at_from_action") or now if isinstance(updated_at_raw, str): try: diff --git a/app/settings.py b/app/settings.py index 4faac7b..dc4deb1 100644 --- a/app/settings.py +++ b/app/settings.py @@ -23,7 +23,7 @@ PROVIDER_KEY_ENV_MAP: Dict[str, str] = { DEFAULT_IMPORT_MODEL = os.getenv("DEFAULT_IMPORT_MODEL", "deepseek:deepseek-chat") NEW_API_BASE_URL = os.getenv("NEW_API_BASE_URL") NEW_API_AUTH_TOKEN = os.getenv("NEW_API_AUTH_TOKEN") -RAG_API_BASE_URL = os.getenv("RAG_API_BASE_URL", "http://127.0.0.1:8000") +RAG_API_BASE_URL = os.getenv("RAG_API_BASE_URL", "https://tchatbi.agentcarrier.cn/chatbi/api") RAG_API_AUTH_TOKEN = os.getenv("RAG_API_AUTH_TOKEN") diff --git a/pyproject.toml b/pyproject.toml index b1c7160..b647ba5 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -4,4 +4,18 @@ version = "0.1.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.11" -dependencies = [] +dependencies = [ + "fastapi>=0.111.0", + "uvicorn[standard]>=0.29.0", + "pydantic>=2.6.0", + "sqlalchemy>=2.0.28", + "pymysql>=1.1.0", + "great-expectations[profilers]==0.18.19", + "pandas>=2.0", + "numpy>=1.24", + "openpyxl>=3.1", + "httpx==0.27.2", + "python-dotenv==1.0.1", + "requests>=2.31.0", + "PyYAML>=6.0.1", +]