"""Great Expectations profiling helper for Excel sources. This script loads a user-provided Excel file into pandas, profiles it with Great Expectations, writes a lightweight analysis summary to JSON, and exposes the path to GE Data Docs for manual inspection. """ from __future__ import annotations import argparse import json import os import shutil from pathlib import Path from typing import Any, Dict import numpy as np import pandas as pd import great_expectations as gx from great_expectations.core.batch import RuntimeBatchRequest from great_expectations.data_context import FileDataContext from great_expectations.exceptions import ( DataContextError, InvalidDataContextConfigError, ) try: from great_expectations.profile.user_configurable_profiler import ( UserConfigurableProfiler, ) except ImportError: try: from great_expectations.profiler.user_configurable_profiler import ( UserConfigurableProfiler, ) except ImportError as err: raise ImportError( "UserConfigurableProfiler is not available; please install a compatible " "version of great_expectations (>=0.15,<0.19) or add the profiling extra." ) from err RESULTS_DIR = Path("results") DEFAULT_EXCEL_PATH = Path("file") / "全国品牌.xlsx" DEFAULT_BATCH_ID = "initial_profile" def parse_cli_args() -> argparse.Namespace: """Parse command line options for Excel ingestion.""" parser = argparse.ArgumentParser(description="Profile an Excel file with GE") parser.add_argument( "--excel-path", type=Path, default=DEFAULT_EXCEL_PATH, help="Path to the Excel file to analyse (default: ./file/全国品牌.xlsx)", ) parser.add_argument( "--sheet-name", default=0, help="Excel sheet name or index to load (default: 0)", ) parser.add_argument( "--header-row", type=int, default=0, help="Row index (0-based) to use as the header (default: 0)", ) parser.add_argument( "--clean-results", action="store_true", help="Remove the previous results directory before running", ) parser.add_argument( "--ge-root", type=Path, default=Path("gx_project"), help="Directory to host the Great Expectations project (default: ./gx_project)", ) return parser.parse_args() def reset_results_dir(clean_results: bool) -> None: """Remove prior results folder when requested and ensure directory exists.""" if clean_results and RESULTS_DIR.exists(): shutil.rmtree(RESULTS_DIR) RESULTS_DIR.mkdir(parents=True, exist_ok=True) def load_excel_as_dataframe(excel_path: Path, sheet_name: Any, header_row: int) -> pd.DataFrame: """Load Excel data into a DataFrame and provide basic logging.""" if not excel_path.exists(): raise FileNotFoundError(f"Excel file not found: {excel_path}") df = pd.read_excel(excel_path, sheet_name=sheet_name, header=header_row) print(f"Loaded Excel data: {excel_path} ({len(df)} rows, {len(df.columns)} columns)") return df def get_datasource_config(datasource_name: str, data_connector_name: str) -> Dict[str, Any]: """Assemble a minimal Pandas datasource configuration.""" return { "name": datasource_name, "class_name": "Datasource", "execution_engine": {"class_name": "PandasExecutionEngine"}, "data_connectors": { data_connector_name: { "class_name": "RuntimeDataConnector", "runtime_keys": ["batch_id"], } }, } def clean_value(value: Any) -> Any: """Convert numpy/pandas scalar types into JSON serialisable values.""" if isinstance(value, (np.generic,)): return value.item() if isinstance(value, pd.Timestamp): return str(value) if pd.isna(value): return None return value def build_column_profile(series: pd.Series) -> Dict[str, Any]: """Generate a compact per-column profile for JSON output.""" stats = 
    profiled_stats = {key: clean_value(val) for key, val in stats.items()}
    return {
        "name": str(series.name),
        "dtype": str(series.dtype),
        "non_null_count": int(series.count()),
        "null_count": int(series.isna().sum()),
        "unique_count": int(series.nunique(dropna=True)),
        "stats": profiled_stats,
    }


def build_analysis_summary(df: pd.DataFrame, sample_size: int = 5) -> Dict[str, Any]:
    """Collate basic statistics to accompany GE outputs."""
    summary = {
        "shape": {"rows": int(df.shape[0]), "columns": int(df.shape[1])},
        "columns": [build_column_profile(df[col]) for col in df.columns],
        "sample_rows": [
            {key: clean_value(value) for key, value in row.items()}
            for row in df.head(sample_size).to_dict(orient="records")
        ],
    }
    return summary


def serialize_batch_request(batch_request: Any) -> Dict[str, Any]:
    """Convert differing batch request types into plain dictionaries."""
    if hasattr(batch_request, "to_json_dict"):
        return batch_request.to_json_dict()
    if hasattr(batch_request, "dict"):
        return batch_request.dict()
    if hasattr(batch_request, "model_dump"):
        return batch_request.model_dump()
    return {"repr": repr(batch_request)}


def ensure_data_context(ge_root: Path) -> gx.DataContext:
    """Create or repair a file-backed GE data context as needed."""
    ge_root = ge_root.resolve()
    config_path = ge_root / "gx" / "great_expectations.yml"
    if not config_path.exists():
        FileDataContext.create(project_root_dir=str(ge_root))
    try:
        return gx.get_context(project_root_dir=str(ge_root))
    except InvalidDataContextConfigError:
        print("Existing Great Expectations config invalid; recreating project root.")
        shutil.rmtree(ge_root, ignore_errors=True)
        FileDataContext.create(project_root_dir=str(ge_root))
        return gx.get_context(project_root_dir=str(ge_root))


def run_ge_profiling(
    context: gx.DataContext,
    df: pd.DataFrame,
    datasource_name: str,
    data_connector_name: str,
    data_asset_name: str,
    expectation_suite_name: str,
) -> Dict[str, Any]:
    """Register datasource, build expectations, and capture validation results."""
    if hasattr(context, "sources"):
        # Fluent datasource API (GE >= 0.16): register the DataFrame as an in-memory asset.
        datasource = context.sources.add_or_update_pandas(name=datasource_name)
        try:
            datasource.delete_asset(data_asset_name)
        except (gx.exceptions.DataConnectorError, ValueError, KeyError, LookupError, AttributeError):
            pass
        asset = datasource.add_dataframe_asset(name=data_asset_name)
        batch_request = asset.build_batch_request(dataframe=df)
        print(f"Datasource registered (fluent): {datasource_name}")
    else:
        # Legacy block-config API: register a runtime datasource and hand the DataFrame
        # over via a RuntimeBatchRequest.
        datasource_config = get_datasource_config(datasource_name, data_connector_name)
        try:
            context.add_datasource(**datasource_config)
            print(f"Datasource registered: {datasource_name}")
        except gx.exceptions.GreatExpectationsError as err:
            print(f"Datasource registration issue: {err}")
        batch_request = RuntimeBatchRequest(
            datasource_name=datasource_name,
            data_connector_name=data_connector_name,
            data_asset_name=data_asset_name,
            runtime_parameters={"batch_data": df},
            batch_identifiers={"batch_id": DEFAULT_BATCH_ID},
        )

    # Start from a clean expectation suite so repeated runs do not accumulate expectations.
    try:
        context.delete_expectation_suite(expectation_suite_name=expectation_suite_name)
    except DataContextError:
        pass
    if hasattr(context, "create_expectation_suite"):
        context.create_expectation_suite(
            expectation_suite_name=expectation_suite_name, overwrite_existing=True
        )
    else:
        context.add_expectation_suite(expectation_suite_name=expectation_suite_name)

    validator = context.get_validator(
        batch_request=batch_request, expectation_suite_name=expectation_suite_name
    )
    profiler = UserConfigurableProfiler(profile_dataset=validator)
    expectation_suite = profiler.build_suite()
    if hasattr(context, "add_or_update_expectation_suite"):
        context.add_or_update_expectation_suite(expectation_suite=expectation_suite)
    else:
        # Older contexts (GE 0.15.x) persist suites via save_expectation_suite instead.
        context.save_expectation_suite(expectation_suite=expectation_suite)
    validation_result = validator.validate(result_format="SUMMARY")

    context.build_data_docs()
    data_docs_path = (
        Path(context.root_directory)
        / "uncommitted"
        / "data_docs"
        / "local_site"
        / "index.html"
    )
    print(
        f"Expectation suite saved: {expectation_suite_name} "
        f"({len(expectation_suite.expectations)} expectations)"
    )
    return {
        "batch_request": serialize_batch_request(batch_request),
        "expectation_suite_name": expectation_suite_name,
        "expectations_count": len(expectation_suite.expectations),
        "validation_result": validation_result.to_json_dict(),
        "data_docs_path": os.path.abspath(data_docs_path),
    }


def assemble_payload(
    excel_path: Path,
    sheet_name: Any,
    dataframe_summary: Dict[str, Any],
    ge_summary: Dict[str, Any],
) -> Dict[str, Any]:
    """Combine pandas and GE artefacts into a single JSON payload."""
    return {
        "source": {
            "excel_path": str(excel_path.resolve()),
            "sheet_name": sheet_name,
        },
        "analysis": dataframe_summary,
        "great_expectations": ge_summary,
    }


def save_json_payload(payload: Dict[str, Any], output_path: Path) -> None:
    """Persist the combined analysis payload to disk."""
    output_path.parent.mkdir(parents=True, exist_ok=True)
    with output_path.open("w", encoding="utf-8") as f:
        json.dump(payload, f, ensure_ascii=False, indent=2)
    print(f"JSON analysis saved to: {output_path}")


def main() -> None:
    args = parse_cli_args()
    reset_results_dir(clean_results=args.clean_results)

    context = ensure_data_context(args.ge_root)
    print(f"Great Expectations Data Context initialized at {context.root_directory}.")

    df = load_excel_as_dataframe(args.excel_path, args.sheet_name, args.header_row)
    dataframe_summary = build_analysis_summary(df)

    file_stem = args.excel_path.stem
    datasource_name = f"{file_stem}_datasource"
    data_connector_name = "runtime_data_connector"
    data_asset_name = f"{file_stem}_asset"
    expectation_suite_name = f"{file_stem}_suite"

    ge_summary = run_ge_profiling(
        context,
        df,
        datasource_name,
        data_connector_name,
        data_asset_name,
        expectation_suite_name,
    )

    payload = assemble_payload(
        excel_path=args.excel_path,
        sheet_name=args.sheet_name,
        dataframe_summary=dataframe_summary,
        ge_summary=ge_summary,
    )
    output_path = RESULTS_DIR / f"{file_stem}_analysis.json"
    save_json_payload(payload, output_path)

    print(
        f"Data Docs generated. Open in browser: file://{ge_summary['data_docs_path']}"
    )


if __name__ == "__main__":
    main()
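
# Example invocation (a sketch; the filename "profile_excel.py" is assumed here and is
# not defined by this module, so substitute the actual script name and paths):
#
#   python profile_excel.py --excel-path ./file/全国品牌.xlsx --sheet-name 0 --clean-results
#
# With those defaults the combined summary is written to ./results/全国品牌_analysis.json,
# and the absolute path to the generated Data Docs index is printed at the end of the run.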