Compare commits
10 commits: 59b5e9410b ... a0ed43a8b7

| SHA1 |
|---|
| a0ed43a8b7 |
| 455b884551 |
| 39911d78ab |
| 89af7cd0a4 |
| 261418fd35 |
| 2d158750fa |
| a78c8b9446 |
| 59c9efa5d8 |
| f43590585b |
| 76b8c9d79b |
.env (new file, 29 lines)
@@ -0,0 +1,29 @@
# LLM provider API keys
OPENAI_API_KEY=
ANTHROPIC_API_KEY=
OPENROUTER_API_KEY="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"
OPENROUTER_SITE_URL=
OPENROUTER_APP_NAME=
GEMINI_API_KEY=
QWEN_API_KEY=
DEEPSEEK_API_KEY="sk-657f0752a1564563be7ce35b6a0a7b46"
DEEPSEEK_TIMEOUT_SECONDS=120

# Data import analysis defaults
IMPORT_SUPPORTED_MODELS=openai:gpt-5,deepseek:deepseek-chat,openrouter:anthropic/claude-4.0-sonnet
DEFAULT_IMPORT_MODEL=deepseek:deepseek-chat

# Service configuration
IMPORT_GATEWAY_BASE_URL=http://localhost:8000

# HTTP client configuration
HTTP_CLIENT_TIMEOUT=30
HTTP_CLIENT_TRUST_ENV=false
# HTTP_CLIENT_PROXY=

# Import analysis configuration
IMPORT_CHAT_TIMEOUT_SECONDS=120

# Logging
LOG_LEVEL=INFO
# LOG_FORMAT=%(asctime)s %(levelname)s %(name)s:%(lineno)d %(message)s
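These values are loaded with python-dotenv (app/settings.py below calls load_dotenv()); a minimal sketch of how they surface at runtime, assuming .env sits in the working directory:

# Minimal sketch: reading the .env values above at runtime.
# Assumes python-dotenv is installed; variable names match the file above.
import os

from dotenv import load_dotenv

load_dotenv()  # does not override variables already set in the environment

timeout = float(os.getenv("HTTP_CLIENT_TIMEOUT", "30"))
default_model = os.getenv("DEFAULT_IMPORT_MODEL", "deepseek:deepseek-chat")
print(timeout, default_model)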
.gitignore (vendored, 5 lines changed)
@@ -1,3 +1,6 @@
 .venv
 gx/uncommitted/
-.vscode/
+.vscode/
+**/__pycache__/
+*.pyc
+.DS_Store
8 binary files not shown.
app/exceptions.py
@@ -4,3 +4,14 @@ class ProviderConfigurationError(RuntimeError):
 
 
+class ProviderAPICallError(RuntimeError):
+    """Raised when the upstream provider responds with an error."""
+
+    def __init__(
+        self,
+        message: str,
+        *,
+        status_code: int | None = None,
+        response_text: str | None = None,
+    ) -> None:
+        super().__init__(message)
+        self.status_code = status_code
+        self.response_text = response_text
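A quick sketch (hypothetical caller, not part of this diff) of how the new fields let handlers propagate the upstream status:

# Hypothetical caller: surface the upstream status code when present.
from app.exceptions import ProviderAPICallError

try:
    raise ProviderAPICallError("upstream failed", status_code=429, response_text="rate limited")
except ProviderAPICallError as exc:
    status = exc.status_code or 502  # fall back to 502 when the provider gave no status
    print(status, exc.response_text)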
app/main.py (144 lines changed)
@@ -1,24 +1,108 @@
 from __future__ import annotations
 
+import asyncio
+import logging
+import os
 from contextlib import asynccontextmanager
+from typing import Any
 
 import httpx
 from fastapi import Depends, FastAPI, HTTPException, Request
 
 from app.exceptions import ProviderAPICallError, ProviderConfigurationError
 from app.models import (
     DataImportAnalysisRequest,
     DataImportAnalysisResponse,
+    DataImportAnalysisJobAck,
+    DataImportAnalysisJobRequest,
     LLMRequest,
     LLMResponse,
 )
 from app.services import LLMGateway
 from app.services.import_analysis import build_import_messages, resolve_provider_from_model
+from app.services.import_analysis import process_import_analysis_job
+
+
+def _configure_logging() -> None:
+    level_name = os.getenv("LOG_LEVEL", "INFO").upper()
+    level = getattr(logging, level_name, logging.INFO)
+    log_format = os.getenv(
+        "LOG_FORMAT",
+        "%(asctime)s %(levelname)s %(name)s:%(lineno)d %(message)s",
+    )
+
+    root = logging.getLogger()
+
+    if not root.handlers:
+        logging.basicConfig(level=level, format=log_format)
+    else:
+        root.setLevel(level)
+        formatter = logging.Formatter(log_format)
+        for handler in root.handlers:
+            handler.setLevel(level)
+            handler.setFormatter(formatter)
+
+
+_configure_logging()
+logger = logging.getLogger(__name__)
+
+
+def _env_bool(name: str, default: bool) -> bool:
+    raw = os.getenv(name)
+    if raw is None:
+        return default
+    return raw.strip().lower() in {"1", "true", "yes", "on"}
+
+
+def _env_float(name: str, default: float) -> float:
+    raw = os.getenv(name)
+    if raw is None:
+        return default
+    try:
+        return float(raw)
+    except ValueError:
+        logger.warning("Invalid value for %s=%r, using default %.2f", name, raw, default)
+        return default
+
+
+def _parse_proxy_config(raw: str | None) -> dict[str, str] | str | None:
+    if raw is None:
+        return None
+
+    cleaned = raw.strip()
+    if not cleaned:
+        return None
+
+    # Support comma-separated key=value pairs for scheme-specific proxies.
+    if "=" in cleaned:
+        proxies: dict[str, str] = {}
+        for part in cleaned.split(","):
+            key, sep, value = part.partition("=")
+            if not sep:
+                continue
+            key = key.strip()
+            value = value.strip()
+            if key and value:
+                proxies[key] = value
+        if proxies:
+            return proxies
+
+    return cleaned
+
+
+def _create_http_client() -> httpx.AsyncClient:
+    timeout_seconds = _env_float("HTTP_CLIENT_TIMEOUT", 30.0)
+    trust_env = _env_bool("HTTP_CLIENT_TRUST_ENV", True)
+    proxies = _parse_proxy_config(os.getenv("HTTP_CLIENT_PROXY"))
+    client_kwargs: dict[str, object] = {
+        "timeout": httpx.Timeout(timeout_seconds),
+        "trust_env": trust_env,
+    }
+    if proxies:
+        client_kwargs["proxies"] = proxies
+    return httpx.AsyncClient(**client_kwargs)
 
 
 @asynccontextmanager
 async def lifespan(app: FastAPI):
-    client = httpx.AsyncClient(timeout=httpx.Timeout(30.0))
+    client = _create_http_client()
     gateway = LLMGateway()
     try:
         app.state.http_client = client  # type: ignore[attr-defined]
@@ -48,46 +132,42 @@ def create_app() -> FastAPI:
         try:
             return await gateway.chat(payload, client)
         except ProviderConfigurationError as exc:
+            logger.error("Provider configuration error: %s", exc, exc_info=True)
             raise HTTPException(status_code=422, detail=str(exc)) from exc
         except ProviderAPICallError as exc:
-            raise HTTPException(status_code=502, detail=str(exc)) from exc
+            status_code = exc.status_code or 502
+            log_detail = exc.response_text or str(exc)
+            logger.error(
+                "Provider API call error (status %s): %s",
+                status_code,
+                log_detail,
+                exc_info=True,
+            )
+            raise HTTPException(status_code=status_code, detail=str(exc)) from exc
 
     @application.post(
         "/v1/import/analyze",
-        response_model=DataImportAnalysisResponse,
-        summary="Analyze import sample data via configured LLM",
+        response_model=DataImportAnalysisJobAck,
+        summary="Schedule async import analysis and notify via callback",
+        status_code=202,
    )
     async def analyze_import_data(
-        payload: DataImportAnalysisRequest,
-        gateway: LLMGateway = Depends(get_gateway),
+        payload: DataImportAnalysisJobRequest,
         client: httpx.AsyncClient = Depends(get_http_client),
-    ) -> DataImportAnalysisResponse:
-        try:
-            provider, model_name = resolve_provider_from_model(payload.llm_model)
-        except ValueError as exc:
-            raise HTTPException(status_code=422, detail=str(exc)) from exc
+    ) -> DataImportAnalysisJobAck:
+        request_copy = payload.model_copy(deep=True)
 
-        messages = build_import_messages(payload)
+        async def _runner() -> None:
+            await process_import_analysis_job(request_copy, client)
 
-        llm_request = LLMRequest(
-            provider=provider,
-            model=model_name,
-            messages=messages,
-            temperature=payload.temperature if payload.temperature is not None else 0.2,
-            max_tokens=payload.max_tokens,
-        )
+        asyncio.create_task(_runner())
 
-        try:
-            llm_response = await gateway.chat(llm_request, client)
-        except ProviderConfigurationError as exc:
-            raise HTTPException(status_code=422, detail=str(exc)) from exc
-        except ProviderAPICallError as exc:
-            raise HTTPException(status_code=502, detail=str(exc)) from exc
+        return DataImportAnalysisJobAck(import_record_id=payload.import_record_id, status="accepted")
 
-        return DataImportAnalysisResponse(
-            import_record_id=payload.import_record_id,
-            llm_response=llm_response,
-        )
+    @application.post("/__mock__/import-callback")
+    async def mock_import_callback(payload: dict[str, Any]) -> dict[str, str]:
+        logger.info("Received import analysis callback: %s", payload)
+        return {"status": "received"}
 
     return application
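_parse_proxy_config accepts either a single proxy URL or comma-separated key=value pairs; a short sketch of both accepted HTTP_CLIENT_PROXY formats, assuming the function above is importable from app.main (proxy URLs are placeholders):

# Placeholder proxy URLs; shows the two formats _parse_proxy_config accepts.
from app.main import _parse_proxy_config

assert _parse_proxy_config("http://proxy.local:3128") == "http://proxy.local:3128"
assert _parse_proxy_config("http://=http://proxy.local:3128,https://=http://proxy.local:3129") == {
    "http://": "http://proxy.local:3128",
    "https://": "http://proxy.local:3129",
}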
app/models.py
@@ -1,9 +1,11 @@
 from __future__ import annotations
 
 from enum import Enum
-from typing import Any, List, Optional
+from typing import Any, Dict, List, Optional, Union
 
-from pydantic import BaseModel, Field
+from pydantic import BaseModel, Field, HttpUrl
+
+from app.settings import DEFAULT_IMPORT_MODEL
 
 
 class LLMRole(str, Enum):
@@ -90,3 +92,46 @@ class DataImportAnalysisRequest(BaseModel):
 class DataImportAnalysisResponse(BaseModel):
     import_record_id: str
     llm_response: LLMResponse
+
+
+class DataImportAnalysisJobRequest(BaseModel):
+    import_record_id: str = Field(
+        ..., description="Unique identifier for this import request run."
+    )
+    rows: List[Union[Dict[str, Any], List[Any]]] = Field(
+        ...,
+        description="Sample rows from the import payload. Accepts list of dicts or list of lists.",
+    )
+    headers: Optional[List[str]] = Field(
+        None,
+        description="Ordered list of table headers associated with the data sample.",
+    )
+    raw_csv: Optional[str] = Field(
+        None,
+        description="Optional raw CSV representation of the sample rows, if already prepared.",
+    )
+    table_schema: Optional[Any] = Field(
+        None,
+        description="Optional schema description for the table. Can be a string or JSON-serialisable structure.",
+    )
+    callback_url: HttpUrl = Field(
+        ...,
+        description="URL to notify when the analysis completes. Receives JSON payload with status/results.",
+    )
+    llm_model: str = Field(
+        DEFAULT_IMPORT_MODEL,
+        description="Target LLM model identifier. Defaults to DEFAULT_IMPORT_MODEL.",
+    )
+    temperature: Optional[float] = Field(
+        None,
+        description="Optional override for model temperature when generating analysis output.",
+    )
+    max_output_tokens: Optional[int] = Field(
+        None,
+        description="Optional maximum number of tokens to generate in the analysis response.",
+    )
+
+
+class DataImportAnalysisJobAck(BaseModel):
+    import_record_id: str = Field(..., description="Echo of the import record identifier")
+    status: str = Field("accepted", description="Processing status acknowledgement.")
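A sketch of a minimal payload that validates against the new job model (field values are illustrative, taken from the sample data used later in this diff):

# Minimal sketch: validating a job request with pydantic v2.
from app.models import DataImportAnalysisJobRequest

payload = DataImportAnalysisJobRequest.model_validate({
    "import_record_id": "demo-import-001",
    "rows": [{"brand": "白沙", "qty": 3332.4}, ["宝岛", 39.9]],  # dicts and lists both accepted
    "headers": ["brand", "qty"],
    "callback_url": "http://localhost:8000/__mock__/import-callback",
})
print(payload.llm_model)  # falls back to DEFAULT_IMPORT_MODEL when omitted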
16 binary files not shown.
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import logging
 from typing import Any, Dict, List, Tuple
 
 import httpx
@@ -16,6 +17,9 @@ from app.models import (
 from app.providers.base import LLMProviderClient
 
 
+logger = logging.getLogger(__name__)
+
+
 class AnthropicProvider(LLMProviderClient):
     name = LLMProvider.ANTHROPIC.value
     api_key_env = "ANTHROPIC_API_KEY"
@@ -52,7 +56,19 @@ class AnthropicProvider(LLMProviderClient):
         try:
             response = await client.post(self.base_url, json=payload, headers=headers)
             response.raise_for_status()
+        except httpx.HTTPStatusError as exc:
+            status_code = exc.response.status_code
+            body = exc.response.text
+            logger.error(
+                "Anthropic upstream returned %s: %s", status_code, body, exc_info=True
+            )
+            raise ProviderAPICallError(
+                f"Anthropic request failed with status {status_code}",
+                status_code=status_code,
+                response_text=body,
+            ) from exc
         except httpx.HTTPError as exc:
+            logger.error("Anthropic transport error: %s", exc, exc_info=True)
             raise ProviderAPICallError(f"Anthropic request failed: {exc}") from exc
 
         data: Dict[str, Any] = response.json()
@@ -1,5 +1,7 @@
 from __future__ import annotations
 
+import logging
+import os
 from typing import Any, Dict, List
 
 import httpx
@@ -9,6 +11,26 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMResponse
 from app.providers.base import LLMProviderClient
 
 
+logger = logging.getLogger(__name__)
+
+
+def _resolve_timeout_seconds() -> float:
+    raw = os.getenv("DEEPSEEK_TIMEOUT_SECONDS")
+    if raw is None:
+        return 60.0
+    try:
+        return float(raw)
+    except ValueError:
+        logger.warning(
+            "Invalid value for DEEPSEEK_TIMEOUT_SECONDS=%r, falling back to 60 seconds",
+            raw,
+        )
+        return 60.0
+
+
+DEEPSEEK_TIMEOUT_SECONDS = _resolve_timeout_seconds()
+
+
 class DeepSeekProvider(LLMProviderClient):
     name = LLMProvider.DEEPSEEK.value
     api_key_env = "DEEPSEEK_API_KEY"
@@ -36,11 +58,24 @@ class DeepSeekProvider(LLMProviderClient):
             "Authorization": f"Bearer {self.api_key}",
             "Content-Type": "application/json",
         }
+        timeout = httpx.Timeout(DEEPSEEK_TIMEOUT_SECONDS)
 
         try:
-            response = await client.post(self.base_url, json=payload, headers=headers)
+            response = await client.post(
+                self.base_url, json=payload, headers=headers, timeout=timeout
+            )
             response.raise_for_status()
+        except httpx.HTTPStatusError as exc:
+            status_code = exc.response.status_code
+            body = exc.response.text
+            logger.error("DeepSeek upstream returned %s: %s", status_code, body, exc_info=True)
+            raise ProviderAPICallError(
+                f"DeepSeek request failed with status {status_code}",
+                status_code=status_code,
+                response_text=body,
+            ) from exc
         except httpx.HTTPError as exc:
+            logger.error("DeepSeek transport error: %s", exc, exc_info=True)
             raise ProviderAPICallError(f"DeepSeek request failed: {exc}") from exc
 
         data: Dict[str, Any] = response.json()
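One behavioural note: DEEPSEEK_TIMEOUT_SECONDS is resolved once at module import, so later changes to the environment variable do not affect an already-imported provider. A sketch (the module path app.providers.deepseek is an assumption; the file path is not shown in this diff):

# DEEPSEEK_TIMEOUT_SECONDS is read once at import time.
import os

os.environ["DEEPSEEK_TIMEOUT_SECONDS"] = "120"
import app.providers.deepseek as deepseek  # hypothetical module path

print(deepseek.DEEPSEEK_TIMEOUT_SECONDS)  # 120.0
os.environ["DEEPSEEK_TIMEOUT_SECONDS"] = "300"
print(deepseek.DEEPSEEK_TIMEOUT_SECONDS)  # still 120.0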
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import logging
 from typing import Any, Dict, List, Tuple
 
 import httpx
@@ -16,6 +17,9 @@ from app.models import (
 from app.providers.base import LLMProviderClient
 
 
+logger = logging.getLogger(__name__)
+
+
 class GeminiProvider(LLMProviderClient):
     name = LLMProvider.GEMINI.value
     api_key_env = "GEMINI_API_KEY"
@@ -53,7 +57,19 @@ class GeminiProvider(LLMProviderClient):
         try:
             response = await client.post(endpoint, json=payload, headers=headers)
             response.raise_for_status()
+        except httpx.HTTPStatusError as exc:
+            status_code = exc.response.status_code
+            body = exc.response.text
+            logger.error(
+                "Gemini upstream returned %s: %s", status_code, body, exc_info=True
+            )
+            raise ProviderAPICallError(
+                f"Gemini request failed with status {status_code}",
+                status_code=status_code,
+                response_text=body,
+            ) from exc
         except httpx.HTTPError as exc:
+            logger.error("Gemini transport error: %s", exc, exc_info=True)
             raise ProviderAPICallError(f"Gemini request failed: {exc}") from exc
 
         data: Dict[str, Any] = response.json()
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import logging
 from typing import Any, Dict, List
 
 import httpx
@@ -9,6 +10,9 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMResponse
 from app.providers.base import LLMProviderClient
 
 
+logger = logging.getLogger(__name__)
+
+
 class OpenAIProvider(LLMProviderClient):
     name = LLMProvider.OPENAI.value
     api_key_env = "OPENAI_API_KEY"
@@ -40,7 +44,19 @@ class OpenAIProvider(LLMProviderClient):
         try:
             response = await client.post(self.base_url, json=payload, headers=headers)
             response.raise_for_status()
+        except httpx.HTTPStatusError as exc:
+            status_code = exc.response.status_code
+            body = exc.response.text
+            logger.error(
+                "OpenAI upstream returned %s: %s", status_code, body, exc_info=True
+            )
+            raise ProviderAPICallError(
+                f"OpenAI request failed with status {status_code}",
+                status_code=status_code,
+                response_text=body,
+            ) from exc
         except httpx.HTTPError as exc:
+            logger.error("OpenAI transport error: %s", exc, exc_info=True)
             raise ProviderAPICallError(f"OpenAI request failed: {exc}") from exc
 
         data: Dict[str, Any] = response.json()
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import logging
 import os
 from typing import Any, Dict, List
 
@@ -10,6 +11,9 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMResponse
 from app.providers.base import LLMProviderClient
 
 
+logger = logging.getLogger(__name__)
+
+
 class OpenRouterProvider(LLMProviderClient):
     name = LLMProvider.OPENROUTER.value
     api_key_env = "OPENROUTER_API_KEY"
@@ -51,7 +55,19 @@ class OpenRouterProvider(LLMProviderClient):
         try:
             response = await client.post(self.base_url, json=payload, headers=headers)
             response.raise_for_status()
+        except httpx.HTTPStatusError as exc:
+            status_code = exc.response.status_code
+            body = exc.response.text
+            logger.error(
+                "OpenRouter upstream returned %s: %s", status_code, body, exc_info=True
+            )
+            raise ProviderAPICallError(
+                f"OpenRouter request failed with status {status_code}",
+                status_code=status_code,
+                response_text=body,
+            ) from exc
         except httpx.HTTPError as exc:
+            logger.error("OpenRouter transport error: %s", exc, exc_info=True)
             raise ProviderAPICallError(f"OpenRouter request failed: {exc}") from exc
 
         data: Dict[str, Any] = response.json()
@@ -1,5 +1,6 @@
 from __future__ import annotations
 
+import logging
 from typing import Any, Dict, List
 
 import httpx
@@ -9,6 +10,9 @@ from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMResponse
 from app.providers.base import LLMProviderClient
 
 
+logger = logging.getLogger(__name__)
+
+
 class QwenProvider(LLMProviderClient):
     name = LLMProvider.QWEN.value
     api_key_env = "QWEN_API_KEY"
@@ -48,7 +52,17 @@ class QwenProvider(LLMProviderClient):
         try:
             response = await client.post(self.base_url, json=payload, headers=headers)
             response.raise_for_status()
+        except httpx.HTTPStatusError as exc:
+            status_code = exc.response.status_code
+            body = exc.response.text
+            logger.error("Qwen upstream returned %s: %s", status_code, body, exc_info=True)
+            raise ProviderAPICallError(
+                f"Qwen request failed with status {status_code}",
+                status_code=status_code,
+                response_text=body,
+            ) from exc
         except httpx.HTTPError as exc:
+            logger.error("Qwen transport error: %s", exc, exc_info=True)
             raise ProviderAPICallError(f"Qwen request failed: {exc}") from exc
 
         data: Dict[str, Any] = response.json()
6 binary files not shown.
app/services/import_analysis.py
@@ -1,13 +1,50 @@
 from __future__ import annotations
 
-from typing import List, Tuple
+import csv
+import json
+import logging
+import os
+import re
+from functools import lru_cache
+from io import StringIO
+from pathlib import Path
+from typing import Any, Dict, List, Sequence, Tuple
 
+import httpx
+from pydantic import ValidationError
+
+from app.exceptions import ProviderAPICallError
 from app.models import (
+    DataImportAnalysisJobRequest,
     DataImportAnalysisRequest,
     LLMMessage,
     LLMProvider,
+    LLMResponse,
     LLMRole,
 )
+from app.settings import DEFAULT_IMPORT_MODEL, get_supported_import_models
 
+logger = logging.getLogger(__name__)
+
+IMPORT_GATEWAY_BASE_URL = os.getenv(
+    "IMPORT_GATEWAY_BASE_URL", "http://localhost:8000"
+)
+
+
+def _env_float(name: str, default: float) -> float:
+    raw = os.getenv(name)
+    if raw is None:
+        return default
+    try:
+        return float(raw)
+    except ValueError:
+        logger.warning("Invalid value for %s=%r, falling back to %.2f", name, raw, default)
+        return default
+
+
+IMPORT_CHAT_TIMEOUT_SECONDS = _env_float("IMPORT_CHAT_TIMEOUT_SECONDS", 90.0)
+
+SUPPORTED_IMPORT_MODELS = get_supported_import_models()
 
 
 def resolve_provider_from_model(llm_model: str) -> Tuple[LLMProvider, str]:
@@ -69,23 +106,337 @@ def build_import_messages(
     """Create system and user messages for the import analysis prompt."""
     headers_formatted = "\n".join(f"- {header}" for header in request.table_headers)
 
-    system_prompt = (
-        "你是一名数据导入识别助手。请根据给定的表头和示例数据,判断字段含义、"
-        "典型数据类型以及潜在的数据质量问题。最终请返回一个结构化的JSON。\n"
-        "JSON结构需包含: field_summaries (数组, 每项含 header、meaning、data_type、quality_notes), "
-        "detected_issues (字符串数组),以及 overall_suggestion (字符串)。"
-    )
+    system_prompt = load_import_template()
 
-    user_prompt = (
+    data_block = (
         f"导入记录ID: {request.import_record_id}\n\n"
         "表头信息:\n"
         f"{headers_formatted}\n\n"
         "示例数据:\n"
-        f"{request.example_data}\n\n"
-        "请仔细分析示例数据与表头之间的对应关系,并返回符合上述JSON结构的内容。"
+        f"{request.example_data}"
     )
 
     return [
         LLMMessage(role=LLMRole.SYSTEM, content=system_prompt),
-        LLMMessage(role=LLMRole.USER, content=user_prompt),
+        LLMMessage(role=LLMRole.USER, content=data_block),
     ]
+
+
+@lru_cache(maxsize=1)
+def load_import_template() -> str:
+    template_path = (
+        Path(__file__).resolve().parents[2] / "prompt" / "data_import_analysis.md"
+    )
+    if not template_path.exists():
+        raise FileNotFoundError(f"Prompt template not found at {template_path}")
+    return template_path.read_text(encoding="utf-8").strip()
+
+
+def derive_headers(
+    rows: Sequence[Any], provided_headers: Sequence[str] | None
+) -> List[str]:
+    if provided_headers:
+        return [str(header) for header in provided_headers]
+
+    collected: List[str] = []
+    list_lengths: List[int] = []
+
+    for row in rows:
+        if isinstance(row, dict):
+            for key in row.keys():
+                key_str = str(key)
+                if key_str not in collected:
+                    collected.append(key_str)
+        elif isinstance(row, (list, tuple)):
+            list_lengths.append(len(row))
+
+    if collected:
+        return collected
+
+    if list_lengths:
+        max_len = max(list_lengths)
+        return [f"column_{idx + 1}" for idx in range(max_len)]
+
+    return ["column_1"]
+
+
+def _stringify_cell(value: Any) -> str:
+    if value is None:
+        return ""
+    if isinstance(value, (str, int, float, bool)):
+        return str(value)
+    try:
+        return json.dumps(value, ensure_ascii=False)
+    except (TypeError, ValueError):
+        return str(value)
+
+
+def rows_to_csv_text(
+    rows: Sequence[Any], headers: Sequence[str], *, max_rows: int = 50
+) -> str:
+    buffer = StringIO()
+    writer = csv.writer(buffer)
+
+    if headers:
+        writer.writerow(headers)
+
+    for idx, row in enumerate(rows):
+        if max_rows and idx >= max_rows:
+            break
+        if isinstance(row, dict):
+            writer.writerow([_stringify_cell(row.get(header)) for header in headers])
+        elif isinstance(row, (list, tuple)):
+            writer.writerow([_stringify_cell(item) for item in row])
+        else:
+            writer.writerow([_stringify_cell(row)])
+
+    return buffer.getvalue().strip()
+
+
+def format_table_schema(schema: Any) -> str:
+    if schema is None:
+        return ""
+    if isinstance(schema, str):
+        return schema.strip()
+    try:
+        return json.dumps(schema, ensure_ascii=False, indent=2)
+    except (TypeError, ValueError):
+        return str(schema)
+
+
+def build_analysis_request(
+    request: DataImportAnalysisJobRequest,
+) -> DataImportAnalysisRequest:
+    headers = derive_headers(request.rows, request.headers)
+
+    if request.raw_csv:
+        csv_text = request.raw_csv.strip()
+    else:
+        csv_text = rows_to_csv_text(request.rows, headers)
+
+    sections: List[str] = []
+    if csv_text:
+        sections.append("CSV样本预览:\n" + csv_text)
+    schema_text = format_table_schema(request.table_schema)
+    if schema_text:
+        sections.append("附加结构信息:\n" + schema_text)
+
+    example_data = "\n\n".join(sections) if sections else "未提供样本数据。"
+
+    max_length = 30000
+    if len(example_data) > max_length:
+        example_data = example_data[: max_length - 3] + "..."
+
+    return DataImportAnalysisRequest(
+        import_record_id=request.import_record_id,
+        example_data=example_data,
+        table_headers=headers,
+        llm_model=request.llm_model or DEFAULT_IMPORT_MODEL,
+    )
+
+
+def build_chat_payload(request: DataImportAnalysisJobRequest) -> Dict[str, Any]:
+    llm_input = request.llm_model or DEFAULT_IMPORT_MODEL
+    provider, model_name = resolve_provider_from_model(llm_input)
+    normalized_model = f"{provider.value}:{model_name}"
+
+    if SUPPORTED_IMPORT_MODELS and normalized_model not in SUPPORTED_IMPORT_MODELS:
+        raise ProviderAPICallError(
+            "Model '{model}' is not allowed. Allowed models: {allowed}".format(
+                model=normalized_model,
+                allowed=", ".join(sorted(SUPPORTED_IMPORT_MODELS)),
+            )
+        )
+
+    analysis_request = build_analysis_request(request)
+
+    messages = build_import_messages(analysis_request)
+
+    payload: Dict[str, Any] = {
+        "provider": provider.value,
+        "model": model_name,
+        "messages": [message.model_dump() for message in messages],
+        "temperature": request.temperature if request.temperature is not None else 0.2,
+    }
+
+    if request.max_output_tokens is not None:
+        payload["max_tokens"] = request.max_output_tokens
+
+    return payload
+
+
+def _extract_json_payload(content: str) -> str:
+    """Try to pull a JSON object from an LLM content string."""
+    # Prefer fenced code blocks such as ```json { ... } ```
+    fenced = re.search(r"```(?:json)?\s*(\{.*?\})\s*```", content, flags=re.DOTALL | re.IGNORECASE)
+    if fenced:
+        return fenced.group(1).strip()
+
+    stripped = content.strip()
+    if stripped.startswith("{") and stripped.endswith("}"):
+        return stripped
+
+    start = stripped.find("{")
+    end = stripped.rfind("}")
+    if start != -1 and end != -1 and end > start:
+        return stripped[start : end + 1].strip()
+
+    return stripped
+
+
+def parse_llm_analysis_json(llm_response: LLMResponse) -> Dict[str, Any]:
+    """Extract and parse the structured JSON payload from an LLM response."""
+    if not llm_response.choices:
+        raise ProviderAPICallError("LLM response did not include any choices to parse.")
+
+    content = llm_response.choices[0].message.content or ""
+    if not content.strip():
+        raise ProviderAPICallError("LLM response content is empty.")
+
+    json_payload = _extract_json_payload(content)
+
+    try:
+        return json.loads(json_payload)
+    except json.JSONDecodeError as exc:
+        preview = json_payload[:2000]
+        logger.error("Failed to parse JSON from LLM response content: %s", preview, exc_info=True)
+        raise ProviderAPICallError("LLM response JSON could not be parsed.") from exc
+
+
+async def dispatch_import_analysis_job(
+    request: DataImportAnalysisJobRequest,
+    client: httpx.AsyncClient,
+) -> Dict[str, Any]:
+    logger.info("Starting import analysis job %s", request.import_record_id)
+
+    payload = build_chat_payload(request)
+    url = f"{IMPORT_GATEWAY_BASE_URL.rstrip('/')}/v1/chat/completions"
+
+    logger.info(
+        "Dispatching import %s to %s: %s",
+        request.import_record_id,
+        url,
+        json.dumps(payload, ensure_ascii=False),
+    )
+
+    timeout = httpx.Timeout(IMPORT_CHAT_TIMEOUT_SECONDS)
+
+    try:
+        response = await client.post(url, json=payload, timeout=timeout)
+        response.raise_for_status()
+    except httpx.HTTPStatusError as exc:
+        body_preview = ""
+        if exc.response is not None:
+            body_preview = exc.response.text[:400]
+        raise ProviderAPICallError(
+            f"Failed to invoke chat completions endpoint: {exc}. Response body: {body_preview}"
+        ) from exc
+    except httpx.HTTPError as exc:
+        raise ProviderAPICallError(
+            f"HTTP call to chat completions endpoint failed: {exc}"
+        ) from exc
+
+    try:
+        response_data = response.json()
+    except ValueError as exc:
+        raise ProviderAPICallError("Chat completions endpoint returned invalid JSON") from exc
+
+    logger.info(
+        "LLM HTTP status for %s: %s",
+        request.import_record_id,
+        response.status_code,
+    )
+    logger.info(
+        "LLM response for %s: %s",
+        request.import_record_id,
+        json.dumps(response_data, ensure_ascii=False),
+    )
+
+    try:
+        llm_response = LLMResponse.model_validate(response_data)
+    except ValidationError as exc:
+        raise ProviderAPICallError(
+            "Chat completions endpoint returned unexpected schema"
+        ) from exc
+
+    structured_json = parse_llm_analysis_json(llm_response)
+    usage_data = extract_usage(llm_response.raw)
+
+    logger.info("Completed import analysis job %s", request.import_record_id)
+    result: Dict[str, Any] = {
+        "import_record_id": request.import_record_id,
+        "status": "succeeded",
+        "llm_response": llm_response.model_dump(),
+        "analysis": structured_json,
+    }
+
+    if usage_data:
+        result["usage"] = usage_data
+
+    return result
+
+
+# Normalise usage-field extraction across the different provider response formats.
+def extract_usage(resp_json: dict) -> dict:
+    usage = resp_json.get("usage") or resp_json.get("usageMetadata") or {}
+    return {
+        "prompt_tokens": usage.get("prompt_tokens") or usage.get("input_tokens") or usage.get("promptTokenCount"),
+        "completion_tokens": usage.get("completion_tokens") or usage.get("output_tokens") or usage.get("candidatesTokenCount"),
+        "total_tokens": usage.get("total_tokens") or usage.get("totalTokenCount") or (
+            (usage.get("prompt_tokens") or usage.get("input_tokens") or 0)
+            + (usage.get("completion_tokens") or usage.get("output_tokens") or 0)
+        ),
+    }
+
+
+async def notify_import_analysis_callback(
+    callback_url: str,
+    payload: Dict[str, Any],
+    client: httpx.AsyncClient,
+) -> None:
+    callback_target = str(callback_url)
+    logger.info(
+        "Posting import analysis callback to %s: %s",
+        callback_target,
+        json.dumps(payload, ensure_ascii=False),
+    )
+
+    try:
+        response = await client.post(callback_target, json=payload)
+        response.raise_for_status()
+    except httpx.HTTPError as exc:
+        logger.error(
+            "Failed to deliver import analysis callback to %s: %s",
+            callback_target,
+            exc,
+        )
+
+
+async def process_import_analysis_job(
+    request: DataImportAnalysisJobRequest,
+    client: httpx.AsyncClient,
+) -> None:
+    try:
+        payload = await dispatch_import_analysis_job(request, client)
+    except ProviderAPICallError as exc:
+        logger.error(
+            "LLM call failed for %s: %s",
+            request.import_record_id,
+            exc,
+        )
+        payload = {
+            "import_record_id": request.import_record_id,
+            "status": "failed",
+            "error": str(exc),
+        }
+    except Exception as exc:  # pragma: no cover - defensive logging
+        logger.exception(
+            "Unexpected failure while processing import analysis job %s",
+            request.import_record_id,
+        )
+        payload = {
+            "import_record_id": request.import_record_id,
+            "status": "failed",
+            "error": str(exc),
+        }
+
+    await notify_import_analysis_callback(request.callback_url, payload, client)
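extract_usage has to cope with three usage shapes (OpenAI/DeepSeek-style, Anthropic-style, Gemini-style); a quick sketch of the normalisation, using illustrative dicts rather than captured responses:

# Illustrative usage payloads in the three shapes extract_usage understands.
from app.services.import_analysis import extract_usage

openai_style = {"usage": {"prompt_tokens": 10, "completion_tokens": 5, "total_tokens": 15}}
anthropic_style = {"usage": {"input_tokens": 10, "output_tokens": 5}}
gemini_style = {"usageMetadata": {"promptTokenCount": 10, "candidatesTokenCount": 5, "totalTokenCount": 15}}

for raw in (openai_style, anthropic_style, gemini_style):
    print(extract_usage(raw))
# All three normalise to prompt/completion/total keys; the Anthropic-style
# total, absent upstream, is computed as 10 + 5 = 15.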
app/settings.py (new file, 39 lines)
@@ -0,0 +1,39 @@
from __future__ import annotations

import os
from functools import lru_cache
from typing import Dict, Set

from dotenv import load_dotenv


load_dotenv()


PROVIDER_KEY_ENV_MAP: Dict[str, str] = {
    "openai": "OPENAI_API_KEY",
    "anthropic": "ANTHROPIC_API_KEY",
    "openrouter": "OPENROUTER_API_KEY",
    "gemini": "GEMINI_API_KEY",
    "qwen": "QWEN_API_KEY",
    "deepseek": "DEEPSEEK_API_KEY",
}


DEFAULT_IMPORT_MODEL = os.getenv("DEFAULT_IMPORT_MODEL", "openai:gpt-4.1-mini")


@lru_cache(maxsize=1)
def get_supported_import_models() -> Set[str]:
    raw = os.getenv("IMPORT_SUPPORTED_MODELS", "")
    return {model.strip() for model in raw.split(",") if model.strip()}


@lru_cache(maxsize=1)
def get_available_provider_keys() -> Dict[str, str]:
    keys: Dict[str, str] = {}
    for provider, env_name in PROVIDER_KEY_ENV_MAP.items():
        value = os.getenv(env_name)
        if value:
            keys[provider] = value
    return keys
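The model allow-list is a comma-separated env var, parsed once and cached; a minimal sketch of the parsing behaviour (the env value is illustrative):

# Sketch: whitespace is trimmed and empty entries are dropped.
import os

os.environ["IMPORT_SUPPORTED_MODELS"] = "deepseek:deepseek-chat, openai:gpt-5,"

from app.settings import get_supported_import_models

print(get_supported_import_models())
# {'deepseek:deepseek-chat', 'openai:gpt-5'}
# Because of lru_cache, later env changes require
# get_supported_import_models.cache_clear() to take effect.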
deepseek-result.json (new file, 41 lines)
@@ -0,0 +1,41 @@
{
  "provider": "deepseek",
  "model": "deepseek-chat",
  "choices": [
    {
      "index": 0,
      "message": {
        "role": "assistant",
        "content": "```json\n{\n \"table_name\": \"national_brand_sales\",\n \"description\": \"全国品牌系统外销售数据\",\n \"columns\": [\n {\n \"original_name\": \"品牌\",\n \"standard_name\": \"brand\",\n \"data_type\": \"string\",\n \"db_type\": \"varchar(50)\",\n \"java_type\": \"string\",\n \"nullable\": true,\n \"distinct_count_sample\": 5,\n \"null_ratio_sample\": 0.4,\n \"is_enum_candidate\": false,\n \"description\": \"品牌名称\",\n \"date_format\": null\n },\n {\n \"original_name\": \"产品价类\",\n \"standard_name\": \"price_category\",\n \"data_type\": \"string\",\n \"db_type\": \"varchar(10)\",\n \"java_type\": \"string\",\n \"nullable\": false,\n \"distinct_count_sample\": 3,\n \"null_ratio_sample\": 0.0,\n \"is_enum_candidate\": true,\n \"description\": \"产品价格分类(一类/二类/三类)\",\n \"date_format\": null\n },\n {\n \"original_name\": \"是否重点品牌
      }
    }
  ],
  "raw": {
    "id": "67f3cc80-38bc-4bb7-b336-48d4886722c4",
    "object": "chat.completion",
    "created": 1761752207,
    "model": "deepseek-chat",
    "choices": [
      {
        "index": 0,
        "message": {
          "role": "assistant",
          "content": "```json\n{\n \"table_name\": \"national_brand_sales\",\n \"description\": \"全国品牌系统外销售数据\",\n \"columns\": [\n {\n \"original_name\": \"品牌\",\n \"standard_name\": \"brand\",\n \"data_type\": \"string\",\n \"db_type\": \"varchar(50)\",\n \"java_type\": \"string\",\n \"nullable\": true,\n \"distinct_count_sample\": 5,\n \"null_ratio_sample\": 0.4,\n \"is_enum_candidate\": false,\n \"description\": \"品牌名称\",\n \"date_format\": null\n },\n {\n \"original_name\": \"产品价类\",\n \"standard_name\": \"price_category\",\n \"data_type\": \"string\",\n \"db_type\": \"varchar(10)\",\n \"java_type\": \"string\",\n \"nullable\": false,\n \"distinct_count_sample\": 3,\n \"null_ratio_sample\": 0.0,\n \"is_enum_candidate\": true,\n \"description\": \"产品价格分类(一类/二类/三类)\",\n \"date_format\": null\n },\n {\n \"original_name\": \"是否重点品牌
        },
        "logprobs": null,
        "finish_reason": "length"
      }
    ],
    "usage": {
      "prompt_tokens": 1078,
      "completion_tokens": 256,
      "total_tokens": 1334,
      "prompt_tokens_details": {
        "cached_tokens": 1024
      },
      "prompt_cache_hit_tokens": 1024,
      "prompt_cache_miss_tokens": 54
    },
    "system_fingerprint": "fp_ffc7281d48_prod0820_fp8_kvcache"
  }
}
(The "content" strings are truncated in the page render; the full model output is not shown.)
prompt/data_import_analysis.md
@@ -2,7 +2,7 @@
 
 任务目标:对提供的数据(含表头或table schema与若干行样本数据)进行解析,生成一份导入分析与处理报告,指导如何将其导入为标准化表结构及 JSON 元数据定义,不要省略任何字段信息,全量输出。
 
-请从以下四个方向进行思考:
+请从以下两个方向进行思考:
 
 方向 1:元数据识别与整理
 解析表明:根据表头、Origin Table Name、Orign File Name生成表名,表名需要有意义
@@ -12,7 +12,6 @@
 方向 2:字段数据类型与格式推断
 针对每列:输出推断数据类型(如 varchar(n) / int / bigint / tinyint / float / double / decimal(p,s) / date / datetime / text)。
 说明推断依据:样本值分布、长度范围、格式正则、是否存在空值、是否数值但含前导零等。
 指出数据质量初步观察:缺失率、是否有异常/离群值(简单规则即可)、是否需标准化(如去空格、去重、枚举值归一)。
 给出“建议处理动作”:如 trim、cast_float、cast_int、cast_double、cast_date、cast_time、cast_datetime,适用于将样本数据转换成数据库表字段兼容的格式。
-若为“可能是枚举”的字段,列出候选枚举值及占比。
 
@@ -23,12 +22,8 @@
 "columns": [{
 "original_name": "原始名称",
-"standard_name": "标准化后的名称: 下划线命名,大小写字母、数字、下划线",
-"data_type": "数据类型限制为:number/string/datetime",
-"db_type": "数据库字段类型",
-"java_type": "java字段类型限制为: int/long/double/string/date",
+"data_type": "",
 "nullable": true/false,
 "distinct_count_sample": number,
 "null_ratio_sample": 0.x,
 "is_enum_candidate": true/false,
 "description": "字段简短描述",
 "date_format": "转换成Date类型的pattern"
@@ -39,4 +34,4 @@
 
 若信息不足,请显式指出“信息不足”并给出补充数据需求清单。
 避免武断结论,用“可能 / 候选 / 建议”字样。
-不要捏造样本未出现的值。
+不要捏造样本未出现的值。
Binary file not shown.
test/chat_completions_deepseek_example.py (new file, 43 lines)
@@ -0,0 +1,43 @@
"""Demonstrates calling /v1/chat/completions with the DeepSeek provider."""

from __future__ import annotations

import asyncio
import json

import httpx
from dotenv import load_dotenv


load_dotenv()

API_URL = "http://localhost:8000/v1/chat/completions"


async def main() -> None:
    payload = {
        "provider": "deepseek",
        "model": "deepseek-chat",
        "messages": [
            {
                "role": "system",
                "content": "角色:你是一名数据分析导入助手(Data Ingestion Analyst),擅长从原始数据抽取结构化元数据、推断字段类型、识别维度/事实属性,并输出导入建模建议(Table + JSON)。\n\n任务目标:对提供的数据(含表头或table schema与若干行样本数据)进行解析,生成一份导入分析与处理报告,指导如何将其导入为标准化表结构及 JSON 元数据定义,不要省略任何字段信息,全量输出。\n\n请从以下两个方向进行思考:\n\n方向 1:元数据识别与整理\n解析表明:根据表头、Origin Table Name、Orign File Name生成表名,表名需要有意义\n解析列名:生成标准化字段名(snake_case 或小驼峰),并给出原始列名与标准字段名映射。\n为每个字段写出中文/英文注释(若无法确定,给出“待确认”并附可能解释)。\n\n方向 2:字段数据类型与格式推断\n针对每列:输出推断数据类型(如 varchar(n) / int / bigint / tinyint / float / double / decimal(p,s) / date / datetime / text)。\n说明推断依据:样本值分布、长度范围、格式正则、是否存在空值、是否数值但含前导零等。\n指出数据质量初步观察:缺失率、是否有异常/离群值(简单规则即可)、是否需标准化(如去空格、去重、枚举值归一)。\n给出“建议处理动作”:如 trim、cast_float、cast_int、cast_double、cast_date、cast_time、cast_datetime,适用于将样本数据转换成数据库表字段兼容的格式。\n若为“可能是枚举”的字段,列出候选枚举值及占比。\n\n最终内容都输出为一个json对象,格式为(字段级与表级定义),字段含:\n{\n \"table_name\": \"标准化后的表名\",\n \"description\": \"表简短描述\",\n \"columns\": [{\n \"original_name\": \"原始名称\",\n \"standard_name\": \"标准化后的名称: 下划线命名,大小写字母、数字、下划线\",\n \"data_type\": \"数据类型限制为:number/string/datetime\",\n \"db_type\": \"数据库字段类型\",\n \"java_type\": \"java字段类型限制为: int/long/double/string/date\",\n \"nullable\": true/false,\n \"distinct_count_sample\": number,\n \"null_ratio_sample\": 0.x,\n \"is_enum_candidate\": true/false,\n \"description\": \"字段简短描述\",\n \"date_format\": \"转换成Date类型的pattern\"\n }]\n}\n\n约束与风格:\n\n若信息不足,请显式指出“信息不足”并给出补充数据需求清单。\n避免武断结论,用“可能 / 候选 / 建议”字样。\n不要捏造样本未出现的值。"
            },
            {
                "role": "user",
                "content": "导入记录ID: demo-import-001\n\n表头信息:\n- 品牌\n- 产品价类\n- 是否重点品牌\n- 系统外销售量(箱)\n- 系统外销售金额(万元)\n- 同期系统外销售量(箱)\n- 同期系统外销售金额(万元)\n\n示例数据:\nCSV样本预览:\n品牌,产品价类,是否重点品牌,系统外销售量(箱),系统外销售金额(万元),同期系统外销售量(箱),同期系统外销售金额(万元)\r\n白沙,一类,重点品牌,3332.406875,64283.5593333333,3123.693375,61821.7986666667\r\nnan,二类,重点品牌,1094.4707375,3859.69366666667,869.65725,3067.00966666667\r\nnan,三类,重点品牌,3965.0457375,8388.306,4401.6714875,8802.132\r\n宝岛,一类,否,39.934375,301.617666666667,30.5975,249.399666666667\r\n长白山,一类,重点品牌,2666.53775,12360.8306666667,1916.252,9051.672\r\nnan,二类,重点品牌,2359.910025,7671.26233333333,2335.2480875,7590.791\r\nnan,三类,重点品牌,1263.293875,2826.665,1590.750875,3503.083\r\n大前门,一类,否,81.5806875,343.721333333333,114.1179875,480.809333333333\r\nnan,三类,否,226.445225,319.975666666667,254.6595125,359.894\r\n大青山,二类,否,60.73525,209.415,60.2415,207.712666666667\n\n附加结构信息:\n{\n \"source\": \"excel\",\n \"file_name\": \"全国品牌.xlsx\",\n \"sheet_name\": \"Sheet1\"\n}"
            }
        ],
        "temperature": 0.2,
        "max_tokens": 350,
    }

    async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
        response = await client.post(API_URL, json=payload)
        response.raise_for_status()
        data = response.json()
        print(json.dumps(data, ensure_ascii=False, indent=2))


if __name__ == "__main__":
    asyncio.run(main())
test/data_import_analysis_example.py (new file, 49 lines)
@@ -0,0 +1,49 @@
"""Minimal example for hitting the /v1/import/analyze endpoint with Excel data."""

from __future__ import annotations

import asyncio
from pathlib import Path

import httpx
import pandas as pd
from dotenv import load_dotenv

load_dotenv()

API_URL = "http://localhost:8000/v1/import/analyze"
CALLBACK_URL = "http://localhost:8000/__mock__/import-callback"
EXCEL_PATH = Path(__file__).resolve().parents[1] / "file" / "全国品牌.xlsx"


async def main() -> None:
    excel = pd.ExcelFile(EXCEL_PATH)
    sheet_name = excel.sheet_names[0]
    df = excel.parse(sheet_name)
    sampled = df.head(10)

    rows = sampled.to_dict(orient="records")
    headers = [str(column) for column in sampled.columns]

    payload = {
        "import_record_id": "demo-import-001",
        "rows": rows,
        "headers": headers,
        "table_schema": {
            "source": "excel",
            "file_name": EXCEL_PATH.name,
            "sheet_name": sheet_name,
        },
        "llm_model": "deepseek:deepseek-chat",
        "temperature": 0.2,
        "callback_url": CALLBACK_URL,
    }

    async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
        response = await client.post(API_URL, json=payload)
        print("Status:", response.status_code)


if __name__ == "__main__":
    asyncio.run(main())
test/openrouter_chat_example.py (new file, 61 lines)
@@ -0,0 +1,61 @@
"""Quick demo call against the unified chat endpoint using the OpenRouter provider."""

from __future__ import annotations

import asyncio
import json

import httpx
from dotenv import load_dotenv

load_dotenv()


API_URL = "http://localhost:8000/v1/chat/completions"


def extract_schema(payload: dict) -> dict:
    content = payload["choices"][0]["message"]["content"]
    if content.startswith("```"):
        content = content.split("```json", 1)[-1]
        content = content.rsplit("```", 1)[0]
    return json.loads(content)


async def main() -> None:
    payload = {
        "provider": "openrouter",
        "model": "anthropic/claude-3.7-sonnet",
        "messages": [
            {
                "role": "system",
                "content": "角色:你是一名数据分析导入助手(Data Ingestion Analyst),擅长从原始数据抽取结构化元数据、推断字段类型、识别维度/事实属性,并输出导入建模建议(Table + JSON)。\n\n任务目标:对提供的数据(含表头或table schema与若干行样本数据)进行解析,生成一份导入分析与处理报告,指导如何将其导入为标准化表结构及 JSON 元数据定义,不要省略任何字段信息,全量输出。\n\n请从以下两个方向进行思考:\n\n方向 1:元数据识别与整理\n解析表明:根据表头、Origin Table Name、Orign File Name生成表名,表名需要有意义\n解析列名:生成标准化字段名(snake_case 或小驼峰),并给出原始列名与标准字段名映射。\n为每个字段写出中文/英文注释(若无法确定,给出“待确认”并附可能解释)。\n\n方向 2:字段数据类型与格式推断\n针对每列:输出推断数据类型(如 varchar(n) / int / bigint / tinyint / float / double / decimal(p,s) / date / datetime / text)。\n说明推断依据:样本值分布、长度范围、格式正则、是否存在空值、是否数值但含前导零等。\n指出数据质量初步观察:缺失率、是否有异常/离群值(简单规则即可)、是否需标准化(如去空格、去重、枚举值归一)。\n给出“建议处理动作”:如 trim、cast_float、cast_int、cast_double、cast_date、cast_time、cast_datetime,适用于将样本数据转换成数据库表字段兼容的格式。\n若为“可能是枚举”的字段,列出候选枚举值及占比。\n\n最终内容都输出为一个json对象,格式为(字段级与表级定义),字段含:\n{\n \"table_name\": \"标准化后的表名\",\n \"description\": \"表简短描述\",\n \"columns\": [{\n \"original_name\": \"原始名称\",\n \"standard_name\": \"标准化后的名称: 下划线命名,大小写字母、数字、下划线\",\n \"data_type\": \"数据类型限制为:number/string/datetime\",\n \"db_type\": \"数据库字段类型\",\n \"java_type\": \"java字段类型限制为: int/long/double/string/date\",\n \"nullable\": true/false,\n \"distinct_count_sample\": number,\n \"null_ratio_sample\": 0.x,\n \"is_enum_candidate\": true/false,\n \"description\": \"字段简短描述\",\n \"date_format\": \"转换成Date类型的pattern\"\n }]\n}\n\n约束与风格:\n\n若信息不足,请显式指出“信息不足”并给出补充数据需求清单。\n避免武断结论,用“可能 / 候选 / 建议”字样。\n不要捏造样本未出现的值。"
            },
            {
                "role": "user",
                "content": "导入记录ID: demo-import-001\n\n表头信息:\n- 品牌\n- 产品价类\n- 是否重点品牌\n- 系统外销售量(箱)\n- 系统外销售金额(万元)\n- 同期系统外销售量(箱)\n- 同期系统外销售金额(万元)\n\n示例数据:\nCSV样本预览:\n品牌,产品价类,是否重点品牌,系统外销售量(箱),系统外销售金额(万元),同期系统外销售量(箱),同期系统外销售金额(万元)\r\n白沙,一类,重点品牌,3332.406875,64283.5593333333,3123.693375,61821.7986666667\r\nnan,二类,重点品牌,1094.4707375,3859.69366666667,869.65725,3067.00966666667\r\nnan,三类,重点品牌,3965.0457375,8388.306,4401.6714875,8802.132\r\n宝岛,一类,否,39.934375,301.617666666667,30.5975,249.399666666667\r\n长白山,一类,重点品牌,2666.53775,12360.8306666667,1916.252,9051.672\r\nnan,二类,重点品牌,2359.910025,7671.26233333333,2335.2480875,7590.791\r\nnan,三类,重点品牌,1263.293875,2826.665,1590.750875,3503.083\r\n大前门,一类,否,81.5806875,343.721333333333,114.1179875,480.809333333333\r\nnan,三类,否,226.445225,319.975666666667,254.6595125,359.894\r\n大青山,二类,否,60.73525,209.415,60.2415,207.712666666667\n\n附加结构信息:\n{\n \"source\": \"excel\",\n \"file_name\": \"全国品牌.xlsx\",\n \"sheet_name\": \"Sheet1\"\n}"
            }
        ],
        "temperature": 0.1,
        "max_tokens": 2048,
    }

    async with httpx.AsyncClient(timeout=httpx.Timeout(15.0)) as client:
        response = await client.post(API_URL, json=payload)
        print("Status:", response.status_code)
        try:
            data = response.json()
        except ValueError:
            print("Raw Body:", response.text)
            return

        print("Body:", data)
        try:
            schema = extract_schema(data)
            print("Table name:", schema.get("table_name"))
        except (KeyError, json.JSONDecodeError) as exc:
            print("Failed to parse schema:", exc)


if __name__ == "__main__":
    asyncio.run(main())