data-ge/ge_v2.py

import great_expectations as gx
from datasets import load_dataset
import pandas as pd
import os
import webbrowser
from great_expectations.profile.user_configurable_profiler import (
    UserConfigurableProfiler,
)

# --- 1. Load the Hugging Face dataset and convert it to a Pandas DataFrame ---
print("🚚 1. Loading the 'millat/e-commerce-orders' dataset from Hugging Face...")
# Load only the training split of the dataset
hf_dataset = load_dataset("millat/e-commerce-orders", split="train")
# Convert to a Pandas DataFrame, the format GX works with most commonly
df = hf_dataset.to_pandas()
print(f"✅ Dataset loaded successfully with {len(df)} rows.")
print("\n📝 Preview of the first 5 rows:")
print(df.head())
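# Optional inspection step (not required by GX): the profiler used below keys its
# expectations off the column types, so a quick look at the inferred dtypes helps
# explain the expectations it will generate.
print("\n🔎 Column dtypes as seen by Pandas:")
print(df.dtypes)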

# --- 2. Initialize the Great Expectations (GX) project ---
# This creates or loads the GX Data Context that stores all GX configuration and results
# (with a file-backed context, these live in a local project folder).
print("\n🏗️ 2. Initializing the Great Expectations project...")
context = gx.get_context()
print("✅ GX project context created successfully.")

# --- 3. Add a datasource and connect the DataFrame to GX ---
# Register the Pandas DataFrame as a datasource so GX knows how to access it
print("\n🔗 3. Adding the DataFrame as a GX datasource...")
datasource_name = "my_ecommerce_datasource"
# Fluent API: add_or_update avoids errors when the script is run more than once
datasource = context.sources.add_or_update_pandas(name=datasource_name)
data_asset_name = "orders_table"
data_asset = datasource.add_dataframe_asset(name=data_asset_name, dataframe=df)
print("✅ Datasource and Data Asset configured.")

# --- 4. Generate an Expectation Suite with the automatic profiler ---
print("\n🔍 4. Scanning the data with the automatic profiler to generate expectations...")
# Build a batch request that tells GX which data asset to work with
batch_request = data_asset.build_batch_request()
# Name of the expectation suite
expectation_suite_name = "ecommerce_profiling_suite"
# Get the expectation suite if it already exists, otherwise create it
try:
    suite = context.get_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(f" - Found existing expectation suite '{expectation_suite_name}'")
except gx.exceptions.DataContextError:
    suite = context.add_expectation_suite(expectation_suite_name=expectation_suite_name)
    print(f" - Created new expectation suite '{expectation_suite_name}'")
# Build a Validator for the profiler to consume
validator = context.get_validator(
    batch_request=batch_request,
    expectation_suite_name=expectation_suite_name,
)
# Core step: the UserConfigurableProfiler analyzes the data and creates expectations automatically
profiler = UserConfigurableProfiler(profile_dataset=validator)
suite = profiler.build_suite()
# Save the expectation suite generated by the profiler
context.save_expectation_suite(expectation_suite=suite, expectation_suite_name=expectation_suite_name)
print("✅ Profiling complete; expectations generated and saved.")

# --- 5. Create and run a Checkpoint to validate the data ---
print("\n🛡️ 5. Creating and running a Checkpoint to validate the data...")
checkpoint_name = "ecommerce_profiling_checkpoint"
try:
    # Check whether the checkpoint already exists
    checkpoint = context.get_checkpoint(name=checkpoint_name)
    print(f" - Loaded existing checkpoint '{checkpoint_name}'")
except gx.exceptions.CheckpointNotFoundError:
    # If not, create a new one
    checkpoint_config = {
        "name": checkpoint_name,
        "validations": [
            {
                "batch_request": batch_request,
                "expectation_suite_name": expectation_suite_name,
            }
        ],
    }
    context.add_or_update_checkpoint(**checkpoint_config)
    checkpoint = context.get_checkpoint(name=checkpoint_name)
    print(f" - Created new checkpoint '{checkpoint_name}'")
# Run the checkpoint; it compares the data against the expectation suite generated above
checkpoint_result = checkpoint.run()
print("✅ Checkpoint run finished; data validation complete.")

# --- 6. Build and open Data Docs to view the results ---
print("\n📊 6. Building and opening Data Docs to view the profiling report...")
# This generates an HTML report
context.build_data_docs()
# Work out the Data Docs path and open it in the browser automatically
docs_path = os.path.join(context.root_directory, "uncommitted", "data_docs", "local_site", "index.html")
print(f"\n🎉 Profiling report generated! View it in your browser:\nfile://{os.path.abspath(docs_path)}")
webbrowser.open(f"file://{os.path.abspath(docs_path)}")
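# Note: the manually built path above assumes a file-backed Data Context with the
# default "local_site" Data Docs configuration. As an alternative, the context can
# open the docs itself, which avoids hard-coding the path:
# context.open_data_docs()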