From 39911d78abed648058904e95a7432be4c18c7073 Mon Sep 17 00:00:00 2001 From: zhaoqingliang Date: Thu, 30 Oct 2025 18:25:03 +0800 Subject: [PATCH] =?UTF-8?q?=E5=BC=82=E5=B8=B8=E6=97=A5=E5=BF=97=E5=AE=8C?= =?UTF-8?q?=E5=96=84?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- app/exceptions.py | 11 +++++++++ app/main.py | 15 +++++++++++- app/providers/anthropic.py | 16 +++++++++++++ app/providers/deepseek.py | 14 +++++++++++ app/providers/gemini.py | 16 +++++++++++++ app/providers/openai.py | 16 +++++++++++++ app/providers/openrouter.py | 16 +++++++++++++ app/providers/qwen.py | 14 +++++++++++ test/chat_completions_deepseek_example.py | 4 ++-- test/openrouter_chat_example.py | 29 ++++++++++++++++++----- 10 files changed, 142 insertions(+), 9 deletions(-) diff --git a/app/exceptions.py b/app/exceptions.py index 5ea8ff0..fcc7cac 100644 --- a/app/exceptions.py +++ b/app/exceptions.py @@ -4,3 +4,14 @@ class ProviderConfigurationError(RuntimeError): class ProviderAPICallError(RuntimeError): """Raised when the upstream provider responds with an error.""" + + def __init__( + self, + message: str, + *, + status_code: int | None = None, + response_text: str | None = None, + ) -> None: + super().__init__(message) + self.status_code = status_code + self.response_text = response_text diff --git a/app/main.py b/app/main.py index 0d8fed0..562b6ca 100644 --- a/app/main.py +++ b/app/main.py @@ -1,6 +1,7 @@ from __future__ import annotations import asyncio +import logging from contextlib import asynccontextmanager import httpx @@ -17,6 +18,9 @@ from app.services import LLMGateway from app.services.import_analysis import process_import_analysis_job +logger = logging.getLogger(__name__) + + @asynccontextmanager async def lifespan(app: FastAPI): client = httpx.AsyncClient(timeout=httpx.Timeout(30.0)) @@ -49,9 +53,18 @@ def create_app() -> FastAPI: try: return await gateway.chat(payload, client) except ProviderConfigurationError as exc: + logger.error("Provider configuration error: %s", exc, exc_info=True) raise HTTPException(status_code=422, detail=str(exc)) from exc except ProviderAPICallError as exc: - raise HTTPException(status_code=502, detail=str(exc)) from exc + status_code = exc.status_code or 502 + log_detail = exc.response_text or str(exc) + logger.error( + "Provider API call error (status %s): %s", + status_code, + log_detail, + exc_info=True, + ) + raise HTTPException(status_code=status_code, detail=str(exc)) from exc @application.post( "/v1/import/analyze", diff --git a/app/providers/anthropic.py b/app/providers/anthropic.py index 3cc73fa..326cfa4 100644 --- a/app/providers/anthropic.py +++ b/app/providers/anthropic.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from typing import Any, Dict, List, Tuple import httpx @@ -16,6 +17,9 @@ from app.models import ( from app.providers.base import LLMProviderClient +logger = logging.getLogger(__name__) + + class AnthropicProvider(LLMProviderClient): name = LLMProvider.ANTHROPIC.value api_key_env = "ANTHROPIC_API_KEY" @@ -52,7 +56,19 @@ class AnthropicProvider(LLMProviderClient): try: response = await client.post(self.base_url, json=payload, headers=headers) response.raise_for_status() + except httpx.HTTPStatusError as exc: + status_code = exc.response.status_code + body = exc.response.text + logger.error( + "Anthropic upstream returned %s: %s", status_code, body, exc_info=True + ) + raise ProviderAPICallError( + f"Anthropic request failed with status {status_code}", + status_code=status_code, + response_text=body, + ) from exc except httpx.HTTPError as exc: + logger.error("Anthropic transport error: %s", exc, exc_info=True) raise ProviderAPICallError(f"Anthropic request failed: {exc}") from exc data: Dict[str, Any] = response.json() diff --git a/app/providers/deepseek.py b/app/providers/deepseek.py index d51e13c..b68da06 100644 --- a/app/providers/deepseek.py +++ b/app/providers/deepseek.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from typing import Any, Dict, List import httpx @@ -9,6 +10,9 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMRespon from app.providers.base import LLMProviderClient +logger = logging.getLogger(__name__) + + class DeepSeekProvider(LLMProviderClient): name = LLMProvider.DEEPSEEK.value api_key_env = "DEEPSEEK_API_KEY" @@ -40,7 +44,17 @@ class DeepSeekProvider(LLMProviderClient): try: response = await client.post(self.base_url, json=payload, headers=headers) response.raise_for_status() + except httpx.HTTPStatusError as exc: + status_code = exc.response.status_code + body = exc.response.text + logger.error("DeepSeek upstream returned %s: %s", status_code, body, exc_info=True) + raise ProviderAPICallError( + f"DeepSeek request failed with status {status_code}", + status_code=status_code, + response_text=body, + ) from exc except httpx.HTTPError as exc: + logger.error("DeepSeek transport error: %s", exc, exc_info=True) raise ProviderAPICallError(f"DeepSeek request failed: {exc}") from exc data: Dict[str, Any] = response.json() diff --git a/app/providers/gemini.py b/app/providers/gemini.py index ed9b2ac..6a145aa 100644 --- a/app/providers/gemini.py +++ b/app/providers/gemini.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from typing import Any, Dict, List, Tuple import httpx @@ -16,6 +17,9 @@ from app.models import ( from app.providers.base import LLMProviderClient +logger = logging.getLogger(__name__) + + class GeminiProvider(LLMProviderClient): name = LLMProvider.GEMINI.value api_key_env = "GEMINI_API_KEY" @@ -53,7 +57,19 @@ class GeminiProvider(LLMProviderClient): try: response = await client.post(endpoint, json=payload, headers=headers) response.raise_for_status() + except httpx.HTTPStatusError as exc: + status_code = exc.response.status_code + body = exc.response.text + logger.error( + "Gemini upstream returned %s: %s", status_code, body, exc_info=True + ) + raise ProviderAPICallError( + f"Gemini request failed with status {status_code}", + status_code=status_code, + response_text=body, + ) from exc except httpx.HTTPError as exc: + logger.error("Gemini transport error: %s", exc, exc_info=True) raise ProviderAPICallError(f"Gemini request failed: {exc}") from exc data: Dict[str, Any] = response.json() diff --git a/app/providers/openai.py b/app/providers/openai.py index 53e2dda..42f9f52 100644 --- a/app/providers/openai.py +++ b/app/providers/openai.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from typing import Any, Dict, List import httpx @@ -9,6 +10,9 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMRespon from app.providers.base import LLMProviderClient +logger = logging.getLogger(__name__) + + class OpenAIProvider(LLMProviderClient): name = LLMProvider.OPENAI.value api_key_env = "OPENAI_API_KEY" @@ -40,7 +44,19 @@ class OpenAIProvider(LLMProviderClient): try: response = await client.post(self.base_url, json=payload, headers=headers) response.raise_for_status() + except httpx.HTTPStatusError as exc: + status_code = exc.response.status_code + body = exc.response.text + logger.error( + "OpenAI upstream returned %s: %s", status_code, body, exc_info=True + ) + raise ProviderAPICallError( + f"OpenAI request failed with status {status_code}", + status_code=status_code, + response_text=body, + ) from exc except httpx.HTTPError as exc: + logger.error("OpenAI transport error: %s", exc, exc_info=True) raise ProviderAPICallError(f"OpenAI request failed: {exc}") from exc data: Dict[str, Any] = response.json() diff --git a/app/providers/openrouter.py b/app/providers/openrouter.py index 5578962..1d93b5a 100644 --- a/app/providers/openrouter.py +++ b/app/providers/openrouter.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging import os from typing import Any, Dict, List @@ -10,6 +11,9 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMRespon from app.providers.base import LLMProviderClient +logger = logging.getLogger(__name__) + + class OpenRouterProvider(LLMProviderClient): name = LLMProvider.OPENROUTER.value api_key_env = "OPENROUTER_API_KEY" @@ -51,7 +55,19 @@ class OpenRouterProvider(LLMProviderClient): try: response = await client.post(self.base_url, json=payload, headers=headers) response.raise_for_status() + except httpx.HTTPStatusError as exc: + status_code = exc.response.status_code + body = exc.response.text + logger.error( + "OpenRouter upstream returned %s: %s", status_code, body, exc_info=True + ) + raise ProviderAPICallError( + f"OpenRouter request failed with status {status_code}", + status_code=status_code, + response_text=body, + ) from exc except httpx.HTTPError as exc: + logger.error("OpenRouter transport error: %s", exc, exc_info=True) raise ProviderAPICallError(f"OpenRouter request failed: {exc}") from exc data: Dict[str, Any] = response.json() diff --git a/app/providers/qwen.py b/app/providers/qwen.py index c2e7f1f..6239adc 100644 --- a/app/providers/qwen.py +++ b/app/providers/qwen.py @@ -1,5 +1,6 @@ from __future__ import annotations +import logging from typing import Any, Dict, List import httpx @@ -9,6 +10,9 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMRespon from app.providers.base import LLMProviderClient +logger = logging.getLogger(__name__) + + class QwenProvider(LLMProviderClient): name = LLMProvider.QWEN.value api_key_env = "QWEN_API_KEY" @@ -48,7 +52,17 @@ class QwenProvider(LLMProviderClient): try: response = await client.post(self.base_url, json=payload, headers=headers) response.raise_for_status() + except httpx.HTTPStatusError as exc: + status_code = exc.response.status_code + body = exc.response.text + logger.error("Qwen upstream returned %s: %s", status_code, body, exc_info=True) + raise ProviderAPICallError( + f"Qwen request failed with status {status_code}", + status_code=status_code, + response_text=body, + ) from exc except httpx.HTTPError as exc: + logger.error("Qwen transport error: %s", exc, exc_info=True) raise ProviderAPICallError(f"Qwen request failed: {exc}") from exc data: Dict[str, Any] = response.json() diff --git a/test/chat_completions_deepseek_example.py b/test/chat_completions_deepseek_example.py index 66101f4..0b2c04c 100644 --- a/test/chat_completions_deepseek_example.py +++ b/test/chat_completions_deepseek_example.py @@ -29,10 +29,10 @@ async def main() -> None: } ], "temperature": 0.2, - "max_tokens": 256, + "max_tokens": 350, } - async with httpx.AsyncClient(timeout=httpx.Timeout(20.0)) as client: + async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client: response = await client.post(API_URL, json=payload) response.raise_for_status() data = response.json() diff --git a/test/openrouter_chat_example.py b/test/openrouter_chat_example.py index 0f67ddf..cade3e5 100644 --- a/test/openrouter_chat_example.py +++ b/test/openrouter_chat_example.py @@ -6,39 +6,56 @@ import asyncio import httpx from dotenv import load_dotenv +import json load_dotenv() API_URL = "http://localhost:8000/v1/chat/completions" +def extract_schema(payload: dict) -> dict: + content = payload["choices"][0]["message"]["content"] + if content.startswith("```"): + content = content.split("```json", 1)[-1] + content = content.rsplit("```", 1)[0] + return json.loads(content) async def main() -> None: payload = { "provider": "openrouter", - "model": "anthropic/claude-3.5-sonnet", + "model": "anthropic/claude-3.7-sonnet", "messages": [ { "role": "system", - "content": "You are an API assistant that writes concise JSON only.", + "content": "角色:你是一名数据分析导入助手(Data Ingestion Analyst),擅长从原始数据抽取结构化元数据、推断字段类型、识别维度/事实属性,并输出导入建模建议(Table + JSON)。\n\n任务目标:对提供的数据(含表头或table schema与若干行样本数据)进行解析,生成一份导入分析与处理报告,指导如何将其导入为标准化表结构及 JSON 元数据定义,不要省略任何字段信息,全量输出。\n\n请从以下两个方向进行思考:\n\n方向 1:元数据识别与整理\n解析表明:根据表头、Origin Table Name、Orign File Name生成表名,表名需要有意义\n解析列名:生成标准化字段名(snake_case 或小驼峰),并给出原始列名与标准字段名映射。\n为每个字段写出中文/英文注释(若无法确定,给出“待确认”并附可能解释)。\n\n方向 2:字段数据类型与格式推断\n针对每列:输出推断数据类型(如 varchar(n) / int / bigint / tinyint / float / double / decimal(p,s) / date / datetime / text)。\n说明推断依据:样本值分布、长度范围、格式正则、是否存在空值、是否数值但含前导零等。\n指出数据质量初步观察:缺失率、是否有异常/离群值(简单规则即可)、是否需标准化(如去空格、去重、枚举值归一)。\n给出“建议处理动作”:如 trim、cast_float、cast_int、cast_double、cast_date、cast_time、cast_datetime,适用于将样本数据转换成数据库表字段兼容的格式。\n若为“可能是枚举”的字段,列出候选枚举值及占比。\n\n最终内容都输出为一个json对象,格式为(字段级与表级定义),字段含:\n{\n \"table_name\": \"标准化后的表名\",\n \"description\": \"表简短描述\",\n \"columns\": [{\n \"original_name\": \"原始名称\",\n \"standard_name\": \"标准化后的名称: 下划线命名,大小写字母、数字、下划线\",\n \"data_type\": \"数据类型限制为:number/string/datetime\",\n \"db_type\": \"数据库字段类型\",\n \"java_type\": \"java字段类型限制为: int/long/double/string/date\",\n \"nullable\": true/false,\n \"distinct_count_sample\": number,\n \"null_ratio_sample\": 0.x,\n \"is_enum_candidate\": true/false,\n \"description\": \"字段简短描述\",\n \"date_format\": \"转换成Date类型的pattern\"\n }]\n}\n\n约束与风格:\n\n若信息不足,请显式指出“信息不足”并给出补充数据需求清单。\n避免武断结论,用“可能 / 候选 / 建议”字样。\n不要捏造样本未出现的值。" }, { "role": "user", - "content": "Return a JSON object describing this test invocation.", - }, + "content": "导入记录ID: demo-import-001\n\n表头信息:\n- 品牌\n- 产品价类\n- 是否重点品牌\n- 系统外销售量(箱)\n- 系统外销售金额(万元)\n- 同期系统外销售量(箱)\n- 同期系统外销售金额(万元)\n\n示例数据:\nCSV样本预览:\n品牌,产品价类,是否重点品牌,系统外销售量(箱),系统外销售金额(万元),同期系统外销售量(箱),同期系统外销售金额(万元)\r\n白沙,一类,重点品牌,3332.406875,64283.5593333333,3123.693375,61821.7986666667\r\nnan,二类,重点品牌,1094.4707375,3859.69366666667,869.65725,3067.00966666667\r\nnan,三类,重点品牌,3965.0457375,8388.306,4401.6714875,8802.132\r\n宝岛,一类,否,39.934375,301.617666666667,30.5975,249.399666666667\r\n长白山,一类,重点品牌,2666.53775,12360.8306666667,1916.252,9051.672\r\nnan,二类,重点品牌,2359.910025,7671.26233333333,2335.2480875,7590.791\r\nnan,三类,重点品牌,1263.293875,2826.665,1590.750875,3503.083\r\n大前门,一类,否,81.5806875,343.721333333333,114.1179875,480.809333333333\r\nnan,三类,否,226.445225,319.975666666667,254.6595125,359.894\r\n大青山,二类,否,60.73525,209.415,60.2415,207.712666666667\n\n附加结构信息:\n{\n \"source\": \"excel\",\n \"file_name\": \"全国品牌.xlsx\",\n \"sheet_name\": \"Sheet1\"\n}" + } ], "temperature": 0.1, - "max_tokens": 300, + "max_tokens": 1024, } async with httpx.AsyncClient(timeout=httpx.Timeout(15.0)) as client: response = await client.post(API_URL, json=payload) print("Status:", response.status_code) try: - print("Body:", response.json()) + data = response.json() except ValueError: print("Raw Body:", response.text) + return + + print("Body:", data) + try: + schema = extract_schema(data) + print("Table name:", schema.get("table_name")) + except (KeyError, json.JSONDecodeError) as exc: + print("Failed to parse schema:", exc) if __name__ == "__main__": asyncio.run(main()) + +