Compare commits

...

10 Commits

Author SHA1 Message Date
a0ed43a8b7 Add logging configuration 2025-10-30 22:38:23 +08:00
455b884551 Adjust the data import analysis endpoint 2025-10-30 22:38:05 +08:00
39911d78ab Improve exception logging 2025-10-30 18:25:29 +08:00
89af7cd0a4 Clean up cached __pycache__ and pyc files 2025-10-29 23:50:15 +08:00
261418fd35 Add ignore entries 2025-10-29 23:43:48 +08:00
2d158750fa Model-related parameters and defaults 2025-10-29 23:43:26 +08:00
a78c8b9446 Test cases and results 2025-10-29 23:43:06 +08:00
59c9efa5d8 Import analysis endpoint now uses the project's chat endpoint 2025-10-29 23:42:42 +08:00
f43590585b Data import schema analysis endpoint and test cases 2025-10-29 22:36:20 +08:00
76b8c9d79b Add prompt assembly content to the data import endpoint 2025-10-29 00:53:07 +08:00
49 changed files with 915 additions and 55 deletions

29
.env Normal file
View File

@@ -0,0 +1,29 @@
# LLM provider API keys
OPENAI_API_KEY=
ANTHROPIC_API_KEY=
OPENROUTER_API_KEY="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"
OPENROUTER_SITE_URL=
OPENROUTER_APP_NAME=
GEMINI_API_KEY=
QWEN_API_KEY=
DEEPSEEK_API_KEY="sk-657f0752a1564563be7ce35b6a0a7b46"
DEEPSEEK_TIMEOUT_SECONDS=120
# Data import analysis defaults
IMPORT_SUPPORTED_MODELS=openai:gpt-5,deepseek:deepseek-chat,openrouter:anthropic/claude-4.0-sonnet
DEFAULT_IMPORT_MODEL=deepseek:deepseek-chat
# Service configuration
IMPORT_GATEWAY_BASE_URL=http://localhost:8000
# HTTP client configuration
HTTP_CLIENT_TIMEOUT=30
HTTP_CLIENT_TRUST_ENV=false
# HTTP_CLIENT_PROXY=
# Import analysis configuration
IMPORT_CHAT_TIMEOUT_SECONDS=120
# Logging
LOG_LEVEL=INFO
# LOG_FORMAT=%(asctime)s %(levelname)s %(name)s:%(lineno)d %(message)s
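The commented-out HTTP_CLIENT_PROXY setting is parsed by _parse_proxy_config in app/main.py further down in this diff; it accepts either a single proxy URL or comma-separated scheme=URL pairs. A minimal sketch of both forms, assuming the project's dependencies are installed (the proxy hosts are made-up examples):

from app.main import _env_bool, _env_float, _parse_proxy_config  # helpers added in this change

# Single-URL form is handed to httpx unchanged.
print(_parse_proxy_config("http://proxy.example:3128"))
# Scheme-specific pairs become a dict keyed by URL scheme.
print(_parse_proxy_config(
    "http://=http://proxy.example:3128,https://=http://proxy.example:3128"
))
# The remaining HTTP client settings above are read the same way at startup.
print(_env_float("HTTP_CLIENT_TIMEOUT", 30.0), _env_bool("HTTP_CLIENT_TRUST_ENV", True))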

5
.gitignore vendored
View File

@@ -1,3 +1,6 @@
.venv
gx/uncommitted/
.vscode/
.vscode/
**/__pycache__/
*.pyc
.DS_Store

Binary file not shown.

Binary file not shown.

View File

@@ -4,3 +4,14 @@ class ProviderConfigurationError(RuntimeError):
class ProviderAPICallError(RuntimeError):
"""Raised when the upstream provider responds with an error."""
def __init__(
self,
message: str,
*,
status_code: int | None = None,
response_text: str | None = None,
) -> None:
super().__init__(message)
self.status_code = status_code
self.response_text = response_text

View File

@@ -1,24 +1,108 @@
from __future__ import annotations
import asyncio
import logging
import os
from contextlib import asynccontextmanager
from typing import Any
import httpx
from fastapi import Depends, FastAPI, HTTPException, Request
from app.exceptions import ProviderAPICallError, ProviderConfigurationError
from app.models import (
DataImportAnalysisRequest,
DataImportAnalysisResponse,
DataImportAnalysisJobAck,
DataImportAnalysisJobRequest,
LLMRequest,
LLMResponse,
)
from app.services import LLMGateway
from app.services.import_analysis import build_import_messages, resolve_provider_from_model
from app.services.import_analysis import process_import_analysis_job
def _configure_logging() -> None:
level_name = os.getenv("LOG_LEVEL", "INFO").upper()
level = getattr(logging, level_name, logging.INFO)
log_format = os.getenv(
"LOG_FORMAT",
"%(asctime)s %(levelname)s %(name)s:%(lineno)d %(message)s",
)
root = logging.getLogger()
if not root.handlers:
logging.basicConfig(level=level, format=log_format)
else:
root.setLevel(level)
formatter = logging.Formatter(log_format)
for handler in root.handlers:
handler.setLevel(level)
handler.setFormatter(formatter)
_configure_logging()
logger = logging.getLogger(__name__)
def _env_bool(name: str, default: bool) -> bool:
raw = os.getenv(name)
if raw is None:
return default
return raw.strip().lower() in {"1", "true", "yes", "on"}
def _env_float(name: str, default: float) -> float:
raw = os.getenv(name)
if raw is None:
return default
try:
return float(raw)
except ValueError:
logger.warning("Invalid value for %s=%r, using default %.2f", name, raw, default)
return default
def _parse_proxy_config(raw: str | None) -> dict[str, str] | str | None:
if raw is None:
return None
cleaned = raw.strip()
if not cleaned:
return None
# Support comma-separated key=value pairs for scheme-specific proxies.
if "=" in cleaned:
proxies: dict[str, str] = {}
for part in cleaned.split(","):
key, sep, value = part.partition("=")
if not sep:
continue
key = key.strip()
value = value.strip()
if key and value:
proxies[key] = value
if proxies:
return proxies
return cleaned
def _create_http_client() -> httpx.AsyncClient:
timeout_seconds = _env_float("HTTP_CLIENT_TIMEOUT", 30.0)
trust_env = _env_bool("HTTP_CLIENT_TRUST_ENV", True)
proxies = _parse_proxy_config(os.getenv("HTTP_CLIENT_PROXY"))
client_kwargs: dict[str, object] = {
"timeout": httpx.Timeout(timeout_seconds),
"trust_env": trust_env,
}
if proxies:
client_kwargs["proxies"] = proxies
return httpx.AsyncClient(**client_kwargs)
@asynccontextmanager
async def lifespan(app: FastAPI):
client = httpx.AsyncClient(timeout=httpx.Timeout(30.0))
client = _create_http_client()
gateway = LLMGateway()
try:
app.state.http_client = client # type: ignore[attr-defined]
@@ -48,46 +132,42 @@ def create_app() -> FastAPI:
try:
return await gateway.chat(payload, client)
except ProviderConfigurationError as exc:
logger.error("Provider configuration error: %s", exc, exc_info=True)
raise HTTPException(status_code=422, detail=str(exc)) from exc
except ProviderAPICallError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
status_code = exc.status_code or 502
log_detail = exc.response_text or str(exc)
logger.error(
"Provider API call error (status %s): %s",
status_code,
log_detail,
exc_info=True,
)
raise HTTPException(status_code=status_code, detail=str(exc)) from exc
@application.post(
"/v1/import/analyze",
response_model=DataImportAnalysisResponse,
summary="Analyze import sample data via configured LLM",
response_model=DataImportAnalysisJobAck,
summary="Schedule async import analysis and notify via callback",
status_code=202,
)
async def analyze_import_data(
payload: DataImportAnalysisRequest,
gateway: LLMGateway = Depends(get_gateway),
payload: DataImportAnalysisJobRequest,
client: httpx.AsyncClient = Depends(get_http_client),
) -> DataImportAnalysisResponse:
try:
provider, model_name = resolve_provider_from_model(payload.llm_model)
except ValueError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
) -> DataImportAnalysisJobAck:
request_copy = payload.model_copy(deep=True)
messages = build_import_messages(payload)
async def _runner() -> None:
await process_import_analysis_job(request_copy, client)
llm_request = LLMRequest(
provider=provider,
model=model_name,
messages=messages,
temperature=payload.temperature if payload.temperature is not None else 0.2,
max_tokens=payload.max_tokens,
)
asyncio.create_task(_runner())
try:
llm_response = await gateway.chat(llm_request, client)
except ProviderConfigurationError as exc:
raise HTTPException(status_code=422, detail=str(exc)) from exc
except ProviderAPICallError as exc:
raise HTTPException(status_code=502, detail=str(exc)) from exc
return DataImportAnalysisJobAck(import_record_id=payload.import_record_id, status="accepted")
return DataImportAnalysisResponse(
import_record_id=payload.import_record_id,
llm_response=llm_response,
)
@application.post("/__mock__/import-callback")
async def mock_import_callback(payload: dict[str, Any]) -> dict[str, str]:
logger.info("Received import analysis callback: %s", payload)
return {"status": "received"}
return application

View File

@@ -1,9 +1,11 @@
from __future__ import annotations
from enum import Enum
from typing import Any, List, Optional
from typing import Any, Dict, List, Optional, Union
from pydantic import BaseModel, Field
from pydantic import BaseModel, Field, HttpUrl
from app.settings import DEFAULT_IMPORT_MODEL
class LLMRole(str, Enum):
@@ -90,3 +92,46 @@ class DataImportAnalysisRequest(BaseModel):
class DataImportAnalysisResponse(BaseModel):
import_record_id: str
llm_response: LLMResponse
class DataImportAnalysisJobRequest(BaseModel):
import_record_id: str = Field(
..., description="Unique identifier for this import request run."
)
rows: List[Union[Dict[str, Any], List[Any]]] = Field(
...,
description="Sample rows from the import payload. Accepts list of dicts or list of lists.",
)
headers: Optional[List[str]] = Field(
None,
description="Ordered list of table headers associated with the data sample.",
)
raw_csv: Optional[str] = Field(
None,
description="Optional raw CSV representation of the sample rows, if already prepared.",
)
table_schema: Optional[Any] = Field(
None,
description="Optional schema description for the table. Can be a string or JSON-serialisable structure.",
)
callback_url: HttpUrl = Field(
...,
description="URL to notify when the analysis completes. Receives JSON payload with status/results.",
)
llm_model: str = Field(
DEFAULT_IMPORT_MODEL,
description="Target LLM model identifier. Defaults to DEFAULT_IMPORT_MODEL.",
)
temperature: Optional[float] = Field(
None,
description="Optional override for model temperature when generating analysis output.",
)
max_output_tokens: Optional[int] = Field(
None,
description="Optional maximum number of tokens to generate in the analysis response.",
)
class DataImportAnalysisJobAck(BaseModel):
import_record_id: str = Field(..., description="Echo of the import record identifier")
status: str = Field("accepted", description="Processing status acknowledgement.")
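For reference, a request body accepted by the new DataImportAnalysisJobRequest, sketched from the field definitions above (the record ID and callback URL are placeholders; rows may be dicts keyed by header or positional lists):

example_job = {
    "import_record_id": "example-001",
    "rows": [
        {"品牌": "白沙", "产品价类": "一类"},  # dict row keyed by header
        ["宝岛", "一类"],                      # positional row matching headers
    ],
    "headers": ["品牌", "产品价类"],
    "table_schema": {"source": "excel", "sheet_name": "Sheet1"},
    "callback_url": "http://localhost:8000/__mock__/import-callback",
    "llm_model": "deepseek:deepseek-chat",  # optional, defaults to DEFAULT_IMPORT_MODEL
    "temperature": 0.2,
    "max_output_tokens": 1024,
}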

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import logging
from typing import Any, Dict, List, Tuple
import httpx
@@ -16,6 +17,9 @@ from app.models import (
from app.providers.base import LLMProviderClient
logger = logging.getLogger(__name__)
class AnthropicProvider(LLMProviderClient):
name = LLMProvider.ANTHROPIC.value
api_key_env = "ANTHROPIC_API_KEY"
@@ -52,7 +56,19 @@ class AnthropicProvider(LLMProviderClient):
try:
response = await client.post(self.base_url, json=payload, headers=headers)
response.raise_for_status()
except httpx.HTTPStatusError as exc:
status_code = exc.response.status_code
body = exc.response.text
logger.error(
"Anthropic upstream returned %s: %s", status_code, body, exc_info=True
)
raise ProviderAPICallError(
f"Anthropic request failed with status {status_code}",
status_code=status_code,
response_text=body,
) from exc
except httpx.HTTPError as exc:
logger.error("Anthropic transport error: %s", exc, exc_info=True)
raise ProviderAPICallError(f"Anthropic request failed: {exc}") from exc
data: Dict[str, Any] = response.json()

View File

@@ -1,5 +1,7 @@
from __future__ import annotations
import logging
import os
from typing import Any, Dict, List
import httpx
@@ -9,6 +11,26 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMRespon
from app.providers.base import LLMProviderClient
logger = logging.getLogger(__name__)
def _resolve_timeout_seconds() -> float:
raw = os.getenv("DEEPSEEK_TIMEOUT_SECONDS")
if raw is None:
return 60.0
try:
return float(raw)
except ValueError:
logger.warning(
"Invalid value for DEEPSEEK_TIMEOUT_SECONDS=%r, falling back to 60 seconds",
raw,
)
return 60.0
DEEPSEEK_TIMEOUT_SECONDS = _resolve_timeout_seconds()
class DeepSeekProvider(LLMProviderClient):
name = LLMProvider.DEEPSEEK.value
api_key_env = "DEEPSEEK_API_KEY"
@@ -36,11 +58,24 @@ class DeepSeekProvider(LLMProviderClient):
"Authorization": f"Bearer {self.api_key}",
"Content-Type": "application/json",
}
timeout = httpx.Timeout(DEEPSEEK_TIMEOUT_SECONDS)
try:
response = await client.post(self.base_url, json=payload, headers=headers)
response = await client.post(
self.base_url, json=payload, headers=headers, timeout=timeout
)
response.raise_for_status()
except httpx.HTTPStatusError as exc:
status_code = exc.response.status_code
body = exc.response.text
logger.error("DeepSeek upstream returned %s: %s", status_code, body, exc_info=True)
raise ProviderAPICallError(
f"DeepSeek request failed with status {status_code}",
status_code=status_code,
response_text=body,
) from exc
except httpx.HTTPError as exc:
logger.error("DeepSeek transport error: %s", exc, exc_info=True)
raise ProviderAPICallError(f"DeepSeek request failed: {exc}") from exc
data: Dict[str, Any] = response.json()

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import logging
from typing import Any, Dict, List, Tuple
import httpx
@@ -16,6 +17,9 @@ from app.models import (
from app.providers.base import LLMProviderClient
logger = logging.getLogger(__name__)
class GeminiProvider(LLMProviderClient):
name = LLMProvider.GEMINI.value
api_key_env = "GEMINI_API_KEY"
@@ -53,7 +57,19 @@ class GeminiProvider(LLMProviderClient):
try:
response = await client.post(endpoint, json=payload, headers=headers)
response.raise_for_status()
except httpx.HTTPStatusError as exc:
status_code = exc.response.status_code
body = exc.response.text
logger.error(
"Gemini upstream returned %s: %s", status_code, body, exc_info=True
)
raise ProviderAPICallError(
f"Gemini request failed with status {status_code}",
status_code=status_code,
response_text=body,
) from exc
except httpx.HTTPError as exc:
logger.error("Gemini transport error: %s", exc, exc_info=True)
raise ProviderAPICallError(f"Gemini request failed: {exc}") from exc
data: Dict[str, Any] = response.json()

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import logging
from typing import Any, Dict, List
import httpx
@@ -9,6 +10,9 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMRespon
from app.providers.base import LLMProviderClient
logger = logging.getLogger(__name__)
class OpenAIProvider(LLMProviderClient):
name = LLMProvider.OPENAI.value
api_key_env = "OPENAI_API_KEY"
@@ -40,7 +44,19 @@ class OpenAIProvider(LLMProviderClient):
try:
response = await client.post(self.base_url, json=payload, headers=headers)
response.raise_for_status()
except httpx.HTTPStatusError as exc:
status_code = exc.response.status_code
body = exc.response.text
logger.error(
"OpenAI upstream returned %s: %s", status_code, body, exc_info=True
)
raise ProviderAPICallError(
f"OpenAI request failed with status {status_code}",
status_code=status_code,
response_text=body,
) from exc
except httpx.HTTPError as exc:
logger.error("OpenAI transport error: %s", exc, exc_info=True)
raise ProviderAPICallError(f"OpenAI request failed: {exc}") from exc
data: Dict[str, Any] = response.json()

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import logging
import os
from typing import Any, Dict, List
@@ -10,6 +11,9 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMRespon
from app.providers.base import LLMProviderClient
logger = logging.getLogger(__name__)
class OpenRouterProvider(LLMProviderClient):
name = LLMProvider.OPENROUTER.value
api_key_env = "OPENROUTER_API_KEY"
@@ -51,7 +55,19 @@ class OpenRouterProvider(LLMProviderClient):
try:
response = await client.post(self.base_url, json=payload, headers=headers)
response.raise_for_status()
except httpx.HTTPStatusError as exc:
status_code = exc.response.status_code
body = exc.response.text
logger.error(
"OpenRouter upstream returned %s: %s", status_code, body, exc_info=True
)
raise ProviderAPICallError(
f"OpenRouter request failed with status {status_code}",
status_code=status_code,
response_text=body,
) from exc
except httpx.HTTPError as exc:
logger.error("OpenRouter transport error: %s", exc, exc_info=True)
raise ProviderAPICallError(f"OpenRouter request failed: {exc}") from exc
data: Dict[str, Any] = response.json()

View File

@@ -1,5 +1,6 @@
from __future__ import annotations
import logging
from typing import Any, Dict, List
import httpx
@@ -9,6 +10,9 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMRespon
from app.providers.base import LLMProviderClient
logger = logging.getLogger(__name__)
class QwenProvider(LLMProviderClient):
name = LLMProvider.QWEN.value
api_key_env = "QWEN_API_KEY"
@@ -48,7 +52,17 @@ class QwenProvider(LLMProviderClient):
try:
response = await client.post(self.base_url, json=payload, headers=headers)
response.raise_for_status()
except httpx.HTTPStatusError as exc:
status_code = exc.response.status_code
body = exc.response.text
logger.error("Qwen upstream returned %s: %s", status_code, body, exc_info=True)
raise ProviderAPICallError(
f"Qwen request failed with status {status_code}",
status_code=status_code,
response_text=body,
) from exc
except httpx.HTTPError as exc:
logger.error("Qwen transport error: %s", exc, exc_info=True)
raise ProviderAPICallError(f"Qwen request failed: {exc}") from exc
data: Dict[str, Any] = response.json()

View File

@@ -1,13 +1,50 @@
from __future__ import annotations
from typing import List, Tuple
import csv
import json
import logging
import os
import re
from functools import lru_cache
from io import StringIO
from pathlib import Path
from typing import Any, Dict, List, Sequence, Tuple
import httpx
from pydantic import ValidationError
from app.exceptions import ProviderAPICallError
from app.models import (
DataImportAnalysisJobRequest,
DataImportAnalysisRequest,
LLMMessage,
LLMProvider,
LLMResponse,
LLMRole,
)
from app.settings import DEFAULT_IMPORT_MODEL, get_supported_import_models
logger = logging.getLogger(__name__)
IMPORT_GATEWAY_BASE_URL = os.getenv(
"IMPORT_GATEWAY_BASE_URL", "http://localhost:8000"
)
def _env_float(name: str, default: float) -> float:
raw = os.getenv(name)
if raw is None:
return default
try:
return float(raw)
except ValueError:
logger.warning("Invalid value for %s=%r, falling back to %.2f", name, raw, default)
return default
IMPORT_CHAT_TIMEOUT_SECONDS = _env_float("IMPORT_CHAT_TIMEOUT_SECONDS", 90.0)
SUPPORTED_IMPORT_MODELS = get_supported_import_models()
def resolve_provider_from_model(llm_model: str) -> Tuple[LLMProvider, str]:
@@ -69,23 +106,337 @@ def build_import_messages(
"""Create system and user messages for the import analysis prompt."""
headers_formatted = "\n".join(f"- {header}" for header in request.table_headers)
system_prompt = (
"你是一名数据导入识别助手。请根据给定的表头和示例数据,判断字段含义、"
"典型数据类型以及潜在的数据质量问题。最终请返回一个结构化的JSON。\n"
"JSON结构需包含: field_summaries (数组, 每项含 header、meaning、data_type、quality_notes), "
"detected_issues (字符串数组),以及 overall_suggestion (字符串)。"
)
system_prompt = load_import_template()
user_prompt = (
data_block = (
f"导入记录ID: {request.import_record_id}\n\n"
"表头信息:\n"
f"{headers_formatted}\n\n"
"示例数据:\n"
f"{request.example_data}\n\n"
"请仔细分析示例数据与表头之间的对应关系并返回符合上述JSON结构的内容。"
f"{request.example_data}"
)
return [
LLMMessage(role=LLMRole.SYSTEM, content=system_prompt),
LLMMessage(role=LLMRole.USER, content=user_prompt),
LLMMessage(role=LLMRole.USER, content=data_block),
]
@lru_cache(maxsize=1)
def load_import_template() -> str:
template_path = (
Path(__file__).resolve().parents[2] / "prompt" / "data_import_analysis.md"
)
if not template_path.exists():
raise FileNotFoundError(f"Prompt template not found at {template_path}")
return template_path.read_text(encoding="utf-8").strip()
def derive_headers(
rows: Sequence[Any], provided_headers: Sequence[str] | None
) -> List[str]:
if provided_headers:
return [str(header) for header in provided_headers]
collected: List[str] = []
list_lengths: List[int] = []
for row in rows:
if isinstance(row, dict):
for key in row.keys():
key_str = str(key)
if key_str not in collected:
collected.append(key_str)
elif isinstance(row, (list, tuple)):
list_lengths.append(len(row))
if collected:
return collected
if list_lengths:
max_len = max(list_lengths)
return [f"column_{idx + 1}" for idx in range(max_len)]
return ["column_1"]
def _stringify_cell(value: Any) -> str:
if value is None:
return ""
if isinstance(value, (str, int, float, bool)):
return str(value)
try:
return json.dumps(value, ensure_ascii=False)
except (TypeError, ValueError):
return str(value)
def rows_to_csv_text(
rows: Sequence[Any], headers: Sequence[str], *, max_rows: int = 50
) -> str:
buffer = StringIO()
writer = csv.writer(buffer)
if headers:
writer.writerow(headers)
for idx, row in enumerate(rows):
if max_rows and idx >= max_rows:
break
if isinstance(row, dict):
writer.writerow([_stringify_cell(row.get(header)) for header in headers])
elif isinstance(row, (list, tuple)):
writer.writerow([_stringify_cell(item) for item in row])
else:
writer.writerow([_stringify_cell(row)])
return buffer.getvalue().strip()
def format_table_schema(schema: Any) -> str:
if schema is None:
return ""
if isinstance(schema, str):
return schema.strip()
try:
return json.dumps(schema, ensure_ascii=False, indent=2)
except (TypeError, ValueError):
return str(schema)
def build_analysis_request(
request: DataImportAnalysisJobRequest,
) -> DataImportAnalysisRequest:
headers = derive_headers(request.rows, request.headers)
if request.raw_csv:
csv_text = request.raw_csv.strip()
else:
csv_text = rows_to_csv_text(request.rows, headers)
sections: List[str] = []
if csv_text:
sections.append("CSV样本预览:\n" + csv_text)
schema_text = format_table_schema(request.table_schema)
if schema_text:
sections.append("附加结构信息:\n" + schema_text)
example_data = "\n\n".join(sections) if sections else "未提供样本数据。"
max_length = 30000
if len(example_data) > max_length:
example_data = example_data[: max_length - 3] + "..."
return DataImportAnalysisRequest(
import_record_id=request.import_record_id,
example_data=example_data,
table_headers=headers,
llm_model=request.llm_model or DEFAULT_IMPORT_MODEL,
)
def build_chat_payload(request: DataImportAnalysisJobRequest) -> Dict[str, Any]:
llm_input = request.llm_model or DEFAULT_IMPORT_MODEL
provider, model_name = resolve_provider_from_model(llm_input)
normalized_model = f"{provider.value}:{model_name}"
if SUPPORTED_IMPORT_MODELS and normalized_model not in SUPPORTED_IMPORT_MODELS:
raise ProviderAPICallError(
"Model '{model}' is not allowed. Allowed models: {allowed}".format(
model=normalized_model,
allowed=", ".join(sorted(SUPPORTED_IMPORT_MODELS)),
)
)
analysis_request = build_analysis_request(request)
messages = build_import_messages(analysis_request)
payload: Dict[str, Any] = {
"provider": provider.value,
"model": model_name,
"messages": [message.model_dump() for message in messages],
"temperature": request.temperature if request.temperature is not None else 0.2,
}
if request.max_output_tokens is not None:
payload["max_tokens"] = request.max_output_tokens
return payload
def _extract_json_payload(content: str) -> str:
"""Try to pull a JSON object from an LLM content string."""
# Prefer fenced code blocks such as ```json { ... } ```
fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", content, flags=re.DOTALL | re.IGNORECASE)
if fenced:
return fenced.group(1).strip()
stripped = content.strip()
if stripped.startswith("{") and stripped.endswith("}"):
return stripped
start = stripped.find("{")
end = stripped.rfind("}")
if start != -1 and end != -1 and end > start:
return stripped[start : end + 1].strip()
return stripped
def parse_llm_analysis_json(llm_response: LLMResponse) -> Dict[str, Any]:
"""Extract and parse the structured JSON payload from an LLM response."""
if not llm_response.choices:
raise ProviderAPICallError("LLM response did not include any choices to parse.")
content = llm_response.choices[0].message.content or ""
if not content.strip():
raise ProviderAPICallError("LLM response content is empty.")
json_payload = _extract_json_payload(content)
try:
return json.loads(json_payload)
except json.JSONDecodeError as exc:
preview = json_payload[:2000]
logger.error("Failed to parse JSON from LLM response content: %s", preview, exc_info=True)
raise ProviderAPICallError("LLM response JSON could not be parsed.") from exc
async def dispatch_import_analysis_job(
request: DataImportAnalysisJobRequest,
client: httpx.AsyncClient,
) -> Dict[str, Any]:
logger.info("Starting import analysis job %s", request.import_record_id)
payload = build_chat_payload(request)
url = f"{IMPORT_GATEWAY_BASE_URL.rstrip('/')}/v1/chat/completions"
logger.info(
"Dispatching import %s to %s: %s",
request.import_record_id,
url,
json.dumps(payload, ensure_ascii=False),
)
timeout = httpx.Timeout(IMPORT_CHAT_TIMEOUT_SECONDS)
try:
response = await client.post(url, json=payload, timeout=timeout)
response.raise_for_status()
except httpx.HTTPStatusError as exc:
body_preview = ""
if exc.response is not None:
body_preview = exc.response.text[:400]
raise ProviderAPICallError(
f"Failed to invoke chat completions endpoint: {exc}. Response body: {body_preview}"
) from exc
except httpx.HTTPError as exc:
raise ProviderAPICallError(
f"HTTP call to chat completions endpoint failed: {exc}"
) from exc
try:
response_data = response.json()
except ValueError as exc:
raise ProviderAPICallError("Chat completions endpoint returned invalid JSON") from exc
logger.info(
"LLM HTTP status for %s: %s",
request.import_record_id,
response.status_code,
)
logger.info(
"LLM response for %s: %s",
request.import_record_id,
json.dumps(response_data, ensure_ascii=False),
)
try:
llm_response = LLMResponse.model_validate(response_data)
except ValidationError as exc:
raise ProviderAPICallError(
"Chat completions endpoint returned unexpected schema"
) from exc
structured_json = parse_llm_analysis_json(llm_response)
usage_data = extract_usage(llm_response.raw)
logger.info("Completed import analysis job %s", request.import_record_id)
result: Dict[str, Any] = {
"import_record_id": request.import_record_id,
"status": "succeeded",
"llm_response": llm_response.model_dump(),
"analysis": structured_json
}
if usage_data:
result["usage"] = usage_data
return result
# Extract token usage fields in a way that tolerates multiple provider response formats
def extract_usage(resp_json: dict) -> dict:
usage = resp_json.get("usage") or resp_json.get("usageMetadata") or {}
return {
"prompt_tokens": usage.get("prompt_tokens") or usage.get("input_tokens") or usage.get("promptTokenCount"),
"completion_tokens": usage.get("completion_tokens") or usage.get("output_tokens") or usage.get("candidatesTokenCount"),
"total_tokens": usage.get("total_tokens") or usage.get("totalTokenCount") or (
(usage.get("prompt_tokens") or usage.get("input_tokens") or 0)
+ (usage.get("completion_tokens") or usage.get("output_tokens") or 0)
)
}
async def notify_import_analysis_callback(
callback_url: str,
payload: Dict[str, Any],
client: httpx.AsyncClient,
) -> None:
callback_target = str(callback_url)
logger.info(
"Posting import analysis callback to %s: %s",
callback_target,
json.dumps(payload, ensure_ascii=False),
)
try:
response = await client.post(callback_target, json=payload)
response.raise_for_status()
except httpx.HTTPError as exc:
logger.error(
"Failed to deliver import analysis callback to %s: %s",
callback_target,
exc,
)
async def process_import_analysis_job(
request: DataImportAnalysisJobRequest,
client: httpx.AsyncClient,
) -> None:
try:
payload = await dispatch_import_analysis_job(request, client)
except ProviderAPICallError as exc:
logger.error(
"LLM call failed for %s: %s",
request.import_record_id,
exc,
)
payload = {
"import_record_id": request.import_record_id,
"status": "failed",
"error": str(exc),
}
except Exception as exc: # pragma: no cover - defensive logging
logger.exception(
"Unexpected failure while processing import analysis job %s",
request.import_record_id,
)
payload = {
"import_record_id": request.import_record_id,
"status": "failed",
"error": str(exc),
}
await notify_import_analysis_callback(request.callback_url, payload, client)
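A sketch of the two JSON shapes that notify_import_analysis_callback posts to callback_url, derived from process_import_analysis_job above (all values are placeholders):

success_callback = {
    "import_record_id": "example-001",
    "status": "succeeded",
    "llm_response": {"provider": "deepseek", "model": "deepseek-chat", "choices": [], "raw": {}},  # full LLMResponse dump
    "analysis": {"table_name": "example_table", "columns": []},  # JSON parsed from the model reply
    "usage": {"prompt_tokens": 1078, "completion_tokens": 256, "total_tokens": 1334},  # only when the raw response reports usage
}
failure_callback = {
    "import_record_id": "example-001",
    "status": "failed",
    "error": "message from the ProviderAPICallError or unexpected exception",
}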

39
app/settings.py Normal file
View File

@@ -0,0 +1,39 @@
from __future__ import annotations
import os
from functools import lru_cache
from typing import Dict, Set
from dotenv import load_dotenv
load_dotenv()
PROVIDER_KEY_ENV_MAP: Dict[str, str] = {
"openai": "OPENAI_API_KEY",
"anthropic": "ANTHROPIC_API_KEY",
"openrouter": "OPENROUTER_API_KEY",
"gemini": "GEMINI_API_KEY",
"qwen": "QWEN_API_KEY",
"deepseek": "DEEPSEEK_API_KEY",
}
DEFAULT_IMPORT_MODEL = os.getenv("DEFAULT_IMPORT_MODEL", "openai:gpt-4.1-mini")
@lru_cache(maxsize=1)
def get_supported_import_models() -> Set[str]:
raw = os.getenv("IMPORT_SUPPORTED_MODELS", "")
return {model.strip() for model in raw.split(",") if model.strip()}
@lru_cache(maxsize=1)
def get_available_provider_keys() -> Dict[str, str]:
keys: Dict[str, str] = {}
for provider, env_name in PROVIDER_KEY_ENV_MAP.items():
value = os.getenv(env_name)
if value:
keys[provider] = value
return keys
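With the IMPORT_SUPPORTED_MODELS value from the .env change at the top of this diff, get_supported_import_models() yields the allow-list that build_chat_payload checks before dispatching a job. A minimal sketch, assuming that .env has been loaded:

from app.settings import get_supported_import_models

allowed = get_supported_import_models()  # cached after the first call via lru_cache
# Expected with the .env above:
# {"openai:gpt-5", "deepseek:deepseek-chat", "openrouter:anthropic/claude-4.0-sonnet"}
print("deepseek:deepseek-chat" in allowed)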

41
deepseek-result.json Normal file
View File

@@ -0,0 +1,41 @@
{
"provider": "deepseek",
"model": "deepseek-chat",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "```json\n{\n \"table_name\": \"national_brand_sales\",\n \"description\": \"全国品牌系统外销售数据\",\n \"columns\": [\n {\n \"original_name\": \"品牌\",\n \"standard_name\": \"brand\",\n \"data_type\": \"string\",\n \"db_type\": \"varchar(50)\",\n \"java_type\": \"string\",\n \"nullable\": true,\n \"distinct_count_sample\": 5,\n \"null_ratio_sample\": 0.4,\n \"is_enum_candidate\": false,\n \"description\": \"品牌名称\",\n \"date_format\": null\n },\n {\n \"original_name\": \"产品价类\",\n \"standard_name\": \"price_category\",\n \"data_type\": \"string\",\n \"db_type\": \"varchar(10)\",\n \"java_type\": \"string\",\n \"nullable\": false,\n \"distinct_count_sample\": 3,\n \"null_ratio_sample\": 0.0,\n \"is_enum_candidate\": true,\n \"description\": \"产品价格分类(一类/二类/三类)\",\n \"date_format\": null\n },\n {\n \"original_name\": \"是否重点品牌"
}
}
],
"raw": {
"id": "67f3cc80-38bc-4bb7-b336-48d4886722c4",
"object": "chat.completion",
"created": 1761752207,
"model": "deepseek-chat",
"choices": [
{
"index": 0,
"message": {
"role": "assistant",
"content": "```json\n{\n \"table_name\": \"national_brand_sales\",\n \"description\": \"全国品牌系统外销售数据\",\n \"columns\": [\n {\n \"original_name\": \"品牌\",\n \"standard_name\": \"brand\",\n \"data_type\": \"string\",\n \"db_type\": \"varchar(50)\",\n \"java_type\": \"string\",\n \"nullable\": true,\n \"distinct_count_sample\": 5,\n \"null_ratio_sample\": 0.4,\n \"is_enum_candidate\": false,\n \"description\": \"品牌名称\",\n \"date_format\": null\n },\n {\n \"original_name\": \"产品价类\",\n \"standard_name\": \"price_category\",\n \"data_type\": \"string\",\n \"db_type\": \"varchar(10)\",\n \"java_type\": \"string\",\n \"nullable\": false,\n \"distinct_count_sample\": 3,\n \"null_ratio_sample\": 0.0,\n \"is_enum_candidate\": true,\n \"description\": \"产品价格分类(一类/二类/三类)\",\n \"date_format\": null\n },\n {\n \"original_name\": \"是否重点品牌"
},
"logprobs": null,
"finish_reason": "length"
}
],
"usage": {
"prompt_tokens": 1078,
"completion_tokens": 256,
"total_tokens": 1334,
"prompt_tokens_details": {
"cached_tokens": 1024
},
"prompt_cache_hit_tokens": 1024,
"prompt_cache_miss_tokens": 54
},
"system_fingerprint": "fp_ffc7281d48_prod0820_fp8_kvcache"
}
}
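Note that the saved response above was cut off by the token limit (finish_reason "length"), so the fenced JSON inside content is incomplete and parse_llm_analysis_json would reject it. A sketch of how a complete fenced reply is extracted, using _extract_json_payload from the import_analysis diff above (assumes the project's dependencies are installed):

import json
from app.services.import_analysis import _extract_json_payload

content = '```json\n{"table_name": "national_brand_sales", "columns": []}\n```'
print(json.loads(_extract_json_payload(content)))
# -> {'table_name': 'national_brand_sales', 'columns': []}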

View File

@@ -2,7 +2,7 @@
任务目标对提供的数据含表头或table schema与若干行样本数据进行解析生成一份导入分析与处理报告指导如何将其导入为标准化表结构及 JSON 元数据定义,不要省略任何字段信息,全量输出。
请从以下个方向进行思考:
请从以下个方向进行思考:
方向 1元数据识别与整理
解析表明根据表头、Origin Table Name、Orign File Name生成表名表名需要有意义
@@ -12,7 +12,6 @@
方向 2字段数据类型与格式推断
针对每列:输出推断数据类型(如 varchar(n) / int / bigint / tinyint / float / double / decimal(p,s) / date / datetime / text
说明推断依据:样本值分布、长度范围、格式正则、是否存在空值、是否数值但含前导零等。
指出数据质量初步观察:缺失率、是否有异常/离群值(简单规则即可)、是否需标准化(如去空格、去重、枚举值归一)。
给出“建议处理动作”:如 trim、cast_float、cast_int、cast_double、cast_date、cast_time、cast_datetime适用于将样本数据转换成数据库表字段兼容的格式。
若为“可能是枚举”的字段,列出候选枚举值及占比。
@@ -23,12 +22,8 @@
"columns": [{
"original_name": "原始名称",
"standard_name": "标准化后的名称: 下划线命名,大小写字母、数字、下划线",
"data_type": "数据类型限制为number/string/datetime",
"db_type": "数据库字段类型",
"java_type": "java字段类型限制为: int/long/double/string/date",
"data_type": "",
"nullable": true/false,
"distinct_count_sample": number,
"null_ratio_sample": 0.x,
"is_enum_candidate": true/false,
"description": "字段简短描述",
"date_format": "转换成Date类型的pattern"
@@ -39,4 +34,4 @@
若信息不足,请显式指出“信息不足”并给出补充数据需求清单。
避免武断结论,用“可能 / 候选 / 建议”字样。
不要捏造样本未出现的值。
不要捏造样本未出现的值。

View File

@@ -0,0 +1,43 @@
"""Demonstrates calling /v1/chat/completions with the DeepSeek provider."""
from __future__ import annotations
import asyncio
import json
import httpx
from dotenv import load_dotenv
load_dotenv()
API_URL = "http://localhost:8000/v1/chat/completions"
async def main() -> None:
payload = {
"provider": "deepseek",
"model": "deepseek-chat",
"messages": [
{
"role": "system",
"content": "角色你是一名数据分析导入助手Data Ingestion Analyst擅长从原始数据抽取结构化元数据、推断字段类型、识别维度/事实属性并输出导入建模建议Table + JSON\n\n任务目标对提供的数据含表头或table schema与若干行样本数据进行解析生成一份导入分析与处理报告指导如何将其导入为标准化表结构及 JSON 元数据定义,不要省略任何字段信息,全量输出。\n\n请从以下两个方向进行思考:\n\n方向 1元数据识别与整理\n解析表明根据表头、Origin Table Name、Orign File Name生成表名表名需要有意义\n解析列名生成标准化字段名snake_case 或小驼峰),并给出原始列名与标准字段名映射。\n为每个字段写出中文/英文注释(若无法确定,给出“待确认”并附可能解释)。\n\n方向 2字段数据类型与格式推断\n针对每列:输出推断数据类型(如 varchar(n) / int / bigint / tinyint / float / double / decimal(p,s) / date / datetime / text\n说明推断依据:样本值分布、长度范围、格式正则、是否存在空值、是否数值但含前导零等。\n指出数据质量初步观察:缺失率、是否有异常/离群值(简单规则即可)、是否需标准化(如去空格、去重、枚举值归一)。\n给出“建议处理动作”:如 trim、cast_float、cast_int、cast_double、cast_date、cast_time、cast_datetime适用于将样本数据转换成数据库表字段兼容的格式。\n若为“可能是枚举”的字段,列出候选枚举值及占比。\n\n最终内容都输出为一个json对象格式为字段级与表级定义字段含\n{\n \"table_name\": \"标准化后的表名\",\n \"description\": \"表简短描述\",\n \"columns\": [{\n \"original_name\": \"原始名称\",\n \"standard_name\": \"标准化后的名称: 下划线命名,大小写字母、数字、下划线\",\n \"data_type\": \"数据类型限制为number/string/datetime\",\n \"db_type\": \"数据库字段类型\",\n \"java_type\": \"java字段类型限制为: int/long/double/string/date\",\n \"nullable\": true/false,\n \"distinct_count_sample\": number,\n \"null_ratio_sample\": 0.x,\n \"is_enum_candidate\": true/false,\n \"description\": \"字段简短描述\",\n \"date_format\": \"转换成Date类型的pattern\"\n }]\n}\n\n约束与风格:\n\n若信息不足,请显式指出“信息不足”并给出补充数据需求清单。\n避免武断结论,用“可能 / 候选 / 建议”字样。\n不要捏造样本未出现的值。"
},
{
"role": "user",
"content": "导入记录ID: demo-import-001\n\n表头信息:\n- 品牌\n- 产品价类\n- 是否重点品牌\n- 系统外销售量(箱)\n- 系统外销售金额(万元)\n- 同期系统外销售量(箱)\n- 同期系统外销售金额(万元)\n\n示例数据:\nCSV样本预览:\n品牌,产品价类,是否重点品牌,系统外销售量(箱),系统外销售金额(万元),同期系统外销售量(箱),同期系统外销售金额(万元)\r\n白沙,一类,重点品牌,3332.406875,64283.5593333333,3123.693375,61821.7986666667\r\nnan,二类,重点品牌,1094.4707375,3859.69366666667,869.65725,3067.00966666667\r\nnan,三类,重点品牌,3965.0457375,8388.306,4401.6714875,8802.132\r\n宝岛,一类,否,39.934375,301.617666666667,30.5975,249.399666666667\r\n长白山,一类,重点品牌,2666.53775,12360.8306666667,1916.252,9051.672\r\nnan,二类,重点品牌,2359.910025,7671.26233333333,2335.2480875,7590.791\r\nnan,三类,重点品牌,1263.293875,2826.665,1590.750875,3503.083\r\n大前门,一类,否,81.5806875,343.721333333333,114.1179875,480.809333333333\r\nnan,三类,否,226.445225,319.975666666667,254.6595125,359.894\r\n大青山,二类,否,60.73525,209.415,60.2415,207.712666666667\n\n附加结构信息:\n{\n \"source\": \"excel\",\n \"file_name\": \"全国品牌.xlsx\",\n \"sheet_name\": \"Sheet1\"\n}"
}
],
"temperature": 0.2,
"max_tokens": 350,
}
async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
response = await client.post(API_URL, json=payload)
response.raise_for_status()
data = response.json()
print(json.dumps(data, ensure_ascii=False, indent=2))
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,49 @@
"""Minimal example for hitting the /v1/import/analyze endpoint with Excel data."""
from __future__ import annotations
import asyncio
from pathlib import Path
import httpx
import pandas as pd
from dotenv import load_dotenv
load_dotenv()
API_URL = "http://localhost:8000/v1/import/analyze"
CALLBACK_URL = "http://localhost:8000/__mock__/import-callback"
EXCEL_PATH = Path(__file__).resolve().parents[1] / "file" / "全国品牌.xlsx"
async def main() -> None:
excel = pd.ExcelFile(EXCEL_PATH)
sheet_name = excel.sheet_names[0]
df = excel.parse(sheet_name)
sampled = df.head(10)
rows = sampled.to_dict(orient="records")
headers = [str(column) for column in sampled.columns]
payload = {
"import_record_id": "demo-import-001",
"rows": rows,
"headers": headers,
"table_schema": {
"source": "excel",
"file_name": EXCEL_PATH.name,
"sheet_name": sheet_name,
},
"llm_model": "deepseek:deepseek-chat",
"temperature": 0.2,
"callback_url": CALLBACK_URL,
}
async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
response = await client.post(API_URL, json=payload)
print("Status:", response.status_code)
if __name__ == "__main__":
asyncio.run(main())

View File

@@ -0,0 +1,61 @@
"""Quick demo call against the unified chat endpoint using the OpenRouter provider."""
from __future__ import annotations
import asyncio
import httpx
from dotenv import load_dotenv
import json
load_dotenv()
API_URL = "http://localhost:8000/v1/chat/completions"
def extract_schema(payload: dict) -> dict:
content = payload["choices"][0]["message"]["content"]
if content.startswith("```"):
content = content.split("```json", 1)[-1]
content = content.rsplit("```", 1)[0]
return json.loads(content)
async def main() -> None:
payload = {
"provider": "openrouter",
"model": "anthropic/claude-3.7-sonnet",
"messages": [
{
"role": "system",
"content": "角色你是一名数据分析导入助手Data Ingestion Analyst擅长从原始数据抽取结构化元数据、推断字段类型、识别维度/事实属性并输出导入建模建议Table + JSON\n\n任务目标对提供的数据含表头或table schema与若干行样本数据进行解析生成一份导入分析与处理报告指导如何将其导入为标准化表结构及 JSON 元数据定义,不要省略任何字段信息,全量输出。\n\n请从以下两个方向进行思考:\n\n方向 1元数据识别与整理\n解析表明根据表头、Origin Table Name、Orign File Name生成表名表名需要有意义\n解析列名生成标准化字段名snake_case 或小驼峰),并给出原始列名与标准字段名映射。\n为每个字段写出中文/英文注释(若无法确定,给出“待确认”并附可能解释)。\n\n方向 2字段数据类型与格式推断\n针对每列:输出推断数据类型(如 varchar(n) / int / bigint / tinyint / float / double / decimal(p,s) / date / datetime / text\n说明推断依据:样本值分布、长度范围、格式正则、是否存在空值、是否数值但含前导零等。\n指出数据质量初步观察:缺失率、是否有异常/离群值(简单规则即可)、是否需标准化(如去空格、去重、枚举值归一)。\n给出“建议处理动作”:如 trim、cast_float、cast_int、cast_double、cast_date、cast_time、cast_datetime适用于将样本数据转换成数据库表字段兼容的格式。\n若为“可能是枚举”的字段,列出候选枚举值及占比。\n\n最终内容都输出为一个json对象格式为字段级与表级定义字段含\n{\n \"table_name\": \"标准化后的表名\",\n \"description\": \"表简短描述\",\n \"columns\": [{\n \"original_name\": \"原始名称\",\n \"standard_name\": \"标准化后的名称: 下划线命名,大小写字母、数字、下划线\",\n \"data_type\": \"数据类型限制为number/string/datetime\",\n \"db_type\": \"数据库字段类型\",\n \"java_type\": \"java字段类型限制为: int/long/double/string/date\",\n \"nullable\": true/false,\n \"distinct_count_sample\": number,\n \"null_ratio_sample\": 0.x,\n \"is_enum_candidate\": true/false,\n \"description\": \"字段简短描述\",\n \"date_format\": \"转换成Date类型的pattern\"\n }]\n}\n\n约束与风格:\n\n若信息不足,请显式指出“信息不足”并给出补充数据需求清单。\n避免武断结论,用“可能 / 候选 / 建议”字样。\n不要捏造样本未出现的值。"
},
{
"role": "user",
"content": "导入记录ID: demo-import-001\n\n表头信息:\n- 品牌\n- 产品价类\n- 是否重点品牌\n- 系统外销售量(箱)\n- 系统外销售金额(万元)\n- 同期系统外销售量(箱)\n- 同期系统外销售金额(万元)\n\n示例数据:\nCSV样本预览:\n品牌,产品价类,是否重点品牌,系统外销售量(箱),系统外销售金额(万元),同期系统外销售量(箱),同期系统外销售金额(万元)\r\n白沙,一类,重点品牌,3332.406875,64283.5593333333,3123.693375,61821.7986666667\r\nnan,二类,重点品牌,1094.4707375,3859.69366666667,869.65725,3067.00966666667\r\nnan,三类,重点品牌,3965.0457375,8388.306,4401.6714875,8802.132\r\n宝岛,一类,否,39.934375,301.617666666667,30.5975,249.399666666667\r\n长白山,一类,重点品牌,2666.53775,12360.8306666667,1916.252,9051.672\r\nnan,二类,重点品牌,2359.910025,7671.26233333333,2335.2480875,7590.791\r\nnan,三类,重点品牌,1263.293875,2826.665,1590.750875,3503.083\r\n大前门,一类,否,81.5806875,343.721333333333,114.1179875,480.809333333333\r\nnan,三类,否,226.445225,319.975666666667,254.6595125,359.894\r\n大青山,二类,否,60.73525,209.415,60.2415,207.712666666667\n\n附加结构信息:\n{\n \"source\": \"excel\",\n \"file_name\": \"全国品牌.xlsx\",\n \"sheet_name\": \"Sheet1\"\n}"
}
],
"temperature": 0.1,
"max_tokens": 2048,
}
async with httpx.AsyncClient(timeout=httpx.Timeout(15.0)) as client:
response = await client.post(API_URL, json=payload)
print("Status:", response.status_code)
try:
data = response.json()
except ValueError:
print("Raw Body:", response.text)
return
print("Body:", data)
try:
schema = extract_schema(data)
print("Table name:", schema.get("table_name"))
except (KeyError, json.JSONDecodeError) as exc:
print("Failed to parse schema:", exc)
if __name__ == "__main__":
asyncio.run(main())