"""Great Expectations profiling helper for Excel sources.
|
|
|
|
This script loads a user-provided Excel file into pandas, profiles it with
|
|
Great Expectations, writes a lightweight analysis summary to JSON, and exposes
|
|
the path to GE Data Docs for manual inspection.
|
|
"""
|
|
|
|
from __future__ import annotations
|
|
|
|
import argparse
|
|
import json
|
|
import os
|
|
import shutil
|
|
from pathlib import Path
|
|
from typing import Any, Dict
|
|
|
|
import numpy as np
|
|
import pandas as pd
|
|
import great_expectations as gx
|
|
from great_expectations.core.batch import RuntimeBatchRequest
|
|
from great_expectations.data_context import FileDataContext
|
|
from great_expectations.exceptions import (
|
|
DataContextError,
|
|
InvalidDataContextConfigError,
|
|
)
|
|
|
|
try:
|
|
from great_expectations.profile.user_configurable_profiler import (
|
|
UserConfigurableProfiler,
|
|
)
|
|
except ImportError:
|
|
try:
|
|
from great_expectations.profiler.user_configurable_profiler import (
|
|
UserConfigurableProfiler,
|
|
)
|
|
except ImportError as err:
|
|
raise ImportError(
|
|
"UserConfigurableProfiler is not available; please install a compatible "
|
|
"version of great_expectations (>=0.15,<0.19) or add the profiling extra."
|
|
) from err
|
|
|
|
|
|
RESULTS_DIR = Path("results")
|
|
DEFAULT_EXCEL_PATH = Path("file") / "全国品牌.xlsx"
|
|
DEFAULT_BATCH_ID = "initial_profile"
|
|
|
|
|
|
def parse_cli_args() -> argparse.Namespace:
|
|
"""Parse command line options for Excel ingestion."""
|
|
|
|
parser = argparse.ArgumentParser(description="Profile an Excel file with GE")
|
|
parser.add_argument(
|
|
"--excel-path",
|
|
type=Path,
|
|
default=DEFAULT_EXCEL_PATH,
|
|
help="Path to the Excel file to analyse (default: ./file/全国品牌.xlsx)",
|
|
)
|
|
parser.add_argument(
|
|
"--sheet-name",
|
|
default=0,
|
|
help="Excel sheet name or index to load (default: 0)",
|
|
)
|
|
parser.add_argument(
|
|
"--header-row",
|
|
type=int,
|
|
default=0,
|
|
help="Row index (0-based) to use as the header (default: 0)",
|
|
)
|
|
parser.add_argument(
|
|
"--clean-results",
|
|
action="store_true",
|
|
help="Remove the previous results directory before running",
|
|
)
|
|
parser.add_argument(
|
|
"--ge-root",
|
|
type=Path,
|
|
default=Path("gx_project"),
|
|
help="Directory to host the Great Expectations project (default: ./gx_project)",
|
|
)
|
|
return parser.parse_args()
|
|
|
|
|
|
def reset_results_dir(clean_results: bool) -> None:
|
|
"""Remove prior results folder when requested and ensure directory exists."""
|
|
|
|
if clean_results and RESULTS_DIR.exists():
|
|
shutil.rmtree(RESULTS_DIR)
|
|
RESULTS_DIR.mkdir(parents=True, exist_ok=True)
|
|
|
|
|
|
def load_excel_as_dataframe(excel_path: Path, sheet_name: Any, header_row: int) -> pd.DataFrame:
|
|
"""Load Excel data into a DataFrame and provide basic logging."""
|
|
|
|
if not excel_path.exists():
|
|
raise FileNotFoundError(f"Excel file not found: {excel_path}")
|
|
|
|
df = pd.read_excel(excel_path, sheet_name=sheet_name, header=header_row)
|
|
print(f"Loaded Excel data: {excel_path} ({len(df)} rows, {len(df.columns)} columns)")
|
|
return df
|
|
|
|
|
|
def get_datasource_config(datasource_name: str, data_connector_name: str) -> Dict[str, Any]:
|
|
"""Assemble a minimal Pandas datasource configuration."""
|
|
|
|
return {
|
|
"name": datasource_name,
|
|
"class_name": "Datasource",
|
|
"execution_engine": {"class_name": "PandasExecutionEngine"},
|
|
"data_connectors": {
|
|
data_connector_name: {
|
|
"class_name": "RuntimeDataConnector",
|
|
"runtime_keys": ["batch_id"],
|
|
}
|
|
},
|
|
}
|
|
|
|
|
|
def clean_value(value: Any) -> Any:
|
|
"""Convert numpy/pandas scalar types into JSON serialisable values."""
|
|
|
|
if isinstance(value, (np.generic,)):
|
|
return value.item()
|
|
if isinstance(value, pd.Timestamp):
|
|
return str(value)
|
|
if pd.isna(value):
|
|
return None
|
|
return value
|
|
|
|
|
|
def build_column_profile(series: pd.Series) -> Dict[str, Any]:
|
|
"""Generate a compact per-column profile for JSON output."""
|
|
|
|
stats = series.describe()
|
|
profiled_stats = {key: clean_value(val) for key, val in stats.items()}
|
|
|
|
return {
|
|
"name": str(series.name),
|
|
"dtype": str(series.dtype),
|
|
"non_null_count": int(series.count()),
|
|
"null_count": int(series.isna().sum()),
|
|
"unique_count": int(series.nunique(dropna=True)),
|
|
"stats": profiled_stats,
|
|
}
|
|
|
|
|
|
def build_analysis_summary(df: pd.DataFrame, sample_size: int = 5) -> Dict[str, Any]:
|
|
"""Collate basic statistics to accompany GE outputs."""
|
|
|
|
summary = {
|
|
"shape": {"rows": int(df.shape[0]), "columns": int(df.shape[1])},
|
|
"columns": [build_column_profile(df[col]) for col in df.columns],
|
|
"sample_rows": [
|
|
{key: clean_value(value) for key, value in row.items()} for row in df.head(sample_size).to_dict(orient="records")
|
|
],
|
|
}
|
|
return summary
|
|
|
|
|
|
def serialize_batch_request(batch_request: Any) -> Dict[str, Any]:
|
|
"""Convert differing batch request types into plain dictionaries."""
|
|
|
|
if hasattr(batch_request, "to_json_dict"):
|
|
return batch_request.to_json_dict()
|
|
if hasattr(batch_request, "dict"):
|
|
return batch_request.dict()
|
|
if hasattr(batch_request, "model_dump"):
|
|
return batch_request.model_dump()
|
|
return {"repr": repr(batch_request)}
|
|
|
|
|
|
def ensure_data_context(ge_root: Path) -> gx.DataContext:
|
|
"""Create or repair a file-backed GE data context as needed."""
|
|
|
|
ge_root = ge_root.resolve()
|
|
config_path = ge_root / "gx" / "great_expectations.yml"
|
|
if not config_path.exists():
|
|
FileDataContext.create(project_root_dir=str(ge_root))
|
|
|
|
try:
|
|
return gx.get_context(project_root_dir=str(ge_root))
|
|
except InvalidDataContextConfigError:
|
|
print("Existing Great Expectations config invalid; recreating project root.")
|
|
shutil.rmtree(ge_root, ignore_errors=True)
|
|
FileDataContext.create(project_root_dir=str(ge_root))
|
|
return gx.get_context(project_root_dir=str(ge_root))
|
|
|
|
|
|
def run_ge_profiling(
|
|
context: gx.DataContext,
|
|
df: pd.DataFrame,
|
|
datasource_name: str,
|
|
data_connector_name: str,
|
|
data_asset_name: str,
|
|
expectation_suite_name: str,
|
|
) -> Dict[str, Any]:
|
|
"""Register datasource, build expectations, and capture validation results."""
|
|
|
|
if hasattr(context, "sources"):
|
|
datasource = context.sources.add_or_update_pandas(name=datasource_name)
|
|
try:
|
|
datasource.delete_asset(data_asset_name)
|
|
except (gx.exceptions.DataConnectorError, ValueError, KeyError, LookupError, AttributeError):
|
|
pass
|
|
asset = datasource.add_dataframe_asset(name=data_asset_name)
|
|
batch_request = asset.build_batch_request(dataframe=df)
|
|
print(f"Datasource registered (fluent): {datasource_name}")
|
|
else:
|
|
datasource_config = get_datasource_config(datasource_name, data_connector_name)
|
|
try:
|
|
context.add_datasource(**datasource_config)
|
|
print(f"Datasource registered: {datasource_name}")
|
|
except gx.exceptions.GreatExpectationsError as err:
|
|
print(f"Datasource registration issue: {err}")
|
|
batch_request = RuntimeBatchRequest(
|
|
datasource_name=datasource_name,
|
|
data_connector_name=data_connector_name,
|
|
data_asset_name=data_asset_name,
|
|
runtime_parameters={"batch_data": df},
|
|
batch_identifiers={"batch_id": DEFAULT_BATCH_ID},
|
|
)
|
|
|
|
try:
|
|
context.delete_expectation_suite(expectation_suite_name=expectation_suite_name)
|
|
except DataContextError:
|
|
pass
|
|
|
|
if hasattr(context, "create_expectation_suite"):
|
|
context.create_expectation_suite(
|
|
expectation_suite_name=expectation_suite_name, overwrite_existing=True
|
|
)
|
|
else:
|
|
context.add_expectation_suite(expectation_suite_name=expectation_suite_name)
|
|
|
|
validator = context.get_validator(
|
|
batch_request=batch_request, expectation_suite_name=expectation_suite_name
|
|
)
|
|
profiler = UserConfigurableProfiler(profile_dataset=validator)
|
|
expectation_suite = profiler.build_suite()
|
|
context.add_or_update_expectation_suite(expectation_suite=expectation_suite)
|
|
validation_result = validator.validate(result_format="SUMMARY")
|
|
|
|
context.build_data_docs()
|
|
data_docs_path = (
|
|
Path(context.root_directory)
|
|
/ "uncommitted"
|
|
/ "data_docs"
|
|
/ "local_site"
|
|
/ "index.html"
|
|
)
|
|
|
|
print(
|
|
f"Expectation suite saved: {expectation_suite_name} ({len(expectation_suite.expectations)} expectations)"
|
|
)
|
|
|
|
return {
|
|
"batch_request": serialize_batch_request(batch_request),
|
|
"expectation_suite_name": expectation_suite_name,
|
|
"expectations_count": len(expectation_suite.expectations),
|
|
"validation_result": validation_result.to_json_dict(),
|
|
"data_docs_path": os.path.abspath(data_docs_path),
|
|
}
|
|
|
|
|
|
def assemble_payload(
|
|
excel_path: Path,
|
|
sheet_name: Any,
|
|
dataframe_summary: Dict[str, Any],
|
|
ge_summary: Dict[str, Any],
|
|
) -> Dict[str, Any]:
|
|
"""Combine pandas and GE artefacts into a single JSON payload."""
|
|
|
|
return {
|
|
"source": {
|
|
"excel_path": str(excel_path.resolve()),
|
|
"sheet_name": sheet_name,
|
|
},
|
|
"analysis": dataframe_summary,
|
|
"great_expectations": ge_summary,
|
|
}
|
|
|
|
|
|
def save_json_payload(payload: Dict[str, Any], output_path: Path) -> None:
|
|
"""Persist the combined analysis payload to disk."""
|
|
|
|
output_path.parent.mkdir(parents=True, exist_ok=True)
|
|
with output_path.open("w", encoding="utf-8") as f:
|
|
json.dump(payload, f, ensure_ascii=False, indent=2)
|
|
print(f"JSON analysis saved to: {output_path}")
|
|
|
|
|
|
def main() -> None:
|
|
args = parse_cli_args()
|
|
reset_results_dir(clean_results=args.clean_results)
|
|
|
|
context = ensure_data_context(args.ge_root)
|
|
print(f"Great Expectations Data Context initialized at {context.root_directory}.")
|
|
|
|
df = load_excel_as_dataframe(args.excel_path, args.sheet_name, args.header_row)
|
|
dataframe_summary = build_analysis_summary(df)
|
|
|
|
file_stem = args.excel_path.stem
|
|
datasource_name = f"{file_stem}_datasource"
|
|
data_connector_name = "runtime_data_connector"
|
|
data_asset_name = f"{file_stem}_asset"
|
|
expectation_suite_name = f"{file_stem}_suite"
|
|
|
|
ge_summary = run_ge_profiling(
|
|
context,
|
|
df,
|
|
datasource_name,
|
|
data_connector_name,
|
|
data_asset_name,
|
|
expectation_suite_name,
|
|
)
|
|
|
|
payload = assemble_payload(
|
|
excel_path=args.excel_path,
|
|
sheet_name=args.sheet_name,
|
|
dataframe_summary=dataframe_summary,
|
|
ge_summary=ge_summary,
|
|
)
|
|
|
|
output_path = RESULTS_DIR / f"{file_stem}_analysis.json"
|
|
save_json_payload(payload, output_path)
|
|
|
|
print(
|
|
f"Data Docs generated. Open in browser: file://{ge_summary['data_docs_path']}"
|
|
)
|
|
|
|
|
|
if __name__ == "__main__":
|
|
main()
|