From f261121845e94247718a738a466551fc681a1f46 Mon Sep 17 00:00:00 2001
From: zhaoawd
Date: Mon, 8 Dec 2025 23:11:43 +0800
Subject: [PATCH] Switch LLM calls to go through new-api
MIME-Version: 1.0
Content-Type: text/plain; charset=UTF-8
Content-Transfer-Encoding: 8bit

---
 .env                            |   4 +-
 README.md                       |   5 +-
 app/services/gateway.py         | 120 +++++++++++++++++++++-----------
 app/services/import_analysis.py |  32 ++++++---
 app/services/table_profiling.py |  12 ++--
 app/settings.py                 |   6 +-
 project.md                      |  23 ++++++
 7 files changed, 145 insertions(+), 57 deletions(-)
 create mode 100644 project.md

diff --git a/.env b/.env
index 2c3e988..97c6958 100644
--- a/.env
+++ b/.env
@@ -17,7 +17,7 @@ DEFAULT_IMPORT_MODEL=deepseek:deepseek-chat
 IMPORT_GATEWAY_BASE_URL=http://localhost:8000
 
 # HTTP client configuration
-HTTP_CLIENT_TIMEOUT=60
+HTTP_CLIENT_TIMEOUT=120
 HTTP_CLIENT_TRUST_ENV=false
 # HTTP_CLIENT_PROXY=
 
@@ -27,3 +27,5 @@ IMPORT_CHAT_TIMEOUT_SECONDS=120
 # Logging
 LOG_LEVEL=INFO
 # LOG_FORMAT=%(asctime)s %(levelname)s %(name)s:%(lineno)d %(message)s
+NEW_API_BASE_URL=http://localhost:3000
+NEW_API_AUTH_TOKEN="sk-Q79KGFJRs5Vk9HsfFqoiJk948uLMDhAVe037AeCb31URyWGL"
\ No newline at end of file
diff --git a/README.md b/README.md
index cf621a7..da6bd5f 100644
--- a/README.md
+++ b/README.md
@@ -2,7 +2,7 @@
 
 This project exposes a FastAPI-based microservice that provides:
 
-- A unified chat completions gateway supporting multiple LLM providers (OpenAI, Anthropic, OpenRouter, Gemini, Qwen, DeepSeek, etc.)
+- A unified chat completions gateway that now forwards requests to the internal `new-api` service (default `http://localhost:3000`) while preserving the same client-facing schema.
 - An asynchronous data import analysis pipeline that orchestrates LLM calls to produce structured metadata and processing recommendations
 
 The following instructions cover environment setup, dependency installation, and running the backend service.
@@ -56,6 +56,7 @@ Copy `.env.example` to `.env` (if provided) or edit `.env` to supply API keys an
 - `OPENAI_API_KEY`, `ANTHROPIC_API_KEY`, `OPENROUTER_API_KEY`, etc.
 - `HTTP_CLIENT_TIMEOUT`, `IMPORT_CHAT_TIMEOUT_SECONDS`
 - `LOG_LEVEL`, `LOG_FORMAT` for logging
+- `NEW_API_BASE_URL` (defaults to `http://localhost:3000`) and optional `NEW_API_AUTH_TOKEN` if the new-api component enforces authentication.
 
## Run the Backend Service @@ -84,4 +85,4 @@ Or use a process manager such as `pm2`, `supervisor`, or systemd for production - Run the data import analysis example: `python test/data_import_analysis_example.py` - Test the OpenRouter demo: `python test/openrouter_chat_example.py` -- Send a DeepSeek chat request script: `python scripts/deepseek_request.py` \ No newline at end of file +- Send a DeepSeek chat request script: `python scripts/deepseek_request.py` diff --git a/app/services/gateway.py b/app/services/gateway.py index bb40725..0042206 100644 --- a/app/services/gateway.py +++ b/app/services/gateway.py @@ -1,53 +1,93 @@ from __future__ import annotations -import os -from typing import Dict, Type +import logging import httpx +from pydantic import ValidationError -from app.exceptions import ProviderConfigurationError -from app.models import LLMProvider, LLMRequest, LLMResponse -from app.providers import ( - AnthropicProvider, - DeepSeekProvider, - GeminiProvider, - LLMProviderClient, - OpenAIProvider, - OpenRouterProvider, - QwenProvider, -) +from app.exceptions import ProviderAPICallError +from app.models import LLMChoice, LLMMessage, LLMRequest, LLMResponse +from app.settings import NEW_API_AUTH_TOKEN, NEW_API_BASE_URL + + +logger = logging.getLogger(__name__) class LLMGateway: - """Simple registry that dispatches chat requests to provider clients.""" + """Forward chat requests to the configured new-api component.""" - def __init__(self) -> None: - self._providers: Dict[LLMProvider, LLMProviderClient] = {} - self._factory: Dict[LLMProvider, Type[LLMProviderClient]] = { - LLMProvider.OPENAI: OpenAIProvider, - LLMProvider.ANTHROPIC: AnthropicProvider, - LLMProvider.OPENROUTER: OpenRouterProvider, - LLMProvider.GEMINI: GeminiProvider, - LLMProvider.QWEN: QwenProvider, - LLMProvider.DEEPSEEK: DeepSeekProvider, - } - - def get_provider(self, provider: LLMProvider) -> LLMProviderClient: - if provider not in self._factory: - raise ProviderConfigurationError(f"Unsupported provider '{provider.value}'.") - - if provider not in self._providers: - self._providers[provider] = self._build_provider(provider) - return self._providers[provider] - - def _build_provider(self, provider: LLMProvider) -> LLMProviderClient: - provider_cls = self._factory[provider] - api_key_env = getattr(provider_cls, "api_key_env", None) - api_key = os.getenv(api_key_env) if api_key_env else None - return provider_cls(api_key) + def __init__( + self, + *, + base_url: str | None = None, + auth_token: str | None = None, + ) -> None: + resolved_base = base_url or NEW_API_BASE_URL + self._base_url = resolved_base.rstrip("/") + self._auth_token = auth_token or NEW_API_AUTH_TOKEN async def chat( self, request: LLMRequest, client: httpx.AsyncClient ) -> LLMResponse: - provider_client = self.get_provider(request.provider) - return await provider_client.chat(request, client) + url = f"{self._base_url}/v1/chat/completions" + payload = request.model_dump(mode="json", exclude_none=True) + headers = {"Content-Type": "application/json"} + if self._auth_token: + headers["Authorization"] = f"Bearer {self._auth_token}" + logger.info("Forwarding chat request to new-api at %s", url) + try: + response = await client.post(url, json=payload, headers=headers) + response.raise_for_status() + except httpx.HTTPStatusError as exc: + status_code = exc.response.status_code if exc.response else None + response_text = exc.response.text if exc.response else "" + logger.error( + "new-api upstream returned %s: %s", + status_code, + response_text, + 
exc_info=True, + ) + raise ProviderAPICallError( + "Chat completion request failed.", + status_code=status_code, + response_text=response_text, + ) from exc + except httpx.HTTPError as exc: + logger.error("new-api transport error: %s", exc, exc_info=True) + raise ProviderAPICallError(f"Chat completion request failed: {exc}") from exc + + try: + data = response.json() + except ValueError as exc: + logger.error("new-api responded with invalid JSON.", exc_info=True) + raise ProviderAPICallError( + "Chat completion response was not valid JSON." + ) from exc + + logger.info("new-api payload: %s", data) + normalized_choices: list[LLMChoice] = [] + for idx, choice in enumerate(data.get("choices", []) or []): + message_payload = choice.get("message") or {} + message = LLMMessage( + role=message_payload.get("role", "assistant"), + content=message_payload.get("content", ""), + ) + normalized_choices.append( + LLMChoice(index=choice.get("index", idx), message=message) + ) + + try: + normalized_response = LLMResponse( + provider=request.provider, + model=data.get("model", request.model), + choices=normalized_choices, + raw=data, + ) + return normalized_response + except ValidationError as exc: + logger.error( + "new-api response did not match expected schema: %s", data, exc_info=True + ) + raise ProviderAPICallError( + "Chat completion response was not in the expected format." + ) from exc diff --git a/app/services/import_analysis.py b/app/services/import_analysis.py index c9a55f2..96e5bdf 100644 --- a/app/services/import_analysis.py +++ b/app/services/import_analysis.py @@ -22,14 +22,24 @@ from app.models import ( LLMResponse, LLMRole, ) -from app.settings import DEFAULT_IMPORT_MODEL, get_supported_import_models +from app.settings import ( + DEFAULT_IMPORT_MODEL, + NEW_API_AUTH_TOKEN, + NEW_API_BASE_URL, + get_supported_import_models, +) from app.utils.llm_usage import extract_usage logger = logging.getLogger(__name__) -IMPORT_GATEWAY_BASE_URL = os.getenv( - "IMPORT_GATEWAY_BASE_URL", "http://localhost:8000" -) +IMPORT_GATEWAY_BASE_URL = os.getenv("IMPORT_GATEWAY_BASE_URL", NEW_API_BASE_URL) + + +def build_import_gateway_headers() -> dict[str, str]: + headers = {"Content-Type": "application/json"} + if NEW_API_AUTH_TOKEN: + headers["Authorization"] = f"Bearer {NEW_API_AUTH_TOKEN}" + return headers def _env_float(name: str, default: float) -> float: @@ -314,16 +324,18 @@ async def dispatch_import_analysis_job( url = f"{IMPORT_GATEWAY_BASE_URL.rstrip('/')}/v1/chat/completions" logger.info( - "Dispatching import %s to %s: %s", + "Dispatching import %s to %s using provider=%s model=%s", request.import_record_id, url, - json.dumps(payload, ensure_ascii=False), + payload.get("provider"), + payload.get("model"), ) timeout = httpx.Timeout(IMPORT_CHAT_TIMEOUT_SECONDS) + headers = build_import_gateway_headers() try: - response = await client.post(url, json=payload, timeout=timeout) + response = await client.post(url, json=payload, timeout=timeout, headers=headers) response.raise_for_status() except httpx.HTTPStatusError as exc: body_preview = "" @@ -348,9 +360,10 @@ async def dispatch_import_analysis_job( response.status_code, ) logger.info( - "LLM response for %s: %s", + "LLM response received for %s (status %s, choices=%s)", request.import_record_id, - json.dumps(response_data, ensure_ascii=False), + response.status_code, + len(response_data.get("choices") or []), ) try: @@ -404,6 +417,7 @@ async def process_import_analysis_job( request: DataImportAnalysisJobRequest, client: httpx.AsyncClient, ) -> None: + # 
Run the import analysis and ensure the callback fires regardless of success/failure.
     try:
         payload = await dispatch_import_analysis_job(request, client)
     except ProviderAPICallError as exc:
diff --git a/app/services/table_profiling.py b/app/services/table_profiling.py
index f412238..26143d5 100644
--- a/app/services/table_profiling.py
+++ b/app/services/table_profiling.py
@@ -24,6 +24,7 @@ from app.services import LLMGateway
 from app.settings import DEFAULT_IMPORT_MODEL
 from app.services.import_analysis import (
     IMPORT_GATEWAY_BASE_URL,
+    build_import_gateway_headers,
     resolve_provider_from_model,
 )
 from app.utils.llm_usage import extract_usage as extract_llm_usage
@@ -532,6 +533,7 @@ async def _call_chat_completions(
     temperature: float = 0.2,
     timeout_seconds: Optional[float] = None,
 ) -> Any:
+    # Normalize model spec to provider+model and issue the unified chat call.
     provider, model_name = resolve_provider_from_model(model_spec)
     payload = {
         "provider": provider.value,
@@ -545,16 +547,17 @@ async def _call_chat_completions(
     payload_size_bytes = len(json.dumps(payload, ensure_ascii=False).encode("utf-8"))
 
     url = f"{IMPORT_GATEWAY_BASE_URL.rstrip('/')}/v1/chat/completions"
+    headers = build_import_gateway_headers()
     try:
-        # log the request whole info
         logger.info(
-            "Calling chat completions API %s with model %s and size %s and payload %s",
+            "Calling chat completions API %s with model=%s payload_size=%sB",
             url,
             model_name,
             payload_size_bytes,
-            payload,
         )
-        response = await client.post(url, json=payload, timeout=timeout_seconds)
+        response = await client.post(
+            url, json=payload, timeout=timeout_seconds, headers=headers
+        )
         response.raise_for_status()
 
     except httpx.HTTPError as exc:
@@ -703,6 +706,7 @@ async def _run_action_with_callback(
     input_payload: Any = None,
     model_spec: Optional[str] = None,
 ) -> Any:
+    # Execute a pipeline action and always emit a callback capturing success/failure.
     if input_payload is not None:
         logger.info(
             "Pipeline action %s input: %s",
diff --git a/app/settings.py b/app/settings.py
index 2a856ec..4faac7b 100644
--- a/app/settings.py
+++ b/app/settings.py
@@ -20,7 +20,11 @@ PROVIDER_KEY_ENV_MAP: Dict[str, str] = {
 }
 
 
-DEFAULT_IMPORT_MODEL = os.getenv("DEFAULT_IMPORT_MODEL", "openai:gpt-4.1-mini")
+DEFAULT_IMPORT_MODEL = os.getenv("DEFAULT_IMPORT_MODEL", "deepseek:deepseek-chat")
+NEW_API_BASE_URL = os.getenv("NEW_API_BASE_URL", "http://localhost:3000")
+NEW_API_AUTH_TOKEN = os.getenv("NEW_API_AUTH_TOKEN")
+RAG_API_BASE_URL = os.getenv("RAG_API_BASE_URL", "http://127.0.0.1:8000")
+RAG_API_AUTH_TOKEN = os.getenv("RAG_API_AUTH_TOKEN")
 
 
 @lru_cache(maxsize=1)
diff --git a/project.md b/project.md
new file mode 100644
index 0000000..aedb919
--- /dev/null
+++ b/project.md
@@ -0,0 +1,23 @@
+Project structure and logic
+
+app/main.py: creates the FastAPI app and its lifespan, initializes the shared httpx.AsyncClient and LLMGateway, and, behind unified exception handling, exposes four endpoints: chat proxy, import analysis, table profiling pipeline, and table snippet persistence.
+app/models.py: defines all request/response models and enums (LLM requests, import analysis jobs, table profiling jobs, snippet upserts, etc.), including field validation and defaults.
+app/services: core business logic
+gateway.py forwards /v1/chat/completions requests to NEW_API_BASE_URL (with an optional Bearer token) and normalizes the response.
+import_analysis.py assembles the import prompt (prompt/data_import_analysis.md), parses/truncates the sample, calls the unified chat endpoint, extracts the JSON result and token usage, and finally calls back the business side.
+table_profiling.py runs a four-step pipeline in sequence: Great Expectations profiling → LLM result description (prompt/ge_result_desc_prompt.md) → snippet generation (prompt/snippet_generator.md) → snippet aliases (prompt/snippet_alias_generator.md), reporting status and results via callback at every step.
+table_snippet.py upserts each step's results into the database table, automatically serializing JSON/size information and building INSERT ... ON DUPLICATE KEY UPDATE statements.
+app/providers/*: direct clients for each cloud vendor (OpenAI/Anthropic/OpenRouter/Gemini/Qwen/DeepSeek) implementing the unified chat interface; the main flow now routes through new-api, but direct access is retained.
+prompt/ holds the prompt templates; the scripts/ and test/ directories provide API call examples and regression samples; table_snippet.sql defines the action_results table schema (used to persist snippet and profiling results).
+Features and requirements
+
+LLM gateway: POST /v1/chat/completions accepts an LLMRequest (provider + model + messages, etc.) and passes the payload through to NEW_API_BASE_URL/v1/chat/completions with optional NEW_API_AUTH_TOKEN authentication; on failure it returns 4xx/5xx and logs the raw response.
+Import analysis (async): POST /v1/import/analyze accepts the import sample (rows/headers/raw_csv/table_schema), the target llm_model (defaults to DEFAULT_IMPORT_MODEL and may be restricted by the IMPORT_SUPPORTED_MODELS whitelist), a temperature, and a callback URL. The service converts the sample to CSV, appends the schema, builds the system + user messages, calls the unified chat endpoint, parses the JSON in the first choice as the analysis result, and returns it together with the LLM usage via callback; on failure the callback carries status=failed and the error message.
+Table profiling pipeline (async): POST /v1/table/profiling accepts the table identifier, version, callback URL, and GE/LLM configuration (datasource/batch_request, connection string template, LLM model and timeout). The pipeline runs in order:
+Great Expectations profiling (profiler type, datasource, and runtime SQL query/table can be specified), producing the full and summary JSON plus the Data Docs path;
+a chat call that generates the GE result description JSON;
+generation of an array of SQL snippets from that description;
+generation of snippet aliases/keywords.
+Every step reports success or failure via callback; the payload includes action_type, the result JSON, the model, llm_usage, error details, and so on.
+Snippet persistence: POST /v1/table/snippet accepts a TableSnippetUpsertRequest (table/version, action type, status, schema, model info, per-stage JSON and sizes, error code, timestamps, etc.), assembles it into the action_results table as an UPSERT, and returns whether an existing record was updated.
+Configuration and runtime requirements: the core environment variables live in app/settings.py (API keys, DEFAULT_IMPORT_MODEL, IMPORT_GATEWAY_BASE_URL/NEW_API_BASE_URL, the model whitelist, the database URL, etc.); logging is configured via logging.yaml and creates logs/ automatically; HTTP client timeout/proxy are controlled with HTTP_CLIENT_TIMEOUT, HTTP_CLIENT_TRUST_ENV, and HTTP_CLIENT_PROXY. For local debugging run uvicorn app.main:app --reload; Docker support is provided by Dockerfile/docker-compose.yml.
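+
+Example call (a minimal sketch, not part of the service). It assumes the FastAPI app listens on http://localhost:8000, that deepseek:deepseek-chat is an allowed provider/model pair, and that the request/response fields follow the LLMRequest/LLMResponse models summarized above; adjust host, model, and auth to your deployment:
+
+```python
+# Minimal sketch: send one chat request to the gateway, which forwards it to new-api.
+import asyncio
+
+import httpx
+
+
+async def main() -> None:
+    payload = {
+        "provider": "deepseek",
+        "model": "deepseek-chat",
+        "messages": [{"role": "user", "content": "Say hello in one sentence."}],
+    }
+    async with httpx.AsyncClient(timeout=120) as client:
+        # POST /v1/chat/completions on the local service; it proxies to NEW_API_BASE_URL.
+        resp = await client.post("http://localhost:8000/v1/chat/completions", json=payload)
+        resp.raise_for_status()
+        # The normalized response mirrors the OpenAI shape: choices[n].message.content.
+        print(resp.json()["choices"][0]["message"]["content"])
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
+```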