commit 0af5f19af972e4d7419f093d00b4d0407ead4f71
Author: zhaoawd
Date:   Wed Oct 29 00:38:57 2025 +0800

    init,llm gateway & import_analyse

diff --git a/.gitignore b/.gitignore
new file mode 100644
index 0000000..afedc66
--- /dev/null
+++ b/.gitignore
@@ -0,0 +1,3 @@
+.venv
+gx/uncommitted/
+.vscode/
\ No newline at end of file
diff --git a/app/__init__.py b/app/__init__.py
new file mode 100644
index 0000000..813f14c
--- /dev/null
+++ b/app/__init__.py
@@ -0,0 +1,3 @@
+from .main import create_app
+
+__all__ = ["create_app"]
diff --git a/app/__pycache__/__init__.cpython-311.pyc b/app/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..976aa34
Binary files /dev/null and b/app/__pycache__/__init__.cpython-311.pyc differ
diff --git a/app/__pycache__/__init__.cpython-312.pyc b/app/__pycache__/__init__.cpython-312.pyc
new file mode 100644
index 0000000..2ae729e
Binary files /dev/null and b/app/__pycache__/__init__.cpython-312.pyc differ
diff --git a/app/__pycache__/exceptions.cpython-311.pyc b/app/__pycache__/exceptions.cpython-311.pyc
new file mode 100644
index 0000000..b9d2a6f
Binary files /dev/null and b/app/__pycache__/exceptions.cpython-311.pyc differ
diff --git a/app/__pycache__/exceptions.cpython-312.pyc b/app/__pycache__/exceptions.cpython-312.pyc
new file mode 100644
index 0000000..705d5f0
Binary files /dev/null and b/app/__pycache__/exceptions.cpython-312.pyc differ
diff --git a/app/__pycache__/main.cpython-311.pyc b/app/__pycache__/main.cpython-311.pyc
new file mode 100644
index 0000000..3341eec
Binary files /dev/null and b/app/__pycache__/main.cpython-311.pyc differ
diff --git a/app/__pycache__/main.cpython-312.pyc b/app/__pycache__/main.cpython-312.pyc
new file mode 100644
index 0000000..649383e
Binary files /dev/null and b/app/__pycache__/main.cpython-312.pyc differ
diff --git a/app/__pycache__/models.cpython-311.pyc b/app/__pycache__/models.cpython-311.pyc
new file mode 100644
index 0000000..cad4eb9
Binary files /dev/null and b/app/__pycache__/models.cpython-311.pyc differ
diff --git a/app/__pycache__/models.cpython-312.pyc b/app/__pycache__/models.cpython-312.pyc
new file mode 100644
index 0000000..a13c6bb
Binary files /dev/null and b/app/__pycache__/models.cpython-312.pyc differ
diff --git a/app/exceptions.py b/app/exceptions.py
new file mode 100644
index 0000000..5ea8ff0
--- /dev/null
+++ b/app/exceptions.py
@@ -0,0 +1,6 @@
+class ProviderConfigurationError(RuntimeError):
+    """Raised when a provider is missing required configuration."""
+
+
+class ProviderAPICallError(RuntimeError):
+    """Raised when the upstream provider responds with an error."""
diff --git a/app/main.py b/app/main.py
new file mode 100644
index 0000000..c261879
--- /dev/null
+++ b/app/main.py
@@ -0,0 +1,103 @@
+from __future__ import annotations
+
+from contextlib import asynccontextmanager
+
+import httpx
+from fastapi import Depends, FastAPI, HTTPException, Request
+
+from app.exceptions import ProviderAPICallError, ProviderConfigurationError
+from app.models import (
+    DataImportAnalysisRequest,
+    DataImportAnalysisResponse,
+    LLMRequest,
+    LLMResponse,
+)
+from app.services import LLMGateway
+from app.services.import_analysis import build_import_messages, resolve_provider_from_model
+
+
+@asynccontextmanager
+async def lifespan(app: FastAPI):
+    client = httpx.AsyncClient(timeout=httpx.Timeout(30.0))
+    gateway = LLMGateway()
+    try:
+        app.state.http_client = client  # type: ignore[attr-defined]
+        app.state.gateway = gateway  # type: ignore[attr-defined]
+        yield
+    finally:
+        await client.aclose()
+
+
+def create_app() -> FastAPI:
+    application = FastAPI(
+        title="Unified LLM Gateway",
+        version="0.1.0",
+        lifespan=lifespan,
+    )
+
+    @application.post(
+        "/v1/chat/completions",
+        response_model=LLMResponse,
+        summary="Dispatch chat completion to upstream provider",
+    )
+    async def create_chat_completion(
+        payload: LLMRequest,
+        gateway: LLMGateway = Depends(get_gateway),
+        client: httpx.AsyncClient = Depends(get_http_client),
+    ) -> LLMResponse:
+        try:
+            return await gateway.chat(payload, client)
+        except ProviderConfigurationError as exc:
+            raise HTTPException(status_code=422, detail=str(exc)) from exc
+        except ProviderAPICallError as exc:
+            raise HTTPException(status_code=502, detail=str(exc)) from exc
+
+    @application.post(
+        "/v1/import/analyze",
+        response_model=DataImportAnalysisResponse,
+        summary="Analyze import sample data via configured LLM",
+    )
+    async def analyze_import_data(
+        payload: DataImportAnalysisRequest,
+        gateway: LLMGateway = Depends(get_gateway),
+        client: httpx.AsyncClient = Depends(get_http_client),
+    ) -> DataImportAnalysisResponse:
+        try:
+            provider, model_name = resolve_provider_from_model(payload.llm_model)
+        except ValueError as exc:
+            raise HTTPException(status_code=422, detail=str(exc)) from exc
+
+        messages = build_import_messages(payload)
+
+        llm_request = LLMRequest(
+            provider=provider,
+            model=model_name,
+            messages=messages,
+            temperature=payload.temperature if payload.temperature is not None else 0.2,
+            max_tokens=payload.max_tokens,
+        )
+
+        try:
+            llm_response = await gateway.chat(llm_request, client)
+        except ProviderConfigurationError as exc:
+            raise HTTPException(status_code=422, detail=str(exc)) from exc
+        except ProviderAPICallError as exc:
+            raise HTTPException(status_code=502, detail=str(exc)) from exc
+
+        return DataImportAnalysisResponse(
+            import_record_id=payload.import_record_id,
+            llm_response=llm_response,
+        )
+
+    return application
+
+
+async def get_gateway(request: Request) -> LLMGateway:
+    return request.app.state.gateway  # type: ignore[return-value, attr-defined]
+
+
+async def get_http_client(request: Request) -> httpx.AsyncClient:
+    return request.app.state.http_client  # type: ignore[return-value, attr-defined]
+
+
+app = create_app()
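For orientation, here is a smoke-test sketch of the two routes above. It assumes the service is running locally (e.g. `uvicorn app.main:app`) and that OPENAI_API_KEY is exported so the "openai" provider can be built; the base URL, model name, and payload values are illustrative, not part of this commit:

    # Hypothetical client-side smoke test for /v1/chat/completions and /v1/import/analyze.
    import asyncio

    import httpx

    BASE_URL = "http://127.0.0.1:8000"  # assumed dev address


    async def main() -> None:
        async with httpx.AsyncClient(base_url=BASE_URL, timeout=60.0) as client:
            chat = await client.post(
                "/v1/chat/completions",
                json={
                    "provider": "openai",
                    "model": "gpt-4o-mini",  # assumed model name
                    "messages": [{"role": "user", "content": "ping"}],
                },
            )
            chat.raise_for_status()
            print(chat.json()["choices"][0]["message"]["content"])

            analysis = await client.post(
                "/v1/import/analyze",
                json={
                    "import_record_id": "demo-001",
                    "example_data": "1,Electronics,403.17",
                    "table_headers": ["product_id", "category", "price"],
                    "llm_model": "openai:gpt-4o-mini",
                },
            )
            analysis.raise_for_status()
            print(analysis.json()["llm_response"]["choices"][0]["message"]["content"])


    asyncio.run(main())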
diff --git a/app/models.py b/app/models.py
new file mode 100644
index 0000000..401df11
--- /dev/null
+++ b/app/models.py
@@ -0,0 +1,92 @@
+from __future__ import annotations
+
+from enum import Enum
+from typing import Any, List, Optional
+
+from pydantic import BaseModel, Field
+
+
+class LLMRole(str, Enum):
+    USER = "user"
+    ASSISTANT = "assistant"
+    SYSTEM = "system"
+
+
+class LLMMessage(BaseModel):
+    role: LLMRole = Field(..., description="Message author role.")
+    content: str = Field(..., description="Plain text content of the message.")
+
+
+class LLMProvider(str, Enum):
+    OPENAI = "openai"
+    ANTHROPIC = "anthropic"
+    OPENROUTER = "openrouter"
+    GEMINI = "gemini"
+    QWEN = "qwen"
+    DEEPSEEK = "deepseek"
+
+
+class LLMRequest(BaseModel):
+    provider: LLMProvider = Field(..., description="Target LLM provider identifier.")
+    model: str = Field(..., description="Model name understood by the provider.")
+    messages: List[LLMMessage] = Field(..., description="Ordered chat messages.")
+    temperature: Optional[float] = Field(
+        0.7, description="Sampling temperature when supported."
+    )
+    top_p: Optional[float] = Field(
+        None, description="Top-p nucleus sampling when supported."
+    )
+    max_tokens: Optional[int] = Field(
+        None, description="Maximum tokens to generate when supported."
+    )
+    stream: Optional[bool] = Field(
+        False, description="Enable provider streaming if both sides support it."
+    )
+    extra_params: Optional[dict[str, Any]] = Field(
+        None, description="Provider-specific parameters to merge into the payload."
+    )
+
+
+class LLMChoice(BaseModel):
+    index: int
+    message: LLMMessage
+
+
+class LLMResponse(BaseModel):
+    provider: LLMProvider
+    model: str
+    choices: List[LLMChoice]
+    raw: Optional[dict[str, Any]] = Field(
+        None, description="Raw provider response for debugging."
+    )
+
+
+class DataImportAnalysisRequest(BaseModel):
+    import_record_id: str = Field(..., description="Unique identifier for this import run.")
+    example_data: str = Field(
+        ...,
+        max_length=30_000,
+        description="Sample rows from the import payload. Limited to 30k characters.",
+    )
+    table_headers: List[str] = Field(
+        ...,
+        min_length=1,
+        description="Ordered list of table headers associated with the data.",
+    )
+    llm_model: str = Field(
+        ...,
+        description="Model identifier. Accepts 'provider:model' format or plain model name.",
+    )
+    temperature: Optional[float] = Field(
+        None,
+        description="Optional override for LLM temperature when generating recognition output.",
+    )
+    max_tokens: Optional[int] = Field(
+        None,
+        description="Optional override for maximum tokens generated during recognition.",
+    )
+
+
+class DataImportAnalysisResponse(BaseModel):
+    import_record_id: str
+    llm_response: LLMResponse
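To make the request contract above concrete, a small illustrative construction of these models; the provider, model name, and message text are placeholders, not values used by this commit:

    from app.models import LLMMessage, LLMProvider, LLMRequest, LLMRole

    request = LLMRequest(
        provider=LLMProvider.DEEPSEEK,
        model="deepseek-chat",  # placeholder model name
        messages=[
            LLMMessage(role=LLMRole.SYSTEM, content="You are terse."),
            LLMMessage(role=LLMRole.USER, content="Summarize: hello world"),
        ],
        temperature=0.2,
        max_tokens=256,
    )
    print(request.model_dump_json(indent=2))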
diff --git a/app/providers/__init__.py b/app/providers/__init__.py
new file mode 100644
index 0000000..c5cf057
--- /dev/null
+++ b/app/providers/__init__.py
@@ -0,0 +1,17 @@
+from .anthropic import AnthropicProvider
+from .base import LLMProviderClient
+from .deepseek import DeepSeekProvider
+from .gemini import GeminiProvider
+from .openai import OpenAIProvider
+from .openrouter import OpenRouterProvider
+from .qwen import QwenProvider
+
+__all__ = [
+    "LLMProviderClient",
+    "OpenAIProvider",
+    "AnthropicProvider",
+    "OpenRouterProvider",
+    "GeminiProvider",
+    "QwenProvider",
+    "DeepSeekProvider",
+]
diff --git a/app/providers/__pycache__/__init__.cpython-311.pyc b/app/providers/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..4c12767
Binary files /dev/null and b/app/providers/__pycache__/__init__.cpython-311.pyc differ
diff --git a/app/providers/__pycache__/__init__.cpython-312.pyc b/app/providers/__pycache__/__init__.cpython-312.pyc
new file mode 100644
index 0000000..4ba65f4
Binary files /dev/null and b/app/providers/__pycache__/__init__.cpython-312.pyc differ
diff --git a/app/providers/__pycache__/anthropic.cpython-311.pyc b/app/providers/__pycache__/anthropic.cpython-311.pyc
new file mode 100644
index 0000000..de934d4
Binary files /dev/null and b/app/providers/__pycache__/anthropic.cpython-311.pyc differ
diff --git a/app/providers/__pycache__/anthropic.cpython-312.pyc b/app/providers/__pycache__/anthropic.cpython-312.pyc
new file mode 100644
index 0000000..5f150c0
Binary files /dev/null and b/app/providers/__pycache__/anthropic.cpython-312.pyc differ
diff --git a/app/providers/__pycache__/base.cpython-311.pyc b/app/providers/__pycache__/base.cpython-311.pyc
new file mode 100644
index 0000000..649358f
Binary files /dev/null and b/app/providers/__pycache__/base.cpython-311.pyc differ
diff --git a/app/providers/__pycache__/base.cpython-312.pyc b/app/providers/__pycache__/base.cpython-312.pyc
new file mode 100644
index 0000000..df5c088
Binary files /dev/null and b/app/providers/__pycache__/base.cpython-312.pyc differ
diff --git a/app/providers/__pycache__/deepseek.cpython-311.pyc b/app/providers/__pycache__/deepseek.cpython-311.pyc
new file mode 100644
index 0000000..c2fa3ef
Binary files /dev/null and b/app/providers/__pycache__/deepseek.cpython-311.pyc differ
diff --git a/app/providers/__pycache__/deepseek.cpython-312.pyc b/app/providers/__pycache__/deepseek.cpython-312.pyc
new file mode 100644
index 0000000..01adbde
Binary files /dev/null and b/app/providers/__pycache__/deepseek.cpython-312.pyc differ
diff --git a/app/providers/__pycache__/gemini.cpython-311.pyc b/app/providers/__pycache__/gemini.cpython-311.pyc
new file mode 100644
index 0000000..efc7507
Binary files /dev/null and b/app/providers/__pycache__/gemini.cpython-311.pyc differ
diff --git a/app/providers/__pycache__/gemini.cpython-312.pyc b/app/providers/__pycache__/gemini.cpython-312.pyc
new file mode 100644
index 0000000..e1bf078
Binary files /dev/null and b/app/providers/__pycache__/gemini.cpython-312.pyc differ
diff --git a/app/providers/__pycache__/openai.cpython-311.pyc b/app/providers/__pycache__/openai.cpython-311.pyc
new file mode 100644
index 0000000..477cc57
Binary files /dev/null and b/app/providers/__pycache__/openai.cpython-311.pyc differ
diff --git a/app/providers/__pycache__/openai.cpython-312.pyc b/app/providers/__pycache__/openai.cpython-312.pyc
new file mode 100644
index 0000000..24c222b
Binary files /dev/null and b/app/providers/__pycache__/openai.cpython-312.pyc differ
diff --git a/app/providers/__pycache__/openrouter.cpython-311.pyc b/app/providers/__pycache__/openrouter.cpython-311.pyc
new file mode 100644
index 0000000..c1de310
Binary files /dev/null and b/app/providers/__pycache__/openrouter.cpython-311.pyc differ
diff --git a/app/providers/__pycache__/openrouter.cpython-312.pyc b/app/providers/__pycache__/openrouter.cpython-312.pyc
new file mode 100644
index 0000000..6766306
Binary files /dev/null and b/app/providers/__pycache__/openrouter.cpython-312.pyc differ
diff --git a/app/providers/__pycache__/qwen.cpython-311.pyc b/app/providers/__pycache__/qwen.cpython-311.pyc
new file mode 100644
index 0000000..f1ec8a7
Binary files /dev/null and b/app/providers/__pycache__/qwen.cpython-311.pyc differ
diff --git a/app/providers/__pycache__/qwen.cpython-312.pyc b/app/providers/__pycache__/qwen.cpython-312.pyc
new file mode 100644
index 0000000..638d542
Binary files /dev/null and b/app/providers/__pycache__/qwen.cpython-312.pyc differ
diff --git a/app/providers/anthropic.py b/app/providers/anthropic.py
new file mode 100644
index 0000000..3cc73fa
--- /dev/null
+++ b/app/providers/anthropic.py
@@ -0,0 +1,97 @@
+from __future__ import annotations
+
+from typing import Any, Dict, List, Tuple
+
+import httpx
+
+from app.exceptions import ProviderAPICallError
+from app.models import (
+    LLMChoice,
+    LLMMessage,
+    LLMProvider,
+    LLMRequest,
+    LLMResponse,
+    LLMRole,
+)
+from app.providers.base import LLMProviderClient
+
+
+class AnthropicProvider(LLMProviderClient):
+    name = LLMProvider.ANTHROPIC.value
+    api_key_env = "ANTHROPIC_API_KEY"
+    base_url = "https://api.anthropic.com/v1/messages"
+    anthropic_version = "2023-06-01"
+
+    async def chat(
+        self, request: LLMRequest, client: httpx.AsyncClient
+    ) -> LLMResponse:
+        self.ensure_stream_supported(request.stream)
+
+        system_prompt, chat_messages = self._convert_messages(request.messages)
+
+        payload = self.merge_payload(
+            {
+                "model": request.model,
+                "messages": chat_messages,
+                "max_tokens": request.max_tokens or 1024,
+                "temperature": request.temperature,
+                "top_p": request.top_p,
+            },
+            request.extra_params,
+        )
+
+        if system_prompt:
+            payload["system"] = system_prompt
+
+        headers = {
+            "x-api-key": self.api_key,
+            "anthropic-version": self.anthropic_version,
+            "content-type": "application/json",
+        }
+
+        try:
+            response = await client.post(self.base_url, json=payload, headers=headers)
+            response.raise_for_status()
+        except httpx.HTTPError as exc:
+            raise ProviderAPICallError(f"Anthropic request failed: {exc}") from exc
+
+        data: Dict[str, Any] = response.json()
+        message = self._build_message(data)
+        return LLMResponse(
+            provider=LLMProvider.ANTHROPIC,
+            model=data.get("model", request.model),
+            choices=[LLMChoice(index=0, message=message)],
+            raw=data,
+        )
+
+    @staticmethod
+    def _convert_messages(
+        messages: List[LLMMessage],
+    ) -> Tuple[str | None, List[dict[str, Any]]]:
+        system_parts: List[str] = []
+        chat_payload: List[dict[str, Any]] = []
+
+        for msg in messages:
+            if msg.role == LLMRole.SYSTEM:
+                system_parts.append(msg.content)
+                continue
+
+            role = "user" if msg.role == LLMRole.USER else "assistant"
+            chat_payload.append(
+                {"role": role, "content": [{"type": "text", "text": msg.content}]}
+            )
+
+        system_prompt = "\n\n".join(system_parts) if system_parts else None
+        return system_prompt, chat_payload
+
+    @staticmethod
+    def _build_message(data: Dict[str, Any]) -> LLMMessage:
+        role = data.get("role", "assistant")
+        content_blocks = data.get("content", [])
+        text_parts = [
+            block.get("text", "")
+            for block in content_blocks
+            if isinstance(block, dict) and block.get("type") == "text"
+        ]
+        content = "\n\n".join(part for part in text_parts if part)
+        return LLMMessage(role=role, content=content)
diff --git a/app/providers/base.py b/app/providers/base.py
new file mode 100644
index 0000000..7e86cdd
--- /dev/null
+++ b/app/providers/base.py
@@ -0,0 +1,44 @@
+from __future__ import annotations
+
+from abc import ABC, abstractmethod
+from typing import Any
+
+import httpx
+
+from app.exceptions import ProviderConfigurationError
+from app.models import LLMRequest, LLMResponse
+
+
+class LLMProviderClient(ABC):
+    """Base class for provider-specific chat completion clients."""
+
+    name: str
+    api_key_env: str | None = None
+    supports_stream: bool = False
+
+    def __init__(self, api_key: str | None):
+        if self.api_key_env and not api_key:
+            raise ProviderConfigurationError(
+                f"Provider '{self.name}' requires environment variable '{self.api_key_env}'."
+            )
+        self.api_key = api_key or ""
+
+    @abstractmethod
+    async def chat(
+        self, request: LLMRequest, client: httpx.AsyncClient
+    ) -> LLMResponse:
+        """Execute a chat completion call."""
+
+    @staticmethod
+    def merge_payload(base: dict[str, Any], extra: dict[str, Any] | None) -> dict[str, Any]:
+        """Merge provider payload with optional extra params, ignoring None values."""
+        merged = {k: v for k, v in base.items() if v is not None}
+        if extra:
+            merged.update({k: v for k, v in extra.items() if v is not None})
+        return merged
+
+    def ensure_stream_supported(self, stream_requested: bool) -> None:
+        if stream_requested and not self.supports_stream:
+            raise ProviderConfigurationError(
+                f"Provider '{self.name}' does not support streaming mode."
+            )
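Adding a provider follows the contract above: subclass LLMProviderClient, set the class attributes, and implement chat(). A hypothetical sketch; the "local" provider, its URL, and its response shape are assumptions for illustration, and wiring it in would also require a new LLMProvider enum member plus a factory entry in the LLMGateway defined later in this commit:

    from typing import Any, Dict

    import httpx

    from app.exceptions import ProviderAPICallError
    from app.models import LLMChoice, LLMMessage, LLMRequest, LLMResponse
    from app.providers.base import LLMProviderClient


    class LocalProvider(LLMProviderClient):
        name = "local"
        api_key_env = None          # no key required, so __init__ accepts api_key=None
        supports_stream = False
        base_url = "http://localhost:8001/v1/chat/completions"  # assumed endpoint

        async def chat(
            self, request: LLMRequest, client: httpx.AsyncClient
        ) -> LLMResponse:
            self.ensure_stream_supported(request.stream)
            payload = self.merge_payload(
                {
                    "model": request.model,
                    "messages": [msg.model_dump() for msg in request.messages],
                    "temperature": request.temperature,
                },
                request.extra_params,
            )
            try:
                response = await client.post(self.base_url, json=payload)
                response.raise_for_status()
            except httpx.HTTPError as exc:
                raise ProviderAPICallError(f"Local request failed: {exc}") from exc
            data: Dict[str, Any] = response.json()
            # Assumed response shape: {"content": "..."}
            message = LLMMessage(role="assistant", content=data.get("content", ""))
            return LLMResponse(
                provider=request.provider,
                model=request.model,
                choices=[LLMChoice(index=0, message=message)],
                raw=data,
            )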
diff --git a/app/providers/deepseek.py b/app/providers/deepseek.py
new file mode 100644
index 0000000..d51e13c
--- /dev/null
+++ b/app/providers/deepseek.py
@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+from typing import Any, Dict, List
+
+import httpx
+
+from app.exceptions import ProviderAPICallError
+from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMResponse
+from app.providers.base import LLMProviderClient
+
+
+class DeepSeekProvider(LLMProviderClient):
+    name = LLMProvider.DEEPSEEK.value
+    api_key_env = "DEEPSEEK_API_KEY"
+    supports_stream = True
+    base_url = "https://api.deepseek.com/v1/chat/completions"
+
+    async def chat(
+        self, request: LLMRequest, client: httpx.AsyncClient
+    ) -> LLMResponse:
+        self.ensure_stream_supported(request.stream)
+
+        payload = self.merge_payload(
+            {
+                "model": request.model,
+                "messages": [msg.model_dump() for msg in request.messages],
+                "temperature": request.temperature,
+                "top_p": request.top_p,
+                "max_tokens": request.max_tokens,
+                "stream": request.stream,
+            },
+            request.extra_params,
+        )
+
+        headers = {
+            "Authorization": f"Bearer {self.api_key}",
+            "Content-Type": "application/json",
+        }
+
+        try:
+            response = await client.post(self.base_url, json=payload, headers=headers)
+            response.raise_for_status()
+        except httpx.HTTPError as exc:
+            raise ProviderAPICallError(f"DeepSeek request failed: {exc}") from exc
+
+        data: Dict[str, Any] = response.json()
+        choices = self._build_choices(data.get("choices", []))
+
+        return LLMResponse(
+            provider=LLMProvider.DEEPSEEK,
+            model=data.get("model", request.model),
+            choices=choices,
+            raw=data,
+        )
+
+    @staticmethod
+    def _build_choices(choices: List[dict[str, Any]]) -> List[LLMChoice]:
+        built: List[LLMChoice] = []
+        for choice in choices:
+            message_data = choice.get("message") or {}
+            message = LLMMessage(
+                role=message_data.get("role", "assistant"),
+                content=message_data.get("content", ""),
+            )
+            built.append(LLMChoice(index=choice.get("index", len(built)), message=message))
+        return built
"application/json"} + + try: + response = await client.post(endpoint, json=payload, headers=headers) + response.raise_for_status() + except httpx.HTTPError as exc: + raise ProviderAPICallError(f"Gemini request failed: {exc}") from exc + + data: Dict[str, Any] = response.json() + choices = self._build_choices(data.get("candidates", [])) + + return LLMResponse( + provider=LLMProvider.GEMINI, + model=request.model, + choices=choices, + raw=data, + ) + + @staticmethod + def _convert_messages( + messages: List[LLMMessage], + ) -> Tuple[str | None, List[dict[str, Any]]]: + system_parts: List[str] = [] + contents: List[dict[str, Any]] = [] + + for msg in messages: + if msg.role == LLMRole.SYSTEM: + system_parts.append(msg.content) + continue + + role = "user" if msg.role == LLMRole.USER else "model" + contents.append({"role": role, "parts": [{"text": msg.content}]}) + + system_instruction = "\n\n".join(system_parts) if system_parts else None + return system_instruction, contents + + @staticmethod + def _build_choices(candidates: List[dict[str, Any]]) -> List[LLMChoice]: + choices: List[LLMChoice] = [] + for idx, candidate in enumerate(candidates): + content = candidate.get("content", {}) + parts = content.get("parts", []) + text_parts = [ + part.get("text", "") + for part in parts + if isinstance(part, dict) and part.get("text") + ] + text = "\n\n".join(text_parts) + choices.append( + LLMChoice( + index=candidate.get("index", idx), + message=LLMMessage(role="assistant", content=text), + ) + ) + if not choices: + choices.append( + LLMChoice( + index=0, + message=LLMMessage(role="assistant", content=""), + ) + ) + return choices diff --git a/app/providers/openai.py b/app/providers/openai.py new file mode 100644 index 0000000..53e2dda --- /dev/null +++ b/app/providers/openai.py @@ -0,0 +1,66 @@ +from __future__ import annotations + +from typing import Any, Dict, List + +import httpx + +from app.exceptions import ProviderAPICallError +from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMResponse +from app.providers.base import LLMProviderClient + + +class OpenAIProvider(LLMProviderClient): + name = LLMProvider.OPENAI.value + api_key_env = "OPENAI_API_KEY" + supports_stream = True + base_url = "https://api.openai.com/v1/chat/completions" + + async def chat( + self, request: LLMRequest, client: httpx.AsyncClient + ) -> LLMResponse: + self.ensure_stream_supported(request.stream) + + payload = self.merge_payload( + { + "model": request.model, + "messages": [msg.model_dump() for msg in request.messages], + "temperature": request.temperature, + "top_p": request.top_p, + "max_tokens": request.max_tokens, + "stream": request.stream, + }, + request.extra_params, + ) + + headers = { + "Authorization": f"Bearer {self.api_key}", + "Content-Type": "application/json", + } + + try: + response = await client.post(self.base_url, json=payload, headers=headers) + response.raise_for_status() + except httpx.HTTPError as exc: + raise ProviderAPICallError(f"OpenAI request failed: {exc}") from exc + + data: Dict[str, Any] = response.json() + choices = self._build_choices(data.get("choices", [])) + + return LLMResponse( + provider=LLMProvider.OPENAI, + model=data.get("model", request.model), + choices=choices, + raw=data, + ) + + @staticmethod + def _build_choices(choices: List[dict[str, Any]]) -> List[LLMChoice]: + built: List[LLMChoice] = [] + for choice in choices: + message_data = choice.get("message") or {} + message = LLMMessage( + role=message_data.get("role", "assistant"), # fallback to assistant 
diff --git a/app/providers/openai.py b/app/providers/openai.py
new file mode 100644
index 0000000..53e2dda
--- /dev/null
+++ b/app/providers/openai.py
@@ -0,0 +1,66 @@
+from __future__ import annotations
+
+from typing import Any, Dict, List
+
+import httpx
+
+from app.exceptions import ProviderAPICallError
+from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMResponse
+from app.providers.base import LLMProviderClient
+
+
+class OpenAIProvider(LLMProviderClient):
+    name = LLMProvider.OPENAI.value
+    api_key_env = "OPENAI_API_KEY"
+    supports_stream = True
+    base_url = "https://api.openai.com/v1/chat/completions"
+
+    async def chat(
+        self, request: LLMRequest, client: httpx.AsyncClient
+    ) -> LLMResponse:
+        self.ensure_stream_supported(request.stream)
+
+        payload = self.merge_payload(
+            {
+                "model": request.model,
+                "messages": [msg.model_dump() for msg in request.messages],
+                "temperature": request.temperature,
+                "top_p": request.top_p,
+                "max_tokens": request.max_tokens,
+                "stream": request.stream,
+            },
+            request.extra_params,
+        )
+
+        headers = {
+            "Authorization": f"Bearer {self.api_key}",
+            "Content-Type": "application/json",
+        }
+
+        try:
+            response = await client.post(self.base_url, json=payload, headers=headers)
+            response.raise_for_status()
+        except httpx.HTTPError as exc:
+            raise ProviderAPICallError(f"OpenAI request failed: {exc}") from exc
+
+        data: Dict[str, Any] = response.json()
+        choices = self._build_choices(data.get("choices", []))
+
+        return LLMResponse(
+            provider=LLMProvider.OPENAI,
+            model=data.get("model", request.model),
+            choices=choices,
+            raw=data,
+        )
+
+    @staticmethod
+    def _build_choices(choices: List[dict[str, Any]]) -> List[LLMChoice]:
+        built: List[LLMChoice] = []
+        for choice in choices:
+            message_data = choice.get("message") or {}
+            message = LLMMessage(
+                role=message_data.get("role", "assistant"),  # fallback to assistant
+                content=message_data.get("content", ""),
+            )
+            built.append(LLMChoice(index=choice.get("index", len(built)), message=message))
+        return built
diff --git a/app/providers/openrouter.py b/app/providers/openrouter.py
new file mode 100644
index 0000000..5578962
--- /dev/null
+++ b/app/providers/openrouter.py
@@ -0,0 +1,77 @@
+from __future__ import annotations
+
+import os
+from typing import Any, Dict, List
+
+import httpx
+
+from app.exceptions import ProviderAPICallError
+from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMResponse
+from app.providers.base import LLMProviderClient
+
+
+class OpenRouterProvider(LLMProviderClient):
+    name = LLMProvider.OPENROUTER.value
+    api_key_env = "OPENROUTER_API_KEY"
+    supports_stream = True
+    base_url = "https://openrouter.ai/api/v1/chat/completions"
+
+    def __init__(self, api_key: str | None):
+        super().__init__(api_key)
+        self.site_url = os.getenv("OPENROUTER_SITE_URL")
+        self.app_name = os.getenv("OPENROUTER_APP_NAME")
+
+    async def chat(
+        self, request: LLMRequest, client: httpx.AsyncClient
+    ) -> LLMResponse:
+        self.ensure_stream_supported(request.stream)
+
+        payload = self.merge_payload(
+            {
+                "model": request.model,
+                "messages": [msg.model_dump() for msg in request.messages],
+                "temperature": request.temperature,
+                "top_p": request.top_p,
+                "max_tokens": request.max_tokens,
+                "stream": request.stream,
+            },
+            request.extra_params,
+        )
+
+        headers = {
+            "Authorization": f"Bearer {self.api_key}",
+            "Content-Type": "application/json",
+        }
+
+        if self.site_url:
+            headers["HTTP-Referer"] = self.site_url
+        if self.app_name:
+            headers["X-Title"] = self.app_name
+
+        try:
+            response = await client.post(self.base_url, json=payload, headers=headers)
+            response.raise_for_status()
+        except httpx.HTTPError as exc:
+            raise ProviderAPICallError(f"OpenRouter request failed: {exc}") from exc
+
+        data: Dict[str, Any] = response.json()
+        choices = self._build_choices(data.get("choices", []))
+
+        return LLMResponse(
+            provider=LLMProvider.OPENROUTER,
+            model=data.get("model", request.model),
+            choices=choices,
+            raw=data,
+        )
+
+    @staticmethod
+    def _build_choices(choices: List[dict[str, Any]]) -> List[LLMChoice]:
+        built: List[LLMChoice] = []
+        for choice in choices:
+            message_data = choice.get("message") or {}
+            message = LLMMessage(
+                role=message_data.get("role", "assistant"),
+                content=message_data.get("content", ""),
+            )
+            built.append(LLMChoice(index=choice.get("index", len(built)), message=message))
+        return built
diff --git a/app/providers/qwen.py b/app/providers/qwen.py
new file mode 100644
index 0000000..c2e7f1f
--- /dev/null
+++ b/app/providers/qwen.py
@@ -0,0 +1,87 @@
+from __future__ import annotations
+
+from typing import Any, Dict, List
+
+import httpx
+
+from app.exceptions import ProviderAPICallError
+from app.models import LLMChoice, LLMMessage, LLMProvider, LLMRequest, LLMResponse
+from app.providers.base import LLMProviderClient
+
+
+class QwenProvider(LLMProviderClient):
+    name = LLMProvider.QWEN.value
+    api_key_env = "QWEN_API_KEY"
+    base_url = (
+        "https://dashscope.aliyuncs.com/api/v1/services/aigc/text-generation/generation"
+    )
+
+    async def chat(
+        self, request: LLMRequest, client: httpx.AsyncClient
+    ) -> LLMResponse:
+        self.ensure_stream_supported(request.stream)
+
+        parameters = {
+            "temperature": request.temperature,
+            "top_p": request.top_p,
+        }
+        if request.max_tokens is not None:
+            parameters["max_output_tokens"] = request.max_tokens
+
+        # Strip None values from parameters
+        parameters = {k: v for k, v in parameters.items() if v is not None}
+
+        payload: Dict[str, Any] = {
+            "model": request.model,
+            "input": {"messages": [msg.model_dump() for msg in request.messages]},
+        }
+        if parameters:
+            payload["parameters"] = parameters
+
+        payload = self.merge_payload(payload, request.extra_params)
+
+        headers = {
+            "Authorization": f"Bearer {self.api_key}",
+            "Content-Type": "application/json",
+        }
+
+        try:
+            response = await client.post(self.base_url, json=payload, headers=headers)
+            response.raise_for_status()
+        except httpx.HTTPError as exc:
+            raise ProviderAPICallError(f"Qwen request failed: {exc}") from exc
+
+        data: Dict[str, Any] = response.json()
+        choices = self._build_choices(data.get("output", {}))
+
+        return LLMResponse(
+            provider=LLMProvider.QWEN,
+            model=request.model,
+            choices=choices,
+            raw=data,
+        )
+
+    @staticmethod
+    def _build_choices(output: Dict[str, Any]) -> List[LLMChoice]:
+        choices_payload = output.get("choices", [])
+        if not choices_payload and output.get("text"):
+            return [
+                LLMChoice(
+                    index=0,
+                    message=LLMMessage(role="assistant", content=output["text"]),
+                )
+            ]
+
+        choices: List[LLMChoice] = []
+        for idx, choice in enumerate(choices_payload):
+            message_data = choice.get("message") or {}
+            message = LLMMessage(
+                role=message_data.get("role", "assistant"),
+                content=message_data.get("content", ""),
+            )
+            choices.append(LLMChoice(index=choice.get("index", idx), message=message))
+        if not choices:
+            choices.append(
+                LLMChoice(index=0, message=LLMMessage(role="assistant", content=""))
+            )
+        return choices
diff --git a/app/services/__init__.py b/app/services/__init__.py
new file mode 100644
index 0000000..849697d
--- /dev/null
+++ b/app/services/__init__.py
@@ -0,0 +1,3 @@
+from .gateway import LLMGateway
+
+__all__ = ["LLMGateway"]
diff --git a/app/services/__pycache__/__init__.cpython-311.pyc b/app/services/__pycache__/__init__.cpython-311.pyc
new file mode 100644
index 0000000..38d21f5
Binary files /dev/null and b/app/services/__pycache__/__init__.cpython-311.pyc differ
diff --git a/app/services/__pycache__/__init__.cpython-312.pyc b/app/services/__pycache__/__init__.cpython-312.pyc
new file mode 100644
index 0000000..659873f
Binary files /dev/null and b/app/services/__pycache__/__init__.cpython-312.pyc differ
diff --git a/app/services/__pycache__/gateway.cpython-311.pyc b/app/services/__pycache__/gateway.cpython-311.pyc
new file mode 100644
index 0000000..142ab5a
Binary files /dev/null and b/app/services/__pycache__/gateway.cpython-311.pyc differ
diff --git a/app/services/__pycache__/gateway.cpython-312.pyc b/app/services/__pycache__/gateway.cpython-312.pyc
new file mode 100644
index 0000000..d2d742f
Binary files /dev/null and b/app/services/__pycache__/gateway.cpython-312.pyc differ
diff --git a/app/services/__pycache__/import_analysis.cpython-311.pyc b/app/services/__pycache__/import_analysis.cpython-311.pyc
new file mode 100644
index 0000000..d2492e3
Binary files /dev/null and b/app/services/__pycache__/import_analysis.cpython-311.pyc differ
diff --git a/app/services/__pycache__/import_analysis.cpython-312.pyc b/app/services/__pycache__/import_analysis.cpython-312.pyc
new file mode 100644
index 0000000..b1baa45
Binary files /dev/null and b/app/services/__pycache__/import_analysis.cpython-312.pyc differ
diff --git a/app/services/gateway.py b/app/services/gateway.py
new file mode 100644
index 0000000..bb40725
--- /dev/null
+++ b/app/services/gateway.py
@@ -0,0 +1,53 @@
+from __future__ import annotations
+
+import os
+from typing import Dict, Type
+
+import httpx
+
+from app.exceptions import ProviderConfigurationError
+from app.models import LLMProvider, LLMRequest, LLMResponse
+from app.providers import (
+    AnthropicProvider,
+    DeepSeekProvider,
+    GeminiProvider,
+    LLMProviderClient,
+    OpenAIProvider,
+    OpenRouterProvider,
+    QwenProvider,
+)
+
+
+class LLMGateway:
+    """Simple registry that dispatches chat requests to provider clients."""
+
+    def __init__(self) -> None:
+        self._providers: Dict[LLMProvider, LLMProviderClient] = {}
+        self._factory: Dict[LLMProvider, Type[LLMProviderClient]] = {
+            LLMProvider.OPENAI: OpenAIProvider,
+            LLMProvider.ANTHROPIC: AnthropicProvider,
+            LLMProvider.OPENROUTER: OpenRouterProvider,
+            LLMProvider.GEMINI: GeminiProvider,
+            LLMProvider.QWEN: QwenProvider,
+            LLMProvider.DEEPSEEK: DeepSeekProvider,
+        }
+
+    def get_provider(self, provider: LLMProvider) -> LLMProviderClient:
+        if provider not in self._factory:
+            raise ProviderConfigurationError(f"Unsupported provider '{provider.value}'.")
+
+        if provider not in self._providers:
+            self._providers[provider] = self._build_provider(provider)
+        return self._providers[provider]
+
+    def _build_provider(self, provider: LLMProvider) -> LLMProviderClient:
+        provider_cls = self._factory[provider]
+        api_key_env = getattr(provider_cls, "api_key_env", None)
+        api_key = os.getenv(api_key_env) if api_key_env else None
+        return provider_cls(api_key)
+
+    async def chat(
+        self, request: LLMRequest, client: httpx.AsyncClient
+    ) -> LLMResponse:
+        provider_client = self.get_provider(request.provider)
+        return await provider_client.chat(request, client)
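The gateway can also be driven directly, without the FastAPI layer, e.g. from a script or test. A minimal sketch; the model name is an assumption, and the matching provider API key must be set in the environment before get_provider() is first called:

    import asyncio

    import httpx

    from app.models import LLMMessage, LLMProvider, LLMRequest, LLMRole
    from app.services import LLMGateway


    async def main() -> None:
        gateway = LLMGateway()
        request = LLMRequest(
            provider=LLMProvider.OPENAI,
            model="gpt-4o-mini",  # assumed model name
            messages=[LLMMessage(role=LLMRole.USER, content="ping")],
        )
        async with httpx.AsyncClient(timeout=30.0) as client:
            response = await gateway.chat(request, client)
        print(response.choices[0].message.content)


    asyncio.run(main())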
diff --git a/app/services/import_analysis.py b/app/services/import_analysis.py
new file mode 100644
index 0000000..6a20a41
--- /dev/null
+++ b/app/services/import_analysis.py
@@ -0,0 +1,91 @@
+from __future__ import annotations
+
+from typing import List, Tuple
+
+from app.models import (
+    DataImportAnalysisRequest,
+    LLMMessage,
+    LLMProvider,
+    LLMRole,
+)
+
+
+def resolve_provider_from_model(llm_model: str) -> Tuple[LLMProvider, str]:
+    """Resolve provider based on the llm_model string.
+
+    The llm_model may be provided as 'provider:model' or 'provider/model'.
+    If no provider prefix is present, try an educated guess from common model name patterns.
+    """
+
+    normalized = llm_model.strip()
+    provider_hint: str | None = None
+    model_name = normalized
+
+    for delimiter in (":", "/", "|"):
+        if delimiter in normalized:
+            provider_hint, model_name = normalized.split(delimiter, 1)
+            provider_hint = provider_hint.strip().lower()
+            model_name = model_name.strip()
+            break
+
+    provider_map = {provider.value: provider for provider in LLMProvider}
+
+    if provider_hint:
+        if provider_hint not in provider_map:
+            raise ValueError(
+                f"Unsupported provider '{provider_hint}'. Expected one of: {', '.join(provider_map.keys())}."
+            )
+        return provider_map[provider_hint], model_name
+
+    return _guess_provider_from_model(model_name), model_name
+
+
+def _guess_provider_from_model(model_name: str) -> LLMProvider:
+    lowered = model_name.lower()
+
+    if lowered.startswith(("gpt", "o1", "text-", "dall-e", "whisper")):
+        return LLMProvider.OPENAI
+    if lowered.startswith(("claude", "anthropic")):
+        return LLMProvider.ANTHROPIC
+    if lowered.startswith(("gemini", "models/gemini")):
+        return LLMProvider.GEMINI
+    if lowered.startswith("qwen"):
+        return LLMProvider.QWEN
+    if lowered.startswith("deepseek"):
+        return LLMProvider.DEEPSEEK
+    if lowered.startswith(("openrouter", "router-")):
+        return LLMProvider.OPENROUTER
+
+    supported = ", ".join(provider.value for provider in LLMProvider)
+    raise ValueError(
+        f"Unable to infer provider from model '{model_name}'. "
+        f"Please prefix with 'provider:model'. Supported providers: {supported}."
+    )
+
+
+def build_import_messages(
+    request: DataImportAnalysisRequest,
+) -> List[LLMMessage]:
+    """Create system and user messages for the import analysis prompt."""
+    headers_formatted = "\n".join(f"- {header}" for header in request.table_headers)
+
+    system_prompt = (
+        "你是一名数据导入识别助手。请根据给定的表头和示例数据,判断字段含义、"
+        "典型数据类型以及潜在的数据质量问题。最终请返回一个结构化的JSON。\n"
+        "JSON结构需包含: field_summaries (数组, 每项含 header、meaning、data_type、quality_notes), "
+        "detected_issues (字符串数组),以及 overall_suggestion (字符串)。"
+    )
+
+    user_prompt = (
+        f"导入记录ID: {request.import_record_id}\n\n"
+        "表头信息:\n"
+        f"{headers_formatted}\n\n"
+        "示例数据:\n"
+        f"{request.example_data}\n\n"
+        "请仔细分析示例数据与表头之间的对应关系,并返回符合上述JSON结构的内容。"
+    )
+
+    return [
+        LLMMessage(role=LLMRole.SYSTEM, content=system_prompt),
+        LLMMessage(role=LLMRole.USER, content=user_prompt),
+    ]
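For reference, how resolve_provider_from_model() treats the accepted spellings; the model names are illustrative:

    from app.services.import_analysis import resolve_provider_from_model

    # Explicit 'provider:model' prefix wins.
    print(resolve_provider_from_model("deepseek:deepseek-chat"))
    # -> (LLMProvider.DEEPSEEK, 'deepseek-chat')

    # No prefix: the provider is guessed from common model-name patterns.
    print(resolve_provider_from_model("claude-3-5-sonnet-latest"))
    # -> (LLMProvider.ANTHROPIC, 'claude-3-5-sonnet-latest')

    # 'provider/model' also works; only the first delimiter is split off.
    print(resolve_provider_from_model("openrouter/meta-llama/llama-3-70b-instruct"))
    # -> (LLMProvider.OPENROUTER, 'meta-llama/llama-3-70b-instruct')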
diff --git a/demo/GE_result_desc_prompt.txt b/demo/GE_result_desc_prompt.txt
new file mode 100644
index 0000000..5122d59
--- /dev/null
+++ b/demo/GE_result_desc_prompt.txt
@@ -0,0 +1,47 @@
+系统角色(System)
+你是“数据画像抽取器”。输入是一段 Great Expectations 的 profiling/validation 结果 JSON,
+可能包含:列级期望(expect_*)、统计、样例值、类型推断等;也可能带表级/批次元数据。
+请将其归一化为一个可被程序消费的“表画像”JSON,对不确定项给出置信度与理由。
+禁止臆造不存在的列、时间范围或数值。
+
+用户消息(User)
+【输入:GE结果JSON】
+{{GE_RESULT_JSON}}
+
+【输出要求(只输出JSON,不要解释文字)】
+{
+  "table": "<库.表 或 表名>",
+  "row_count": <int>,                          // 若未知可为 null
+  "role": "fact|dimension|unknown",            // 依据指标/维度占比与唯一性启发式
+  "grain": ["<列1>", "<列2>", ...],            // 事实粒度猜测(如含 dt/店/类目)
+  "time": { "column": "<列名>|null", "granularity": "day|week|month|unknown", "range": ["YYYY-MM-DD","YYYY-MM-DD"]|null, "has_gaps": true|false|null },
+  "columns": [
+    {
+      "name": "<列名>",
+      "dtype": "<类型>",
+      "semantic_type": "dimension|metric|time|text|id|unknown",
+      "null_rate": <0~1|null>,
+      "distinct_count": <int>,
+      "distinct_ratio": <0~1|null>,
+      "stats": { "min": <num>, "max": <num>, "mean": <num>, "std": <num>, "skewness": <num> },
+      "enumish": true|false|null,              // 低熵/可枚举
+      "top_values": [{"value":"<值>","pct":<0~1>}, ...],  // 取前K个(≤10)
+      "pk_candidate_score": <0~1>,             // 唯一性+非空综合评分
+      "metric_candidate_score": <0~1>,         // 数值/偏态/业务词命中
+      "comment": "<列注释或GE描述|可为空>"
+    }
+  ],
+  "primary_key_candidates": [["colA","colB"], ...],  // 依据 unique/compound unique 期望
+  "fk_candidates": [{"from":"<列>","to":"<表.列>","confidence":<0~1>}],
+  "quality": {
+    "failed_expectations": [{"name":"<期望名>","column":"<列名>","summary":"<一句话>"}],
+    "warning_hints": ["空值率>0.2的列: ...", "时间列存在缺口: ..."]
+  },
+  "confidence_notes": ["<为什么判定role/grain/time列>"]
+}
+
+【判定规则(简要)】
+- time列:类型为日期/时间 OR 命中 dt/date/day 等命名;若有 min/max 可给出 range;若间隔缺口≥1天记 has_gaps=true。
+- semantic_type:数值+右偏/方差大→更偏 metric;高唯一/ID命名→id;高基数+文本→text;低熵+有限取值→dimension。
+- role:metric列占比高且存在time列→倾向 fact;几乎全是枚举/ID且少数值→dimension。
+- 置信不高时给出 null 或 unknown,并写入 confidence_notes。
diff --git a/demo/e-commerce-orders_desc.md b/demo/e-commerce-orders_desc.md
new file mode 100644
index 0000000..2a72f2d
--- /dev/null
+++ b/demo/e-commerce-orders_desc.md
@@ -0,0 +1,127 @@
+E-commerce Customer Order Behavior Dataset
+A synthetic e-commerce dataset containing 10,000 orders with realistic customer behavior patterns, suitable for e-commerce analytics and machine learning tasks.
+
+Dataset Card for E-commerce Orders
+Dataset Summary
+This dataset simulates customer order behavior in an e-commerce platform, containing detailed information about orders, customers, products, and delivery patterns. The data is synthetically generated with realistic distributions and patterns.
+
+Supported Tasks
+regression: Predict order quantities or prices
+classification: Predict delivery status or customer segments
+clustering: Identify customer behavior patterns
+time-series-forecasting: Analyze order patterns over time
+
+Languages
+Not applicable (tabular data)
+
+Dataset Structure
+Data Instances
+Each instance represents a single e-commerce order with the following fields:
+
+{
+    'order_id': '5ea92c47-c5b2-4bdd-8a50-d77efd77ec89',
+    'customer_id': 2350,
+    'product_id': 995,
+    'category': 'Electronics',
+    'price': 403.17,
+    'quantity': 3,
+    'order_date': '2024-04-20 14:59:58.897063',
+    'shipping_date': '2024-04-22 14:59:58.897063',
+    'delivery_status': 'Delivered',
+    'payment_method': 'PayPal',
+    'device_type': 'Mobile',
+    'channel': 'Paid Search',
+    'shipping_address': '72166 Cunningham Crescent East Nicholasside Mississippi 85568',
+    'billing_address': '38199 Edwin Plain Johnborough Maine 81826',
+    'customer_segment': 'Returning'
+}
+
+Data Fields
+| Field Name | Type | Description | Value Range |
+| --- | --- | --- | --- |
+| order_id | string | Unique order identifier (UUID4) | - |
+| customer_id | int | Customer identifier | 1-3,000 |
+| product_id | int | Product identifier | 1-1,000 |
+| category | string | Product category | Electronics, Clothing, Home, Books, Beauty, Toys |
+| price | float | Product price | $5.00-$500.00 |
+| quantity | int | Order quantity | 1-10 |
+| order_date | datetime | Order placement timestamp | Last 12 months |
+| shipping_date | datetime | Shipping timestamp | 1-7 days after order_date |
+| delivery_status | string | Delivery status | Pending, Shipped, Delivered, Returned |
+| payment_method | string | Payment method used | Credit Card, PayPal, Debit Card, Apple Pay, Google Pay |
+| device_type | string | Ordering device | Desktop, Mobile, Tablet |
+| channel | string | Marketing channel | Organic, Paid Search, Email, Social |
+| shipping_address | string | Delivery address | Street, City, State, ZIP |
+| billing_address | string | Billing address | Street, City, State, ZIP |
+| customer_segment | string | Customer type | New, Returning, VIP |
+
+Data Splits
+This dataset is provided as a single CSV file without splits.
+
+Dataset Creation
+Source Data
+This is a synthetic dataset generated using Python with pandas, numpy, and Faker libraries.
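As a rough illustration, a generator along the following lines could produce data of this shape. This is a sketch under stated assumptions, not the actual generation script: distributions and field coverage are simplified, and the file name is hypothetical.

    import uuid

    import numpy as np
    import pandas as pd
    from faker import Faker

    fake = Faker()
    rng = np.random.default_rng(42)
    n = 10_000

    # Orders placed within the last 12 months.
    order_dates = pd.Timestamp.now() - pd.to_timedelta(rng.integers(0, 365, n), unit="D")
    df = pd.DataFrame(
        {
            "order_id": [str(uuid.uuid4()) for _ in range(n)],
            "customer_id": rng.integers(1, 3001, n),
            "product_id": rng.integers(1, 1001, n),
            "category": rng.choice(
                ["Electronics", "Clothing", "Home", "Books", "Beauty", "Toys"], n
            ),
            "price": rng.uniform(5.0, 500.0, n).round(2),
            "quantity": rng.integers(1, 11, n),
            "order_date": order_dates,
            "delivery_status": rng.choice(
                ["Delivered", "Shipped", "Pending", "Returned"],
                n,
                p=[0.7, 0.2, 0.05, 0.05],
            ),
            "shipping_address": [fake.address().replace("\n", " ") for _ in range(n)],
        }
    )
    # Shipping happens 1-7 days after the order.
    df["shipping_date"] = df["order_date"] + pd.to_timedelta(rng.integers(1, 8, n), unit="D")
    df.to_csv("e_commerce_orders.csv", index=False)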
The data generation process ensures: + +Realistic customer behavior patterns +Proper data distributions +Valid relationships between fields +Realistic address formatting +Annotations +No manual annotations (synthetic data) + +Considerations for Using the Data +Social Impact of Dataset +This dataset is designed for: + +Development of e-commerce analytics systems +Testing of order processing systems +Training of machine learning models for e-commerce +Educational purposes in data science +Discussion of Biases +As a synthetic dataset, care has been taken to: + +Use realistic distributions for order patterns +Maintain proper relationships between dates +Create realistic customer segments +Avoid demographic biases in address generation +However, users should note that: + +The data patterns are simplified compared to real e-commerce data +The customer behavior patterns are based on general assumptions +Geographic distribution might not reflect real-world patterns +Dataset Statistics +Total Records: 10,000 + +Distribution Statistics: + +Delivery Status: + +Delivered: 70% +Shipped: 20% +Pending: 5% +Returned: 5% +Customer Segments: + +VIP: ~15% +Returning: ~35% +New: ~50% +Loading and Usage +Using Huggingface Datasets: + +from datasets import load_dataset + +dataset = load_dataset("path/to/e-commerce-orders") + +# Example: Load as pandas DataFrame +df = dataset['train'].to_pandas() + +# Example: Access specific columns +orders = dataset['train']['order_id'] +prices = dataset['train']['price'] + +Data Quality +The dataset has been validated to ensure: + +No missing values +Proper value ranges +Valid categorical values +Proper date relationships +Unique order IDs +Valid address formats \ No newline at end of file diff --git a/demo/snippet.json b/demo/snippet.json new file mode 100644 index 0000000..51e433f --- /dev/null +++ b/demo/snippet.json @@ -0,0 +1,523 @@ +[ + { + "id": "snpt_daily_gmv_trend", + "title": "日GMV趋势", + "desc": "按日统计GMV与订单量趋势", + "type": "trend", + "applicability": { + "required_columns": [ + "order_date", + "price", + "quantity" + ], + "time_column": "order_date", + "constraints": { + "dim_cardinality_hint": null, + "fk_join_available": false, + "notes": [ + "GMV=SUM(price*quantity)", + "请避免选择地址等PII字段" + ] + } + }, + "variables": [ + { + "name": "start_date", + "type": "date" + }, + { + "name": "end_date", + "type": "date" + } + ], + "dialect_sql": { + "mysql": "SELECT DATE(order_date) AS dt, SUM(price*quantity) AS gmv, COUNT(*) AS orders\nFROM {{table}}\nWHERE DATE(order_date) BETWEEN {{start_date}} AND {{end_date}}\nGROUP BY dt\nORDER BY dt;" + }, + "business_caliber": "GMV口径:price×quantity;订单量:记录条数;粒度=日。", + "examples": [ + "近30天GMV趋势", + "2025Q1每日GMV与订单数" + ] + }, + { + "id": "snpt_daily_gmv_ma7", + "title": "7日GMV均线", + "desc": "GMV按日与7日滑动平均", + "type": "trend", + "applicability": { + "required_columns": [ + "order_date", + "price", + "quantity" + ], + "time_column": "order_date", + "constraints": { + "dim_cardinality_hint": null, + "fk_join_available": false, + "notes": [ + "窗口=包含当日的过去7天", + "若日期有缺口,均线可能偏移" + ] + } + }, + "variables": [ + { + "name": "start_date", + "type": "date" + }, + { + "name": "end_date", + "type": "date" + } + ], + "dialect_sql": { + "mysql": "WITH d AS (\n SELECT DATE(order_date) AS dt, SUM(price*quantity) AS gmv\n FROM {{table}}\n WHERE DATE(order_date) BETWEEN {{start_date}} AND {{end_date}}\n GROUP BY dt\n)\nSELECT dt,\n gmv,\n AVG(gmv) OVER (ORDER BY dt ROWS BETWEEN 6 PRECEDING AND CURRENT ROW) AS gmv_ma7\nFROM d\nORDER BY dt;" + }, + 
"business_caliber": "GMV=price×quantity;窗口=7天(含当日),按自然日排序计算。", + "examples": [ + "本季度GMV与7日均线", + "促销期走势平滑对比" + ] + }, + { + "id": "snpt_yoy_daily_gmv", + "title": "GMV同比(日)", + "desc": "对比去年同日GMV与同比%", + "type": "ratio", + "applicability": { + "required_columns": [ + "order_date", + "price", + "quantity" + ], + "time_column": "order_date", + "constraints": { + "dim_cardinality_hint": null, + "fk_join_available": false, + "notes": [ + "需要查询窗口覆盖到去年的对应日期", + "闰年按日期对齐处理" + ] + } + }, + "variables": [ + { + "name": "start_date", + "type": "date" + }, + { + "name": "end_date", + "type": "date" + } + ], + "dialect_sql": { + "mysql": "WITH cur AS (\n SELECT DATE(order_date) AS dt, SUM(price*quantity) AS gmv\n FROM {{table}}\n WHERE DATE(order_date) BETWEEN {{start_date}} AND {{end_date}}\n GROUP BY dt\n),\nprev AS (\n SELECT DATE(DATE_SUB(order_date, INTERVAL 1 YEAR)) AS dt, SUM(price*quantity) AS gmv_last\n FROM {{table}}\n WHERE DATE(order_date) BETWEEN DATE_SUB({{start_date}}, INTERVAL 1 YEAR) AND DATE_SUB({{end_date}}, INTERVAL 1 YEAR)\n GROUP BY DATE(DATE_SUB(order_date, INTERVAL 1 YEAR))\n)\nSELECT c.dt,\n c.gmv,\n p.gmv_last,\n CASE WHEN p.gmv_last IS NULL OR p.gmv_last=0 THEN NULL ELSE (c.gmv - p.gmv_last)/p.gmv_last END AS yoy\nFROM cur c LEFT JOIN prev p ON c.dt = p.dt\nORDER BY c.dt;" + }, + "business_caliber": "同比=当日GMV与去年同日GMV之差/去年同日GMV;GMV=price×quantity。", + "examples": [ + "最近90天GMV同比曲线", + "节假日同比表现" + ] + }, + { + "id": "snpt_topn_category_gmv", + "title": "类目GMV排行", + "desc": "按类目统计GMV并取TopN", + "type": "topn", + "applicability": { + "required_columns": [ + "order_date", + "category", + "price", + "quantity" + ], + "time_column": "order_date", + "constraints": { + "dim_cardinality_hint": 6, + "fk_join_available": false, + "notes": [ + "类目枚举较少,建议TopN<=6用于展示", + "可追加订单量与件数" + ] + } + }, + "variables": [ + { + "name": "start_date", + "type": "date" + }, + { + "name": "end_date", + "type": "date" + }, + { + "name": "top_n", + "type": "int", + "default": 10 + } + ], + "dialect_sql": { + "mysql": "SELECT category,\n SUM(price*quantity) AS gmv,\n COUNT(*) AS orders,\n SUM(quantity) AS qty\nFROM {{table}}\nWHERE DATE(order_date) BETWEEN {{start_date}} AND {{end_date}}\nGROUP BY category\nORDER BY gmv DESC\nLIMIT {{top_n}};" + }, + "business_caliber": "GMV=price×quantity;统计范围=指定日期内;粒度=类目。", + "examples": [ + "上月类目Top5", + "本季度类目GMV结构" + ] + }, + { + "id": "snpt_share_channel", + "title": "渠道GMV占比", + "desc": "统计各渠道GMV及占比", + "type": "ratio", + "applicability": { + "required_columns": [ + "order_date", + "channel", + "price", + "quantity" + ], + "time_column": "order_date", + "constraints": { + "dim_cardinality_hint": 4, + "fk_join_available": false, + "notes": [ + "占比以总GMV为分母;占比之和≈100%", + "适合饼图/堆叠柱" + ] + } + }, + "variables": [ + { + "name": "start_date", + "type": "date" + }, + { + "name": "end_date", + "type": "date" + } + ], + "dialect_sql": { + "mysql": "WITH base AS (\n SELECT channel, SUM(price*quantity) AS gmv\n FROM {{table}}\n WHERE DATE(order_date) BETWEEN {{start_date}} AND {{end_date}}\n GROUP BY channel\n), total AS (\n SELECT SUM(gmv) AS tg FROM base\n)\nSELECT b.channel, b.gmv, b.gmv/t.tg AS gmv_share\nFROM base b CROSS JOIN total t\nORDER BY b.gmv DESC;" + }, + "business_caliber": "渠道GMV占比=渠道GMV/全部渠道GMV;时间范围由参数限定。", + "examples": [ + "本月各渠道占比", + "Q1渠道结构对比" + ] + }, + { + "id": "snpt_topn_product_gmv", + "title": "商品GMV排行", + "desc": "按商品ID统计GMV并取TopN", + "type": "topn", + "applicability": { + "required_columns": [ + "order_date", + "product_id", + "price", + "quantity" 
+ ], + "time_column": "order_date", + "constraints": { + "dim_cardinality_hint": 1000, + "fk_join_available": true, + "notes": [ + "product_id基数较高,建议LIMIT<=50", + "可与商品维表联查名称等属性" + ] + } + }, + "variables": [ + { + "name": "start_date", + "type": "date" + }, + { + "name": "end_date", + "type": "date" + }, + { + "name": "top_n", + "type": "int", + "default": 20 + } + ], + "dialect_sql": { + "mysql": "SELECT product_id,\n SUM(price*quantity) AS gmv,\n SUM(quantity) AS qty,\n COUNT(*) AS orders\nFROM {{table}}\nWHERE DATE(order_date) BETWEEN {{start_date}} AND {{end_date}}\nGROUP BY product_id\nORDER BY gmv DESC\nLIMIT {{top_n}};" + }, + "business_caliber": "GMV=price×quantity;粒度=商品ID。", + "examples": [ + "上周热销商品Top20", + "年度销量Top10商品" + ] + }, + { + "id": "snpt_join_product_dim", + "title": "商品维表联查", + "desc": "以product_id关联商品维表或使用纯ID", + "type": "join", + "applicability": { + "required_columns": [ + "product_id" + ], + "time_column": null, + "constraints": { + "dim_cardinality_hint": 1000, + "fk_join_available": true, + "notes": [ + "若无维表则保留纯ID版输出", + "谨慎选择PII字段,勿输出地址类字段" + ] + } + }, + "variables": [ + { + "name": "dim_product", + "type": "identifier" + }, + { + "name": "select_cols", + "type": "string", + "default": "f.product_id, f.price, f.quantity" + } + ], + "dialect_sql": { + "mysql": "-- 命名版\nSELECT {{select_cols}}\nFROM {{table}} f\nLEFT JOIN {{dim_product}} d ON f.product_id = d.product_id;\n\n-- 纯ID版\nSELECT product_id, price, quantity FROM {{table}};" + }, + "business_caliber": "外键:product_id→商品维表主键;度量来源于事实表price与quantity。", + "examples": [ + "联查商品名称后做TopN", + "仅用ID进行商品分析" + ] + }, + { + "id": "snpt_join_customer_dim", + "title": "客户维表联查", + "desc": "以customer_id关联客户维表或使用纯ID", + "type": "join", + "applicability": { + "required_columns": [ + "customer_id" + ], + "time_column": null, + "constraints": { + "dim_cardinality_hint": 2713, + "fk_join_available": true, + "notes": [ + "如无维表,可直接按customer_id聚合", + "避免输出shipping_address/billing_address等PII" + ] + } + }, + "variables": [ + { + "name": "dim_customer", + "type": "identifier" + }, + { + "name": "select_cols", + "type": "string", + "default": "c.customer_name, f.customer_id, SUM(f.price*f.quantity) AS gmv" + } + ], + "dialect_sql": { + "mysql": "-- 命名版\nSELECT {{select_cols}}\nFROM {{table}} f\nLEFT JOIN {{dim_customer}} c ON f.customer_id = c.customer_id\nGROUP BY c.customer_name, f.customer_id;\n\n-- 纯ID版\nSELECT customer_id, SUM(price*quantity) AS gmv\nFROM {{table}}\nGROUP BY customer_id;" + }, + "business_caliber": "外键:customer_id→客户维表主键;GMV=price×quantity。", + "examples": [ + "客户分群GMV", + "重点客户消费额排行" + ] + }, + { + "id": "snpt_quality_dup_order", + "title": "主键重复检查", + "desc": "检查order_id唯一性并抽样输出", + "type": "quality", + "applicability": { + "required_columns": [ + "order_id" + ], + "time_column": null, + "constraints": { + "dim_cardinality_hint": 10000, + "fk_join_available": false, + "notes": [ + "画像显示order_id应唯一;若结果非空为异常" + ] + } + }, + "variables": [ + { + "name": "limit_sample", + "type": "int", + "default": 50 + } + ], + "dialect_sql": { + "mysql": "WITH d AS (\n SELECT order_id, COUNT(*) AS cnt\n FROM {{table}}\n GROUP BY order_id\n HAVING COUNT(*)>1\n)\nSELECT * FROM d LIMIT {{limit_sample}};" + }, + "business_caliber": "主键口径:order_id全表唯一;用于数据质量预警与排查。", + "examples": [ + "是否存在重复订单?", + "查看重复订单样本" + ] + }, + { + "id": "snpt_quality_price_outlier", + "title": "价格异常检测", + "desc": "基于当日均值±3σ识别异常价", + "type": "quality", + "applicability": { + "required_columns": [ + "order_date", + "price" + ], + "time_column": 
"order_date", + "constraints": { + "dim_cardinality_hint": null, + "fk_join_available": false, + "notes": [ + "仅质量预警,不直接代表业务错误", + "当天样本过少时波动较大" + ] + } + }, + "variables": [ + { + "name": "start_date", + "type": "date" + }, + { + "name": "end_date", + "type": "date" + }, + { + "name": "limit_sample", + "type": "int", + "default": 100 + } + ], + "dialect_sql": { + "mysql": "WITH stats AS (\n SELECT DATE(order_date) AS dt, AVG(price) AS mu, STDDEV_POP(price) AS sigma\n FROM {{table}}\n WHERE DATE(order_date) BETWEEN {{start_date}} AND {{end_date}}\n GROUP BY dt\n)\nSELECT f.*\nFROM {{table}} f\nJOIN stats s ON DATE(f.order_date)=s.dt\nWHERE (f.price > s.mu + 3*s.sigma OR f.price < s.mu - 3*s.sigma)\nLIMIT {{limit_sample}};" + }, + "business_caliber": "异常定义:价格超出当日均值±3×标准差(总体标准差)。", + "examples": [ + "近30天价格异常样本", + "促销期异常价监控" + ] + }, + { + "id": "snpt_sample_recent_orders", + "title": "近期明细抽样", + "desc": "抽样查看近期订单核心字段", + "type": "sample", + "applicability": { + "required_columns": [ + "order_date", + "order_id", + "customer_id", + "product_id", + "category", + "price", + "quantity", + "channel", + "payment_method", + "delivery_status" + ], + "time_column": "order_date", + "constraints": { + "dim_cardinality_hint": null, + "fk_join_available": true, + "notes": [ + "为保护隐私,不展示shipping_address与billing_address", + "仅用于人工核验" + ] + } + }, + "variables": [ + { + "name": "start_date", + "type": "date" + }, + { + "name": "end_date", + "type": "date" + }, + { + "name": "limit_rows", + "type": "int", + "default": 100 + } + ], + "dialect_sql": { + "mysql": "SELECT DATE(order_date) AS dt,\n order_id, customer_id, product_id, category,\n price, quantity, channel, payment_method, delivery_status\nFROM {{table}}\nWHERE DATE(order_date) BETWEEN {{start_date}} AND {{end_date}}\nORDER BY dt DESC\nLIMIT {{limit_rows}};" + }, + "business_caliber": "明细抽样用于数据核验;不输出PII地址信息。", + "examples": [ + "抽样查看上周订单", + "核对节假日订单明细" + ] + }, + { + "id": "snpt_filter_paid_delivered", + "title": "支付已送达筛选", + "desc": "过滤支付方式为信用卡且配送状态为已送达", + "type": "sample", + "applicability": { + "required_columns": [ + "payment_method", + "delivery_status" + ], + "time_column": null, + "constraints": { + "dim_cardinality_hint": 5, + "fk_join_available": false, + "notes": [ + "此片段为WHERE条件模板,可拼接到任意查询", + "delivery_status枚举包含Delivered/Pending/Returned/Shipped" + ] + } + }, + "variables": [], + "dialect_sql": { + "mysql": "WHERE payment_method = 'Credit Card' AND delivery_status = 'Delivered'" + }, + "business_caliber": "口径:支付渠道=信用卡;物流状态=已送达(Delivered)。可与时间或维度条件叠加。", + "examples": [ + "筛选信用卡已送达订单", + "在TopN商品中仅看已送达信用卡订单" + ] + }, + { + "id": "snpt_filter_device_channel", + "title": "设备渠道筛选", + "desc": "按设备类型与渠道过滤分析范围", + "type": "sample", + "applicability": { + "required_columns": [ + "device_type", + "channel" + ], + "time_column": null, + "constraints": { + "dim_cardinality_hint": 7, + "fk_join_available": false, + "notes": [ + "device_type枚举:Desktop/Mobile/Tablet", + "channel枚举:Email/Organic/Paid Search/Social" + ] + } + }, + "variables": [], + "dialect_sql": { + "mysql": "WHERE device_type IN ('Mobile','Desktop') AND channel IN ('Paid Search','Social')" + }, + "business_caliber": "限制分析在指定设备与渠道;可直接作为WHERE子句片段复用。", + "examples": [ + "仅看移动端付费渠道GMV", + "桌面+社媒渠道订单明细" + ] + } +] \ No newline at end of file diff --git a/demo/snippet_alias_generator.json b/demo/snippet_alias_generator.json new file mode 100644 index 0000000..0c592dc --- /dev/null +++ b/demo/snippet_alias_generator.json @@ -0,0 +1,499 @@ +[ + { + "id": "snpt_daily_gmv_trend", + 
"aliases": [ + { + "text": "每日GMV走势", + "tone": "中性" + }, + { + "text": "日销售额趋势", + "tone": "中性" + }, + { + "text": "每天卖了多少", + "tone": "口语" + }, + { + "text": "按日GMV曲线", + "tone": "专业" + } + ], + "keywords": [ + "GMV", + "销售额", + "日趋势", + "每日", + "订单量", + "orders", + "price", + "quantity", + "order_date", + "time series", + "趋势图", + "按日聚合" + ], + "intent_tags": [ + "trend" + ] + }, + { + "id": "snpt_daily_gmv_ma7", + "aliases": [ + { + "text": "GMV七日均线", + "tone": "专业" + }, + { + "text": "7天滑动平均", + "tone": "中性" + }, + { + "text": "GMV周均走势", + "tone": "中性" + }, + { + "text": "GMV平滑曲线", + "tone": "专业" + } + ], + "keywords": [ + "GMV", + "移动平均", + "MA7", + "七日均线", + "滑动窗口", + "time series", + "order_date", + "price", + "quantity", + "平滑", + "趋势", + "按日聚合" + ], + "intent_tags": [ + "trend" + ] + }, + { + "id": "snpt_yoy_daily_gmv", + "aliases": [ + { + "text": "GMV日同比", + "tone": "专业" + }, + { + "text": "每日同比增速", + "tone": "中性" + }, + { + "text": "跟去年同日比", + "tone": "口语" + }, + { + "text": "GMV YoY(日)", + "tone": "专业" + } + ], + "keywords": [ + "同比", + "YoY", + "GMV", + "去年同日", + "增长率", + "price", + "quantity", + "order_date", + "对比分析", + "比值", + "日粒度", + "ratio" + ], + "intent_tags": [ + "ratio" + ] + }, + { + "id": "snpt_topn_category_gmv", + "aliases": [ + { + "text": "类目GMV排行", + "tone": "中性" + }, + { + "text": "类目TopN销量", + "tone": "中性" + }, + { + "text": "哪个分类最卖", + "tone": "口语" + }, + { + "text": "按类目GMV排序", + "tone": "专业" + } + ], + "keywords": [ + "TopN", + "分类", + "类目", + "category", + "GMV", + "price", + "quantity", + "排行", + "榜单", + "按类目聚合", + "订单量", + "销量" + ], + "intent_tags": [ + "topn", + "by_dimension" + ] + }, + { + "id": "snpt_share_channel", + "aliases": [ + { + "text": "渠道GMV占比", + "tone": "中性" + }, + { + "text": "各渠道份额", + "tone": "中性" + }, + { + "text": "哪个渠道占多", + "tone": "口语" + }, + { + "text": "渠道结构占比", + "tone": "专业" + } + ], + "keywords": [ + "占比", + "份额", + "share", + "channel", + "GMV", + "price", + "quantity", + "比例", + "结构分析", + "按渠道聚合", + "饼图", + "堆叠" + ], + "intent_tags": [ + "ratio", + "by_dimension" + ] + }, + { + "id": "snpt_topn_product_gmv", + "aliases": [ + { + "text": "商品GMV排行", + "tone": "中性" + }, + { + "text": "热销商品TopN", + "tone": "中性" + }, + { + "text": "哪款卖得最好", + "tone": "口语" + }, + { + "text": "按商品GMV排序", + "tone": "专业" + } + ], + "keywords": [ + "TopN", + "product_id", + "商品", + "GMV", + "price", + "quantity", + "热销", + "排行", + "销量", + "订单数", + "高基数", + "榜单" + ], + "intent_tags": [ + "topn", + "by_dimension" + ] + }, + { + "id": "snpt_join_product_dim", + "aliases": [ + { + "text": "关联商品维度", + "tone": "专业" + }, + { + "text": "商品ID联表", + "tone": "中性" + }, + { + "text": "把商品名连上", + "tone": "口语" + }, + { + "text": "product维表join", + "tone": "专业" + } + ], + "keywords": [ + "join", + "维表", + "product_id", + "维度扩展", + "明细补充", + "维度属性", + "联表查询", + "外键", + "选择列", + "维度贴标签", + "by id", + "映射" + ], + "intent_tags": [ + "by_dimension" + ] + }, + { + "id": "snpt_join_customer_dim", + "aliases": [ + { + "text": "关联客户维度", + "tone": "专业" + }, + { + "text": "客户ID联表", + "tone": "中性" + }, + { + "text": "把客户信息补齐", + "tone": "口语" + }, + { + "text": "customer维表join", + "tone": "专业" + } + ], + "keywords": [ + "join", + "维表", + "customer_id", + "客户属性", + "GMV聚合", + "外键关联", + "联表查询", + "ID映射", + "维度丰富", + "分群分析", + "by id", + "扩展字段" + ], + "intent_tags": [ + "by_dimension" + ] + }, + { + "id": "snpt_quality_dup_order", + "aliases": [ + { + "text": "订单主键去重检", + "tone": "专业" + }, + { + "text": "重复order_id查找", + "tone": "中性" + }, + { + "text": "有没重复订单", + "tone": "口语" 
+ }, + { + "text": "主键唯一性校验", + "tone": "专业" + } + ], + "keywords": [ + "数据质量", + "重复", + "去重", + "order_id", + "唯一性", + "主键检查", + "异常数据", + "质量预警", + "count>1", + "样本抽取", + "校验", + "重复检测" + ], + "intent_tags": [ + "quality" + ] + }, + { + "id": "snpt_quality_price_outlier", + "aliases": [ + { + "text": "价格3σ异常检", + "tone": "专业" + }, + { + "text": "当日异常价格", + "tone": "中性" + }, + { + "text": "看哪单价格怪", + "tone": "口语" + }, + { + "text": "价格离群监控", + "tone": "专业" + } + ], + "keywords": [ + "异常检测", + "3σ", + "stddev", + "价格", + "price", + "离群点", + "质量规则", + "time series", + "order_date", + "阈值告警", + "数据监控", + "波动" + ], + "intent_tags": [ + "quality" + ] + }, + { + "id": "snpt_sample_recent_orders", + "aliases": [ + { + "text": "近期明细抽样", + "tone": "中性" + }, + { + "text": "抽查最近订单", + "tone": "口语" + }, + { + "text": "近期订单样本", + "tone": "中性" + }, + { + "text": "核验明细抽样", + "tone": "专业" + } + ], + "keywords": [ + "抽样", + "sample", + "明细", + "order_date", + "order_id", + "customer_id", + "product_id", + "category", + "channel", + "payment_method", + "delivery_status", + "核验" + ], + "intent_tags": [ + "by_dimension" + ] + }, + { + "id": "snpt_filter_paid_delivered", + "aliases": [ + { + "text": "信用卡送达筛选", + "tone": "中性" + }, + { + "text": "只看信用卡已送达", + "tone": "口语" + }, + { + "text": "信用卡且已送达", + "tone": "中性" + }, + { + "text": "付款信用卡已送达", + "tone": "专业" + } + ], + "keywords": [ + "支付方式", + "信用卡", + "Credit Card", + "配送状态", + "Delivered", + "已送达", + "过滤条件", + "where子句", + "订单筛选", + "支付渠道", + "状态筛选", + "条件片段" + ], + "intent_tags": [ + "by_dimension" + ] + }, + { + "id": "snpt_filter_device_channel", + "aliases": [ + { + "text": "设备渠道筛选", + "tone": "中性" + }, + { + "text": "只看移动付费社媒", + "tone": "口语" + }, + { + "text": "设备+渠道过滤", + "tone": "专业" + }, + { + "text": "端与渠道条件", + "tone": "中性" + } + ], + "keywords": [ + "device_type", + "channel", + "设备类型", + "渠道", + "过滤条件", + "where子句", + "Mobile", + "Desktop", + "Paid Search", + "Social", + "范围限定", + "条件片段" + ], + "intent_tags": [ + "by_dimension" + ] + } +] \ No newline at end of file diff --git a/demo/snippet_alias_generator.txt b/demo/snippet_alias_generator.txt new file mode 100644 index 0000000..2482ab0 --- /dev/null +++ b/demo/snippet_alias_generator.txt @@ -0,0 +1,52 @@ +系统角色(System) +你是“SQL片段别名生成器”。 +输入为一个或多个 SQL 片段对象(来自 snippet.json),输出为针对每个片段生成的多样化别名(口语 / 中性 / 专业)、关键词与意图标签。 +要求逐个处理所有片段对象,输出同样数量的 JSON 元素。 + +用户消息(User) +【上下文】 + +SQL片段对象数组:{{SNIPPET_ARRAY}} // snippet.json中的一个或多个片段 + +【任务要求】 +请针对输入数组中的 每个 SQL 片段,输出一个 JSON 对象,结构如下: + +{ + "id": "<与输入片段id一致>", + "aliases": [ + {"text": "…", "tone": "口语|中性|专业"}, + {"text": "…", "tone": "专业"} + ], + "keywords": [ + "GMV","销售额","TopN","category","类目","趋势","同比","客户","订单","质量","异常检测","join","过滤","sample" + ], + "intent_tags": ["aggregate","trend","topn","ratio","quality","join","sample","filter","by_dimension"] +} + +生成逻辑规范 +1.逐条输出:输入数组中每个片段对应一个输出对象(id 保持一致)。 + +2.aliases生成 +至少 3 个别名,分别覆盖语气类型:口语 / 中性 / 专业。 +≤20字,语义需等价,不得添加不存在的字段或业务口径。 +示例: + GMV趋势分析(中性) + 每天卖多少钱(口语) + 按日GMV曲线(专业) +3.keywords生成 +8~15个关键词,需涵盖片段核心维度、指标、分析类型和语义近义词。 +中英文混合(如 "GMV"/"销售额"、"同比"/"YoY"、"类目"/"category" 等)。 +包含用于匹配的分析意图关键词(如 “趋势”、“排行”、“占比”、“质量检查”、“过滤” 等)。 + +4.intent_tags生成 + +从以下集合中选取,与片段type及用途一致: +["aggregate","trend","topn","ratio","quality","join","sample","filter","by_dimension"] + +若为条件片段(WHERE句型),补充 "filter";若含维度分组逻辑,补充 "by_dimension"。 + +5.语言与内容要求 + +保持正式书面风格,不添加解释说明。 + +只输出JSON数组,不包含文字描述或额外文本。 \ No newline at end of file diff --git a/demo/snippet_generator.txt b/demo/snippet_generator.txt new file mode 100644 index 
0000000..1d0e1c0 --- /dev/null +++ b/demo/snippet_generator.txt @@ -0,0 +1,46 @@
+系统角色(System)
+你是“SQL片段生成器”。只能基于给定“表画像”生成可复用的分析片段。
+为每个片段产出:标题、用途描述、片段类型、变量、适用条件、SQL模板(mysql方言),并注明业务口径与安全限制。
+不要发明画像里没有的列。时间/维度/指标须与画像匹配。
+
+用户消息(User)
+【表画像JSON】
+{{TABLE_PROFILE_JSON}}
+
+【输出要求(只输出JSON数组)】
+[
+  {
+    "id": "snpt_",
+    "title": "中文标题(≤16字)",
+    "desc": "一句话用途",
+    "type": "aggregate|trend|topn|ratio|quality|join|sample",
+    "applicability": {
+      "required_columns": ["", ...],
+      "time_column": "",
+      "constraints": {
+        "dim_cardinality_hint": <int>, // 用于TopN限制与性能提示
+        "fk_join_available": true|false,
+        "notes": ["高基数维度建议LIMIT<=50", "..."]
+      }
+    },
+    "variables": [
+      {"name":"start_date","type":"date"},
+      {"name":"end_date","type":"date"},
+      {"name":"top_n","type":"int","default":10}
+    ],
+    "dialect_sql": {
+      "mysql": ""
+    },
+    "business_caliber": "清晰口径说明,如 UV以device_id去重;粒度=日-类目",
+    "examples": ["示例问法1","示例问法2"]
+  }
+]
+
+【片段选择建议】
+- 若存在 time 列:生成 trend_by_day / yoy_qoq / moving_avg。
+- 若存在 enumish 维度(distinct 5~200):生成 topn_by_dimension / share_of_total。
+- 若 metric 列:生成 sum/avg/max、分位数/异常检测(3σ/箱线)。
+- 有主键/唯一:生成 去重/明细抽样/质量检查。
+- 有 fk_candidates:同时生成“join维表命名版”和“纯ID版”。
+- 高枚举维度:在 constraints.notes 中强调 LIMIT 建议与可能的性能风险。
+- 除了完整的sql片段,还要生成sql中部分内容的条件片段,比如 where payment_method = 'Credit Card' and delivery_status = 'Delivered' 的含义是支付方式为信用卡且配送状态是已送达
\ No newline at end of file
diff --git a/demo/table-desc.json b/demo/table-desc.json
new file mode 100644
index 0000000..4f75d31
--- /dev/null
+++ b/demo/table-desc.json
@@ -0,0 +1,277 @@
+{
+  "table": "ecommerce_orders",
+  "row_count": 10000,
+  "role": "fact",
+  "grain": ["order_id"],
+  "time": {
+    "column": "order_date",
+    "granularity": "day",
+    "range": ["2024-04-20", "2025-04-19"],
+    "has_gaps": false
+  },
+  "columns": [
+    {
+      "name": "order_id",
+      "dtype": "string",
+      "semantic_type": "id",
+      "null_rate": 0.0,
+      "distinct_count": 10000,
+      "distinct_ratio": 1.0,
+      "stats": {"min": null, "max": null, "mean": null, "std": null, "skewness": null},
+      "enumish": false,
+      "top_values": [],
+      "pk_candidate_score": 1.0,
+      "metric_candidate_score": 0.0,
+      "comment": ""
+    },
+    {
+      "name": "customer_id",
+      "dtype": "integer",
+      "semantic_type": "dimension",
+      "null_rate": 0.0,
+      "distinct_count": 2713,
+      "distinct_ratio": 0.2713,
+      "stats": {"min": 1, "max": 2999, "mean": 995.29, "std": null, "skewness": null},
+      "enumish": false,
+      "top_values": [],
+      "pk_candidate_score": 0.3,
+      "metric_candidate_score": 0.1,
+      "comment": ""
+    },
+    {
+      "name": "product_id",
+      "dtype": "integer",
+      "semantic_type": "dimension",
+      "null_rate": 0.0,
+      "distinct_count": 1000,
+      "distinct_ratio": 0.0999,
+      "stats": {"min": 1, "max": 1000, "mean": 504.87, "std": null, "skewness": null},
+      "enumish": true,
+      "top_values": [],
+      "pk_candidate_score": 0.1,
+      "metric_candidate_score": 0.1,
+      "comment": ""
+    },
+    {
+      "name": "category",
+      "dtype": "string",
+      "semantic_type": "dimension",
+      "null_rate": 0.0,
+      "distinct_count": 6,
+      "distinct_ratio": 0.0006,
+      "stats": {"min": null, "max": null, "mean": null, "std": null, "skewness": null},
+      "enumish": true,
+      "top_values": [
+        {"value": "Beauty", "pct": null},
+        {"value": "Books", "pct": null},
+        {"value": "Clothing", "pct": null},
+        {"value": "Electronics", "pct": null},
+        {"value": "Home", "pct": null},
+        {"value": "Toys", "pct": null}
+      ],
+      "pk_candidate_score": 0.0,
+      "metric_candidate_score": 0.0,
+      "comment": ""
+    },
+    {
+      "name": "price",
+      "dtype": "float",
+      "semantic_type": "metric",
+      "null_rate": 0.0,
+      "distinct_count": 9013, +
"distinct_ratio": 0.9013, + "stats": {"min": 5.06, "max": 499.93, "mean": 252.55, "std": null, "skewness": null}, + "enumish": false, + "top_values": [], + "pk_candidate_score": 0.0, + "metric_candidate_score": 0.9, + "comment": "" + }, + { + "name": "quantity", + "dtype": "integer", + "semantic_type": "metric", + "null_rate": 0.0, + "distinct_count": 9, + "distinct_ratio": 0.0009, + "stats": {"min": 1, "max": 9, "mean": 2.12, "std": null, "skewness": null}, + "enumish": true, + "top_values": [ + {"value": 1, "pct": null}, + {"value": 2, "pct": null}, + {"value": 3, "pct": null}, + {"value": 4, "pct": null}, + {"value": 5, "pct": null} + ], + "pk_candidate_score": 0.0, + "metric_candidate_score": 0.7, + "comment": "" + }, + { + "name": "order_date", + "dtype": "string", + "semantic_type": "time", + "null_rate": 0.0, + "distinct_count": 365, + "distinct_ratio": 0.0365, + "stats": {"min": "2024-04-20", "max": "2025-04-19", "mean": null, "std": null, "skewness": null}, + "enumish": false, + "top_values": [], + "pk_candidate_score": 0.0, + "metric_candidate_score": 0.0, + "comment": "" + }, + { + "name": "shipping_date", + "dtype": "string", + "semantic_type": "time", + "null_rate": 0.0, + "distinct_count": 371, + "distinct_ratio": 0.0371, + "stats": {"min": "2024-04-21", "max": "2025-04-26", "mean": null, "std": null, "skewness": null}, + "enumish": false, + "top_values": [], + "pk_candidate_score": 0.0, + "metric_candidate_score": 0.0, + "comment": "" + }, + { + "name": "delivery_status", + "dtype": "string", + "semantic_type": "dimension", + "null_rate": 0.0, + "distinct_count": 4, + "distinct_ratio": 0.0004, + "stats": {"min": null, "max": null, "mean": null, "std": null, "skewness": null}, + "enumish": true, + "top_values": [ + {"value": "Delivered", "pct": null}, + {"value": "Pending", "pct": null}, + {"value": "Returned", "pct": null}, + {"value": "Shipped", "pct": null} + ], + "pk_candidate_score": 0.0, + "metric_candidate_score": 0.0, + "comment": "" + }, + { + "name": "payment_method", + "dtype": "string", + "semantic_type": "dimension", + "null_rate": 0.0, + "distinct_count": 5, + "distinct_ratio": 0.0005, + "stats": {"min": null, "max": null, "mean": null, "std": null, "skewness": null}, + "enumish": true, + "top_values": [ + {"value": "Apple Pay", "pct": null}, + {"value": "Credit Card", "pct": null}, + {"value": "Debit Card", "pct": null}, + {"value": "Google Pay", "pct": null}, + {"value": "PayPal", "pct": null} + ], + "pk_candidate_score": 0.0, + "metric_candidate_score": 0.0, + "comment": "" + }, + { + "name": "device_type", + "dtype": "string", + "semantic_type": "dimension", + "null_rate": 0.0, + "distinct_count": 3, + "distinct_ratio": 0.0003, + "stats": {"min": null, "max": null, "mean": null, "std": null, "skewness": null}, + "enumish": true, + "top_values": [ + {"value": "Desktop", "pct": null}, + {"value": "Mobile", "pct": null}, + {"value": "Tablet", "pct": null} + ], + "pk_candidate_score": 0.0, + "metric_candidate_score": 0.0, + "comment": "" + }, + { + "name": "channel", + "dtype": "string", + "semantic_type": "dimension", + "null_rate": 0.0, + "distinct_count": 4, + "distinct_ratio": 0.0004, + "stats": {"min": null, "max": null, "mean": null, "std": null, "skewness": null}, + "enumish": true, + "top_values": [ + {"value": "Email", "pct": null}, + {"value": "Organic", "pct": null}, + {"value": "Paid Search", "pct": null}, + {"value": "Social", "pct": null} + ], + "pk_candidate_score": 0.0, + "metric_candidate_score": 0.0, + "comment": "" + }, + { + "name": 
"shipping_address", + "dtype": "string", + "semantic_type": "text", + "null_rate": 0.0, + "distinct_count": 10000, + "distinct_ratio": 1.0, + "stats": {"min": null, "max": null, "mean": null, "std": null, "skewness": null}, + "enumish": false, + "top_values": [], + "pk_candidate_score": 0.9, + "metric_candidate_score": 0.0, + "comment": "" + }, + { + "name": "billing_address", + "dtype": "string", + "semantic_type": "text", + "null_rate": 0.0, + "distinct_count": 10000, + "distinct_ratio": 1.0, + "stats": {"min": null, "max": null, "mean": null, "std": null, "skewness": null}, + "enumish": false, + "top_values": [], + "pk_candidate_score": 0.9, + "metric_candidate_score": 0.0, + "comment": "" + }, + { + "name": "customer_segment", + "dtype": "string", + "semantic_type": "dimension", + "null_rate": 0.0, + "distinct_count": 3, + "distinct_ratio": 0.0003, + "stats": {"min": null, "max": null, "mean": null, "std": null, "skewness": null}, + "enumish": true, + "top_values": [ + {"value": "New", "pct": null}, + {"value": "Returning", "pct": null}, + {"value": "VIP", "pct": null} + ], + "pk_candidate_score": 0.0, + "metric_candidate_score": 0.0, + "comment": "" + } + ], + "primary_key_candidates": [["order_id"]], + "fk_candidates": [ + {"from": "customer_id", "to": "dim_customer(customer_id)", "confidence": 0.9}, + {"from": "product_id", "to": "dim_product(product_id)", "confidence": 0.9} + ], + "quality": { + "failed_expectations": [], + "warning_hints": [] + }, + "confidence_notes": [ + "表含时间列(order_date, shipping_date)且含度量列(price, quantity),推断为fact表。", + "order_id唯一性=1.0,确认主键。", + "order_date日期范围连续无缺口,粒度为日级。", + "高基数数值字段(price, quantity)符合指标特征。", + "低熵字段(category, delivery_status, payment_method等)为枚举维度。" + ] +} diff --git a/demo/user-query.json b/demo/user-query.json new file mode 100644 index 0000000..fbc0762 --- /dev/null +++ b/demo/user-query.json @@ -0,0 +1,102 @@ +[ + { + "question": "近一年每个月的销售额和订单量变化趋势如何?", + "intent": "trend_analysis", + "related_fields": ["order_date", "price", "quantity"] + }, + { + "question": "哪个产品类目的GMV最高?", + "intent": "topn_category", + "related_fields": ["category", "price", "quantity"] + }, + { + "question": "不同支付方式的订单数量和平均客单价是多少?", + "intent": "aggregate_comparison", + "related_fields": ["payment_method", "price", "quantity"] + }, + { + "question": "各营销渠道(如Paid Search、Social)的GMV占比是多少?", + "intent": "ratio_analysis", + "related_fields": ["channel", "price", "quantity"] + }, + { + "question": "移动端和桌面端的订单表现差异大吗?", + "intent": "device_comparison", + "related_fields": ["device_type", "price", "quantity"] + }, + { + "question": "已送达订单的平均配送时长是多少天?", + "intent": "shipping_time_analysis", + "related_fields": ["order_date", "shipping_date", "delivery_status"] + }, + { + "question": "退货(Returned)订单主要集中在哪些产品类目?", + "intent": "return_analysis", + "related_fields": ["delivery_status", "category"] + }, + { + "question": "不同客户类型(新客、回头客、VIP)的平均订单金额是多少?", + "intent": "segment_analysis", + "related_fields": ["customer_segment", "price", "quantity"] + }, + { + "question": "每个客户的平均下单频率是多少?", + "intent": "customer_behavior", + "related_fields": ["customer_id", "order_date"] + }, + { + "question": "近期是否存在价格异常或超高订单?", + "intent": "quality_outlier", + "related_fields": ["price", "order_date"] + }, + { + "question": "哪个支付方式的退货率最高?", + "intent": "return_ratio_by_payment", + "related_fields": ["payment_method", "delivery_status"] + }, + { + "question": "哪些商品在VIP客户中最受欢迎?", + "intent": "vip_product_preference", + "related_fields": ["customer_segment", "product_id", "price", "quantity"] + 
}, + { + "question": "下单后平均几天发货?", + "intent": "shipping_speed", + "related_fields": ["order_date", "shipping_date"] + }, + { + "question": "从哪些渠道来的新用户最多?", + "intent": "user_acquisition_channel", + "related_fields": ["channel", "customer_segment"] + }, + { + "question": "订单数量在周末和工作日有什么差异?", + "intent": "weekday_pattern", + "related_fields": ["order_date"] + }, + { + "question": "每个设备类型的平均订单金额是多少?", + "intent": "device_gmv_comparison", + "related_fields": ["device_type", "price", "quantity"] + }, + { + "question": "本月退货率与上月相比是否上升?", + "intent": "return_trend", + "related_fields": ["delivery_status", "order_date"] + }, + { + "question": "哪些客户下单金额最高?", + "intent": "top_customers", + "related_fields": ["customer_id", "price", "quantity"] + }, + { + "question": "不同类目的平均客单价(GMV/订单量)是多少?", + "intent": "category_avg_order_value", + "related_fields": ["category", "price", "quantity"] + }, + { + "question": "不同渠道的订单平均转化周期(下单到发货)是多少?", + "intent": "conversion_cycle", + "related_fields": ["channel", "order_date", "shipping_date"] + } +] diff --git a/file/全国品牌.xlsx b/file/全国品牌.xlsx new file mode 100644 index 0000000..1b1735a Binary files /dev/null and b/file/全国品牌.xlsx differ diff --git a/file/长江电力CYPC概览历史工作票-向家坝-机械类-2024.01.01-2025.06.30.xlsx b/file/长江电力CYPC概览历史工作票-向家坝-机械类-2024.01.01-2025.06.30.xlsx new file mode 100644 index 0000000..693d7a0 Binary files /dev/null and b/file/长江电力CYPC概览历史工作票-向家坝-机械类-2024.01.01-2025.06.30.xlsx differ diff --git a/ge_v1.py b/ge_v1.py new file mode 100644 index 0000000..e1165fa --- /dev/null +++ b/ge_v1.py @@ -0,0 +1,332 @@ +"""Great Expectations profiling helper for Excel sources. + +This script loads a user-provided Excel file into pandas, profiles it with +Great Expectations, writes a lightweight analysis summary to JSON, and exposes +the path to GE Data Docs for manual inspection. +""" + +from __future__ import annotations + +import argparse +import json +import os +import shutil +from pathlib import Path +from typing import Any, Dict + +import numpy as np +import pandas as pd +import great_expectations as gx +from great_expectations.core.batch import RuntimeBatchRequest +from great_expectations.data_context import FileDataContext +from great_expectations.exceptions import ( + DataContextError, + InvalidDataContextConfigError, +) + +try: + from great_expectations.profile.user_configurable_profiler import ( + UserConfigurableProfiler, + ) +except ImportError: + try: + from great_expectations.profiler.user_configurable_profiler import ( + UserConfigurableProfiler, + ) + except ImportError as err: + raise ImportError( + "UserConfigurableProfiler is not available; please install a compatible " + "version of great_expectations (>=0.15,<0.19) or add the profiling extra." 
+        ) from err
+
+
+RESULTS_DIR = Path("results")
+DEFAULT_EXCEL_PATH = Path("file") / "全国品牌.xlsx"
+DEFAULT_BATCH_ID = "initial_profile"
+
+
+def parse_cli_args() -> argparse.Namespace:
+    """Parse command line options for Excel ingestion."""
+
+    parser = argparse.ArgumentParser(description="Profile an Excel file with GE")
+    parser.add_argument(
+        "--excel-path",
+        type=Path,
+        default=DEFAULT_EXCEL_PATH,
+        help="Path to the Excel file to analyse (default: ./file/全国品牌.xlsx)",
+    )
+    parser.add_argument(
+        "--sheet-name",
+        default=0,
+        help="Excel sheet name or index to load (default: 0)",
+    )
+    parser.add_argument(
+        "--header-row",
+        type=int,
+        default=0,
+        help="Row index (0-based) to use as the header (default: 0)",
+    )
+    parser.add_argument(
+        "--clean-results",
+        action="store_true",
+        help="Remove the previous results directory before running",
+    )
+    parser.add_argument(
+        "--ge-root",
+        type=Path,
+        default=Path("gx_project"),
+        help="Directory to host the Great Expectations project (default: ./gx_project)",
+    )
+    return parser.parse_args()
+
+
+def reset_results_dir(clean_results: bool) -> None:
+    """Remove prior results folder when requested and ensure directory exists."""
+
+    if clean_results and RESULTS_DIR.exists():
+        shutil.rmtree(RESULTS_DIR)
+    RESULTS_DIR.mkdir(parents=True, exist_ok=True)
+
+
+def load_excel_as_dataframe(excel_path: Path, sheet_name: Any, header_row: int) -> pd.DataFrame:
+    """Load Excel data into a DataFrame and provide basic logging."""
+
+    if not excel_path.exists():
+        raise FileNotFoundError(f"Excel file not found: {excel_path}")
+
+    df = pd.read_excel(excel_path, sheet_name=sheet_name, header=header_row)
+    print(f"Loaded Excel data: {excel_path} ({len(df)} rows, {len(df.columns)} columns)")
+    return df
+
+
+def get_datasource_config(datasource_name: str, data_connector_name: str) -> Dict[str, Any]:
+    """Assemble a minimal Pandas datasource configuration."""
+
+    return {
+        "name": datasource_name,
+        "class_name": "Datasource",
+        "execution_engine": {"class_name": "PandasExecutionEngine"},
+        "data_connectors": {
+            data_connector_name: {
+                "class_name": "RuntimeDataConnector",
+                "batch_identifiers": ["batch_id"],  # RuntimeDataConnector key in GE >= 0.15 (was "runtime_keys")
+            }
+        },
+    }
+
+
+def clean_value(value: Any) -> Any:
+    """Convert numpy/pandas scalar types into JSON serialisable values."""
+
+    if isinstance(value, (np.generic,)):
+        return value.item()
+    if isinstance(value, pd.Timestamp):
+        return value.isoformat()
+    if pd.isna(value):
+        return None
+    return value
+
+
+def build_column_profile(series: pd.Series) -> Dict[str, Any]:
+    """Generate a compact per-column profile for JSON output."""
+
+    stats = series.describe()
+    profiled_stats = {key: clean_value(val) for key, val in stats.items()}
+
+    return {
+        "name": str(series.name),
+        "dtype": str(series.dtype),
+        "non_null_count": int(series.count()),
+        "null_count": int(series.isna().sum()),
+        "unique_count": int(series.nunique(dropna=True)),
+        "stats": profiled_stats,
+    }
+
+
+def build_analysis_summary(df: pd.DataFrame, sample_size: int = 5) -> Dict[str, Any]:
+    """Collate basic statistics to accompany GE outputs."""
+
+    summary = {
+        "shape": {"rows": int(df.shape[0]), "columns": int(df.shape[1])},
+        "columns": [build_column_profile(df[col]) for col in df.columns],
+        "sample_rows": [
+            {key: clean_value(value) for key, value in row.items()} for row in df.head(sample_size).to_dict(orient="records")
+        ],
+    }
+    return summary
+
+
+def serialize_batch_request(batch_request: Any) -> Dict[str, Any]:
+    """Convert differing batch request types into plain
dictionaries.""" + + if hasattr(batch_request, "to_json_dict"): + return batch_request.to_json_dict() + if hasattr(batch_request, "dict"): + return batch_request.dict() + if hasattr(batch_request, "model_dump"): + return batch_request.model_dump() + return {"repr": repr(batch_request)} + + +def ensure_data_context(ge_root: Path) -> gx.DataContext: + """Create or repair a file-backed GE data context as needed.""" + + ge_root = ge_root.resolve() + config_path = ge_root / "gx" / "great_expectations.yml" + if not config_path.exists(): + FileDataContext.create(project_root_dir=str(ge_root)) + + try: + return gx.get_context(project_root_dir=str(ge_root)) + except InvalidDataContextConfigError: + print("Existing Great Expectations config invalid; recreating project root.") + shutil.rmtree(ge_root, ignore_errors=True) + FileDataContext.create(project_root_dir=str(ge_root)) + return gx.get_context(project_root_dir=str(ge_root)) + + +def run_ge_profiling( + context: gx.DataContext, + df: pd.DataFrame, + datasource_name: str, + data_connector_name: str, + data_asset_name: str, + expectation_suite_name: str, +) -> Dict[str, Any]: + """Register datasource, build expectations, and capture validation results.""" + + if hasattr(context, "sources"): + datasource = context.sources.add_or_update_pandas(name=datasource_name) + try: + datasource.delete_asset(data_asset_name) + except (gx.exceptions.DataConnectorError, ValueError, KeyError, LookupError, AttributeError): + pass + asset = datasource.add_dataframe_asset(name=data_asset_name) + batch_request = asset.build_batch_request(dataframe=df) + print(f"Datasource registered (fluent): {datasource_name}") + else: + datasource_config = get_datasource_config(datasource_name, data_connector_name) + try: + context.add_datasource(**datasource_config) + print(f"Datasource registered: {datasource_name}") + except gx.exceptions.GreatExpectationsError as err: + print(f"Datasource registration issue: {err}") + batch_request = RuntimeBatchRequest( + datasource_name=datasource_name, + data_connector_name=data_connector_name, + data_asset_name=data_asset_name, + runtime_parameters={"batch_data": df}, + batch_identifiers={"batch_id": DEFAULT_BATCH_ID}, + ) + + try: + context.delete_expectation_suite(expectation_suite_name=expectation_suite_name) + except DataContextError: + pass + + if hasattr(context, "create_expectation_suite"): + context.create_expectation_suite( + expectation_suite_name=expectation_suite_name, overwrite_existing=True + ) + else: + context.add_expectation_suite(expectation_suite_name=expectation_suite_name) + + validator = context.get_validator( + batch_request=batch_request, expectation_suite_name=expectation_suite_name + ) + profiler = UserConfigurableProfiler(profile_dataset=validator) + expectation_suite = profiler.build_suite() + context.add_or_update_expectation_suite(expectation_suite=expectation_suite) + validation_result = validator.validate(result_format="SUMMARY") + + context.build_data_docs() + data_docs_path = ( + Path(context.root_directory) + / "uncommitted" + / "data_docs" + / "local_site" + / "index.html" + ) + + print( + f"Expectation suite saved: {expectation_suite_name} ({len(expectation_suite.expectations)} expectations)" + ) + + return { + "batch_request": serialize_batch_request(batch_request), + "expectation_suite_name": expectation_suite_name, + "expectations_count": len(expectation_suite.expectations), + "validation_result": validation_result.to_json_dict(), + "data_docs_path": os.path.abspath(data_docs_path), + } + + +def 
assemble_payload(
+    excel_path: Path,
+    sheet_name: Any,
+    dataframe_summary: Dict[str, Any],
+    ge_summary: Dict[str, Any],
+) -> Dict[str, Any]:
+    """Combine pandas and GE artefacts into a single JSON payload."""
+
+    return {
+        "source": {
+            "excel_path": str(excel_path.resolve()),
+            "sheet_name": sheet_name,
+        },
+        "analysis": dataframe_summary,
+        "great_expectations": ge_summary,
+    }
+
+
+def save_json_payload(payload: Dict[str, Any], output_path: Path) -> None:
+    """Persist the combined analysis payload to disk."""
+
+    output_path.parent.mkdir(parents=True, exist_ok=True)
+    with output_path.open("w", encoding="utf-8") as f:
+        json.dump(payload, f, ensure_ascii=False, indent=2)
+    print(f"JSON analysis saved to: {output_path}")
+
+
+def main() -> None:
+    args = parse_cli_args()
+    reset_results_dir(clean_results=args.clean_results)
+
+    context = ensure_data_context(args.ge_root)
+    print(f"Great Expectations Data Context initialized at {context.root_directory}.")
+
+    df = load_excel_as_dataframe(args.excel_path, args.sheet_name, args.header_row)
+    dataframe_summary = build_analysis_summary(df)
+
+    file_stem = args.excel_path.stem
+    datasource_name = f"{file_stem}_datasource"
+    data_connector_name = "runtime_data_connector"
+    data_asset_name = f"{file_stem}_asset"
+    expectation_suite_name = f"{file_stem}_suite"
+
+    ge_summary = run_ge_profiling(
+        context,
+        df,
+        datasource_name,
+        data_connector_name,
+        data_asset_name,
+        expectation_suite_name,
+    )
+
+    payload = assemble_payload(
+        excel_path=args.excel_path,
+        sheet_name=args.sheet_name,
+        dataframe_summary=dataframe_summary,
+        ge_summary=ge_summary,
+    )
+
+    output_path = RESULTS_DIR / f"{file_stem}_analysis.json"
+    save_json_payload(payload, output_path)
+
+    print(
+        f"Data Docs generated. Open in browser: file://{ge_summary['data_docs_path']}"
+    )
+
+
+if __name__ == "__main__":
+    main()
diff --git a/ge_v2.py b/ge_v2.py
new file mode 100644
index 0000000..68d7a42
--- /dev/null
+++ b/ge_v2.py
@@ -0,0 +1,104 @@
+import great_expectations as gx
+from datasets import load_dataset
+import pandas as pd
+import os
+import webbrowser
+from great_expectations.profile.user_configurable_profiler import (
+    UserConfigurableProfiler,
+)
+
+# --- 1. 加载Hugging Face数据集并转换为Pandas DataFrame ---
+print("🚚 1. 从Hugging Face加载 'millat/e-commerce-orders' 数据集...")
+# 加载数据集,只取训练集部分
+hf_dataset = load_dataset("millat/e-commerce-orders", split="train")
+# 转换为Pandas DataFrame,这是GX最常使用的数据格式
+df = hf_dataset.to_pandas()
+print(f"✅ 数据集加载成功,包含 {len(df)} 行数据。")
+print("\n📝 数据集前5行预览:")
+print(df.head())
+
+
+# --- 2. 初始化Great Expectations (GX) 项目 ---
+# 在 ./ge_project 下创建(或加载)文件型GX项目,存放所有GX的配置和结果;第6步要用到 context.root_directory
+print("\n🏗️ 2. 初始化Great Expectations项目...")
+context = gx.get_context(project_root_dir="ge_project")
+print("✅ GX项目上下文(Context)创建成功。")
+
+
+# --- 3. 添加数据源并将DataFrame连接到GX ---
+# 我们将Pandas DataFrame添加为一个数据源,这样GX就知道如何访问它
+print("\n🔗 3. 将DataFrame添加为GX的数据源...")
+datasource_name = "my_ecommerce_datasource"
+# Fluent API: add_or_update 确保多次运行脚本也不会重复出错
+datasource = context.sources.add_or_update_pandas(name=datasource_name)
+
+data_asset_name = "orders_table"
+data_asset = datasource.add_dataframe_asset(name=data_asset_name, dataframe=df)
+print("✅ 数据源和数据资产(Data Asset)配置完成。")
+
+
+# --- 4. 使用自动剖析器生成期望套件 ---
+print("\n🔍 4. 
使用自动剖析器 (Profiler) 扫描数据并生成期望...") +# 创建一个请求,告诉GX我们要处理哪个数据资产 +batch_request = data_asset.build_batch_request() + +# 定义期望套件的名称 +expectation_suite_name = "ecommerce_profiling_suite" +# 创建或获取期望套件 +try: + suite = context.get_expectation_suite(expectation_suite_name=expectation_suite_name) + print(f" - 已找到现有的期望套件 '{expectation_suite_name}'。") +except gx.exceptions.DataContextError: + suite = context.add_expectation_suite(expectation_suite_name=expectation_suite_name) + print(f" - 已创建新的期望套件 '{expectation_suite_name}'。") + +# 构建一个 Validator,供剖析器消费 +validator = context.get_validator( + batch_request=batch_request, + expectation_suite_name=expectation_suite_name, +) +# 这是核心步骤:使用 UserConfigurableProfiler 自动分析数据并创建期望 +profiler = UserConfigurableProfiler(profile_dataset=validator) +suite = profiler.build_suite() +# 保存由剖析器生成的期望套件 +context.save_expectation_suite(expectation_suite=suite, expectation_suite_name=expectation_suite_name) +print("✅ 自动剖析完成,期望已生成并保存。") + + +# --- 5. 创建并运行检查点(Checkpoint)以验证数据 --- +print("\n🛡️ 5. 创建并运行检查点 (Checkpoint) 以验证数据...") +checkpoint_name = "ecommerce_profiling_checkpoint" +try: + # 检查检查点是否已存在 + checkpoint = context.get_checkpoint(name=checkpoint_name) + print(f" - 已加载现有的检查点 '{checkpoint_name}'。") +except gx.exceptions.CheckpointNotFoundError: + # 如果不存在,则创建一个新的 + checkpoint_config = { + "name": checkpoint_name, + "validations": [ + { + "batch_request": batch_request, + "expectation_suite_name": expectation_suite_name, + } + ], + } + context.add_or_update_checkpoint(**checkpoint_config) + checkpoint = context.get_checkpoint(name=checkpoint_name) + print(f" - 已创建新的检查点 '{checkpoint_name}'。") + +# 运行检查点,它会将数据与我们刚刚生成的期望套件进行对比 +checkpoint_result = checkpoint.run() +print("✅ 检查点运行完毕,数据验证完成。") + + +# --- 6. 构建并打开数据文档(Data Docs)查看结果 --- +print("\n📊 6. 构建并打开数据文档 (Data Docs) 查看剖析报告...") +# 这会生成一个HTML报告 +context.build_data_docs() + +# 获取Data Docs的路径并自动在浏览器中打开 +docs_path = os.path.join(context.root_directory, "uncommitted", "data_docs", "local_site", "index.html") +print(f"\n🎉 剖析报告已生成!请在浏览器中查看:\nfile://{os.path.abspath(docs_path)}") + +webbrowser.open(f"file://{os.path.abspath(docs_path)}") diff --git a/requirements.txt b/requirements.txt new file mode 100644 index 0000000..bf32ac2 --- /dev/null +++ b/requirements.txt @@ -0,0 +1,11 @@ +fastapi>=0.111.0 +uvicorn[standard]>=0.29.0 +pydantic>=2.6.0 +sqlalchemy>=2.0.28 +pymysql>=1.1.0 +great_expectations>=0.18.0,<0.19.0 +pandas>=2.0 +numpy>=1.24 +openpyxl>=3.1 +httpx==0.27.2 +python-dotenv==1.0.1 diff --git a/scripts/__pycache__/deepseek_request.cpython-312.pyc b/scripts/__pycache__/deepseek_request.cpython-312.pyc new file mode 100644 index 0000000..7b67e7b Binary files /dev/null and b/scripts/__pycache__/deepseek_request.cpython-312.pyc differ diff --git a/scripts/deepseek_request.py b/scripts/deepseek_request.py new file mode 100644 index 0000000..220c28e --- /dev/null +++ b/scripts/deepseek_request.py @@ -0,0 +1,110 @@ +from __future__ import annotations + +import argparse +import asyncio +import json +from typing import Any, Dict + +import httpx + + +DEFAULT_URL = "http://127.0.0.1:8000/v1/chat/completions" + + +def parse_args() -> argparse.Namespace: + parser = argparse.ArgumentParser( + description="Send a DeepSeek chat completion request to the local LLM gateway." 
+    )
+    parser.add_argument(
+        "--url",
+        default=DEFAULT_URL,
+        help=f"Gateway endpoint URL (default: {DEFAULT_URL})",
+    )
+    parser.add_argument(
+        "--model",
+        default="deepseek-chat",
+        help="DeepSeek model to use (default: deepseek-chat).",
+    )
+    parser.add_argument(
+        "--system",
+        default="You are a helpful assistant.",
+        help="Optional system prompt.",
+    )
+    parser.add_argument(
+        "--prompt",
+        default="写一段简短的中文欢迎词。",
+        help="User message content to send.",
+    )
+    parser.add_argument(
+        "--temperature",
+        type=float,
+        default=0.7,
+        help="Sampling temperature.",
+    )
+    parser.add_argument(
+        "--max-tokens",
+        type=int,
+        default=512,
+        help="Maximum tokens for the response.",
+    )
+    parser.add_argument(
+        "--stream",
+        action="store_true",
+        help="Enable streaming mode (DeepSeek supports it).",
+    )
+    parser.add_argument(
+        "--extra",
+        help="Optional JSON string with extra provider parameters.",
+    )
+    return parser.parse_args()
+
+
+async def send_request(url: str, payload: Dict[str, Any]) -> Dict[str, Any]:
+    async with httpx.AsyncClient(timeout=httpx.Timeout(60.0)) as client:
+        response = await client.post(url, json=payload)
+        response.raise_for_status()
+        return response.json()
+
+
+def build_payload(args: argparse.Namespace) -> Dict[str, Any]:
+    extra_params = None
+    if args.extra:
+        try:
+            extra_params = json.loads(args.extra)
+        except json.JSONDecodeError as exc:
+            raise SystemExit(f"Invalid JSON passed to --extra: {exc}") from exc
+
+    payload: Dict[str, Any] = {
+        "provider": "deepseek",
+        "model": args.model,
+        "messages": [
+            {"role": "system", "content": args.system},
+            {"role": "user", "content": args.prompt},
+        ],
+        "temperature": args.temperature,
+        "max_tokens": args.max_tokens,
+        "stream": args.stream,
+    }
+
+    if extra_params:
+        payload["extra_params"] = extra_params
+
+    return payload
+
+
+async def main() -> None:
+    args = parse_args()
+    payload = build_payload(args)
+    try:
+        result = await send_request(args.url, payload)
+    except httpx.HTTPStatusError as exc:
+        detail = exc.response.text
+        raise SystemExit(f"Gateway returned {exc.response.status_code}: {detail}") from exc
+    except httpx.HTTPError as exc:
+        raise SystemExit(f"HTTP error calling gateway: {exc}") from exc
+
+    print(json.dumps(result, ensure_ascii=False, indent=2))
+
+
+if __name__ == "__main__":
+    asyncio.run(main())
diff --git a/todo.md b/todo.md
new file mode 100644
index 0000000..015fbef
--- /dev/null
+++ b/todo.md
@@ -0,0 +1,19 @@
+## 基础选型
+Python + Great Expectations
+## 设计理念
+1. 服务化 (Service-Oriented): 将 GE 的能力封装成独立的微服务,通过 RESTful API 对外提供数据质量的定义、校验与报告。
+2. 配置驱动 (Configuration-Driven): 所有期望(Expectations)、数据源连接、校验点(Checkpoints)均可配置,期望分默认版本和自定义版本。
+3. 聚焦验证 (Validation): 通过 GE 发现并验证数据质量问题,问题的修复留给后续的数据清洗和修改流程。
+4. 异步 (Asynchronous): 支持多任务同时分析,不阻塞流程。
+5. 增量分析 (Incremental Analysis): 数据通常需经多轮分析和修改才能投入使用。
+
+## 架构设计
+使用 FastAPI 构建 RESTful API 服务:高性能、自带 OpenAPI (Swagger UI) 文档、异步支持好。
+1. 分析流程管理
+解析 API 请求,管理分析任务的生命周期,支持异步:接受请求-调度服务-存储报告-触发通知。
+2. GE封装
+以编程配置方式管理数据源(Datasources)、期望套件(Expectation Suites)和校验点(Checkpoints),执行数据剖析(Profiling)和数据校验(Validation)。
+3. 期望仓库和结果存储
+使用 Git 仓库存储期望(期望为 JSON;也可考虑 MySQL),对数据质量规则进行版本控制、审计和协作;将每次校验结果结构化后存入数据库,便于后续的数据质量趋势分析、历史追溯和仪表盘展示(存储示意见文末)。
+4. 提供 LLM API Gateway 服务
+统一的多供应商 LLM API 网关(OpenAI / Anthropic / OpenRouter / Gemini / Qwen / DeepSeek 一键切换)。
\ No newline at end of file
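
附:校验结果结构化存储示意(对应 todo 第3点)。仅为设计草图,非本次提交的代码:表名 ge_validation_runs、字段集合与 mysql+pymysql 连接串均为假设,依赖 requirements.txt 中已有的 sqlalchemy>=2.0 与 pymysql。

from __future__ import annotations

import datetime as dt
from typing import Any, Dict

from sqlalchemy import JSON, Boolean, DateTime, Integer, String, create_engine
from sqlalchemy.orm import DeclarativeBase, Mapped, Session, mapped_column


class Base(DeclarativeBase):
    pass


class ValidationRun(Base):
    """每次期望套件校验一行;原始结果整体存为 JSON 便于审计与历史追溯。"""

    __tablename__ = "ge_validation_runs"  # 假设的表名

    id: Mapped[int] = mapped_column(Integer, primary_key=True, autoincrement=True)
    suite_name: Mapped[str] = mapped_column(String(255), index=True)
    success: Mapped[bool] = mapped_column(Boolean)
    evaluated_expectations: Mapped[int] = mapped_column(Integer)
    unsuccessful_expectations: Mapped[int] = mapped_column(Integer)
    run_at: Mapped[dt.datetime] = mapped_column(DateTime, default=dt.datetime.utcnow)
    payload: Mapped[Dict[str, Any]] = mapped_column(JSON)


def store_checkpoint_result(engine_url: str, checkpoint_result: Any) -> None:
    """把 GE CheckpointResult 按期望套件拆行入库,供趋势分析与仪表盘使用。"""

    engine = create_engine(engine_url)  # 例如 "mysql+pymysql://user:pass@host/ge"(假设的连接串)
    Base.metadata.create_all(engine)
    with Session(engine) as session:
        for validation in checkpoint_result.list_validation_results():
            result = validation.to_json_dict()  # 转成普通 dict,保证可 JSON 序列化
            stats = result.get("statistics", {})
            session.add(
                ValidationRun(
                    suite_name=result.get("meta", {}).get("expectation_suite_name", ""),
                    success=bool(result.get("success")),
                    evaluated_expectations=int(stats.get("evaluated_expectations", 0)),
                    unsuccessful_expectations=int(stats.get("unsuccessful_expectations", 0)),
                    payload=result,
                )
            )
        session.commit()

用法上,可在 ge_v2.py 第5步 checkpoint.run() 之后调用 store_checkpoint_result(engine_url, checkpoint_result),为后续的数据质量趋势分析积累历史数据。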