init,llm gateway & import_analyse

zhaoawd
2025-10-29 00:38:57 +08:00
commit 0af5f19af9
62 changed files with 3169 additions and 0 deletions

ge_v1.py (new normal file, 332 lines added)

@@ -0,0 +1,332 @@
"""Great Expectations profiling helper for Excel sources.
This script loads a user-provided Excel file into pandas, profiles it with
Great Expectations, writes a lightweight analysis summary to JSON, and exposes
the path to GE Data Docs for manual inspection.
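
Example (paths and flags shown here are the repository defaults and may need
adjusting for your environment; run from the project root):

    python ge_v1.py --excel-path "file/全国品牌.xlsx" --clean-results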
"""
from __future__ import annotations
import argparse
import json
import os
import shutil
from pathlib import Path
from typing import Any, Dict
import numpy as np
import pandas as pd
import great_expectations as gx
from great_expectations.core.batch import RuntimeBatchRequest
from great_expectations.data_context import FileDataContext
from great_expectations.exceptions import (
DataContextError,
InvalidDataContextConfigError,
)
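# UserConfigurableProfiler has moved between modules across Great Expectations
# releases, so try both known import paths before giving up.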
try:
from great_expectations.profile.user_configurable_profiler import (
UserConfigurableProfiler,
)
except ImportError:
try:
from great_expectations.profiler.user_configurable_profiler import (
UserConfigurableProfiler,
)
except ImportError as err:
raise ImportError(
"UserConfigurableProfiler is not available; please install a compatible "
"version of great_expectations (>=0.15,<0.19) or add the profiling extra."
) from err
RESULTS_DIR = Path("results")
DEFAULT_EXCEL_PATH = Path("file") / "全国品牌.xlsx"
DEFAULT_BATCH_ID = "initial_profile"
def parse_cli_args() -> argparse.Namespace:
"""Parse command line options for Excel ingestion."""
parser = argparse.ArgumentParser(description="Profile an Excel file with GE")
parser.add_argument(
"--excel-path",
type=Path,
default=DEFAULT_EXCEL_PATH,
help="Path to the Excel file to analyse (default: ./file/全国品牌.xlsx)",
)
parser.add_argument(
"--sheet-name",
default=0,
help="Excel sheet name or index to load (default: 0)",
)
parser.add_argument(
"--header-row",
type=int,
default=0,
help="Row index (0-based) to use as the header (default: 0)",
)
parser.add_argument(
"--clean-results",
action="store_true",
help="Remove the previous results directory before running",
)
parser.add_argument(
"--ge-root",
type=Path,
default=Path("gx_project"),
help="Directory to host the Great Expectations project (default: ./gx_project)",
)
return parser.parse_args()
def reset_results_dir(clean_results: bool) -> None:
"""Remove prior results folder when requested and ensure directory exists."""
if clean_results and RESULTS_DIR.exists():
shutil.rmtree(RESULTS_DIR)
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
def load_excel_as_dataframe(excel_path: Path, sheet_name: Any, header_row: int) -> pd.DataFrame:
"""Load Excel data into a DataFrame and provide basic logging."""
if not excel_path.exists():
raise FileNotFoundError(f"Excel file not found: {excel_path}")
df = pd.read_excel(excel_path, sheet_name=sheet_name, header=header_row)
print(f"Loaded Excel data: {excel_path} ({len(df)} rows, {len(df.columns)} columns)")
return df
def get_datasource_config(datasource_name: str, data_connector_name: str) -> Dict[str, Any]:
"""Assemble a minimal Pandas datasource configuration."""
return {
"name": datasource_name,
"class_name": "Datasource",
"execution_engine": {"class_name": "PandasExecutionEngine"},
"data_connectors": {
data_connector_name: {
"class_name": "RuntimeDataConnector",
"runtime_keys": ["batch_id"],
}
},
}
def clean_value(value: Any) -> Any:
"""Convert numpy/pandas scalar types into JSON serialisable values."""
if isinstance(value, (np.generic,)):
return value.item()
if isinstance(value, pd.Timestamp):
return value.isoformat()
if pd.isna(value):
return None
return value
def build_column_profile(series: pd.Series) -> Dict[str, Any]:
"""Generate a compact per-column profile for JSON output."""
stats = series.describe()
profiled_stats = {key: clean_value(val) for key, val in stats.items()}
return {
"name": str(series.name),
"dtype": str(series.dtype),
"non_null_count": int(series.count()),
"null_count": int(series.isna().sum()),
"unique_count": int(series.nunique(dropna=True)),
"stats": profiled_stats,
}
def build_analysis_summary(df: pd.DataFrame, sample_size: int = 5) -> Dict[str, Any]:
"""Collate basic statistics to accompany GE outputs."""
summary = {
"shape": {"rows": int(df.shape[0]), "columns": int(df.shape[1])},
"columns": [build_column_profile(df[col]) for col in df.columns],
"sample_rows": [
{key: clean_value(value) for key, value in row.items()} for row in df.head(sample_size).to_dict(orient="records")
],
}
return summary
def serialize_batch_request(batch_request: Any) -> Dict[str, Any]:
"""Convert differing batch request types into plain dictionaries."""
if hasattr(batch_request, "to_json_dict"):
return batch_request.to_json_dict()
if hasattr(batch_request, "dict"):
return batch_request.dict()
if hasattr(batch_request, "model_dump"):
return batch_request.model_dump()
return {"repr": repr(batch_request)}
def ensure_data_context(ge_root: Path) -> gx.DataContext:
"""Create or repair a file-backed GE data context as needed."""
ge_root = ge_root.resolve()
config_path = ge_root / "gx" / "great_expectations.yml"
if not config_path.exists():
FileDataContext.create(project_root_dir=str(ge_root))
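    # If an existing project config turns out to be unreadable, wipe it and
    # recreate the project instead of aborting the run.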
try:
return gx.get_context(project_root_dir=str(ge_root))
except InvalidDataContextConfigError:
print("Existing Great Expectations config invalid; recreating project root.")
shutil.rmtree(ge_root, ignore_errors=True)
FileDataContext.create(project_root_dir=str(ge_root))
return gx.get_context(project_root_dir=str(ge_root))
def run_ge_profiling(
context: gx.DataContext,
df: pd.DataFrame,
datasource_name: str,
data_connector_name: str,
data_asset_name: str,
expectation_suite_name: str,
) -> Dict[str, Any]:
"""Register datasource, build expectations, and capture validation results."""
if hasattr(context, "sources"):
datasource = context.sources.add_or_update_pandas(name=datasource_name)
try:
datasource.delete_asset(data_asset_name)
except (gx.exceptions.DataConnectorError, ValueError, KeyError, LookupError, AttributeError):
pass
asset = datasource.add_dataframe_asset(name=data_asset_name)
batch_request = asset.build_batch_request(dataframe=df)
print(f"Datasource registered (fluent): {datasource_name}")
else:
datasource_config = get_datasource_config(datasource_name, data_connector_name)
try:
context.add_datasource(**datasource_config)
print(f"Datasource registered: {datasource_name}")
except gx.exceptions.GreatExpectationsError as err:
print(f"Datasource registration issue: {err}")
batch_request = RuntimeBatchRequest(
datasource_name=datasource_name,
data_connector_name=data_connector_name,
data_asset_name=data_asset_name,
runtime_parameters={"batch_data": df},
batch_identifiers={"batch_id": DEFAULT_BATCH_ID},
)
try:
context.delete_expectation_suite(expectation_suite_name=expectation_suite_name)
except DataContextError:
pass
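    # Older contexts expose create_expectation_suite; newer releases only
    # provide add_expectation_suite, so branch on whichever is available.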
if hasattr(context, "create_expectation_suite"):
context.create_expectation_suite(
expectation_suite_name=expectation_suite_name, overwrite_existing=True
)
else:
context.add_expectation_suite(expectation_suite_name=expectation_suite_name)
validator = context.get_validator(
batch_request=batch_request, expectation_suite_name=expectation_suite_name
)
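    # The profiler inspects the validator's batch and emits baseline expectations
    # (e.g. column types, nullity, and value ranges), which are saved as a suite.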
profiler = UserConfigurableProfiler(profile_dataset=validator)
expectation_suite = profiler.build_suite()
context.add_or_update_expectation_suite(expectation_suite=expectation_suite)
validation_result = validator.validate(result_format="SUMMARY")
context.build_data_docs()
data_docs_path = (
Path(context.root_directory)
/ "uncommitted"
/ "data_docs"
/ "local_site"
/ "index.html"
)
print(
f"Expectation suite saved: {expectation_suite_name} ({len(expectation_suite.expectations)} expectations)"
)
return {
"batch_request": serialize_batch_request(batch_request),
"expectation_suite_name": expectation_suite_name,
"expectations_count": len(expectation_suite.expectations),
"validation_result": validation_result.to_json_dict(),
"data_docs_path": os.path.abspath(data_docs_path),
}
def assemble_payload(
excel_path: Path,
sheet_name: Any,
dataframe_summary: Dict[str, Any],
ge_summary: Dict[str, Any],
) -> Dict[str, Any]:
"""Combine pandas and GE artefacts into a single JSON payload."""
return {
"source": {
"excel_path": str(excel_path.resolve()),
"sheet_name": sheet_name,
},
"analysis": dataframe_summary,
"great_expectations": ge_summary,
}
def save_json_payload(payload: Dict[str, Any], output_path: Path) -> None:
"""Persist the combined analysis payload to disk."""
output_path.parent.mkdir(parents=True, exist_ok=True)
with output_path.open("w", encoding="utf-8") as f:
json.dump(payload, f, ensure_ascii=False, indent=2)
print(f"JSON analysis saved to: {output_path}")
def main() -> None:
args = parse_cli_args()
reset_results_dir(clean_results=args.clean_results)
context = ensure_data_context(args.ge_root)
print(f"Great Expectations Data Context initialized at {context.root_directory}.")
df = load_excel_as_dataframe(args.excel_path, args.sheet_name, args.header_row)
dataframe_summary = build_analysis_summary(df)
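    # Derive GE object names from the Excel file stem so repeat runs on the same
    # file overwrite the previous datasource, asset, and suite.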
file_stem = args.excel_path.stem
datasource_name = f"{file_stem}_datasource"
data_connector_name = "runtime_data_connector"
data_asset_name = f"{file_stem}_asset"
expectation_suite_name = f"{file_stem}_suite"
ge_summary = run_ge_profiling(
context,
df,
datasource_name,
data_connector_name,
data_asset_name,
expectation_suite_name,
)
payload = assemble_payload(
excel_path=args.excel_path,
sheet_name=args.sheet_name,
dataframe_summary=dataframe_summary,
ge_summary=ge_summary,
)
output_path = RESULTS_DIR / f"{file_stem}_analysis.json"
save_json_payload(payload, output_path)
print(
f"Data Docs generated. Open in browser: file://{ge_summary['data_docs_path']}"
)
if __name__ == "__main__":
main()