import great_expectations as gx from datasets import load_dataset import pandas as pd import os import webbrowser from great_expectations.profile.user_configurable_profiler import ( UserConfigurableProfiler, ) # --- 1. 加载Hugging Face数据集并转换为Pandas DataFrame --- print("🚚 1. 从Hugging Face加载 'millat/e-commerce-orders' 数据集...") # 加载数据集,只取训练集部分 hf_dataset = load_dataset("millat/e-commerce-orders", split="train") # 转换为Pandas DataFrame,这是GX最常使用的数据格式 df = hf_dataset.to_pandas() print(f"✅ 数据集加载成功,包含 {len(df)} 行数据。") print("\n📝 数据集前5行预览:") print(df.head()) # --- 2. 初始化Great Expectations (GX) 项目 --- # 这将在当前目录下创建一个名为 "ge_project" 的文件夹来存放所有GX的配置和结果 print("\n🏗️ 2. 初始化Great Expectations项目...") context = gx.get_context() print("✅ GX项目上下文(Context)创建成功。") # --- 3. 添加数据源并将DataFrame连接到GX --- # 我们将Pandas DataFrame添加为一个数据源,这样GX就知道如何访问它 print("\n🔗 3. 将DataFrame添加为GX的数据源...") datasource_name = "my_ecommerce_datasource" # Fluent API: add_or_update 确保多次运行脚本也不会重复出错 datasource = context.sources.add_or_update_pandas(name=datasource_name) data_asset_name = "orders_table" data_asset = datasource.add_dataframe_asset(name=data_asset_name, dataframe=df) print("✅ 数据源和数据资产(Data Asset)配置完成。") # --- 4. 使用自动剖析器生成期望套件 --- print("\n🔍 4. 使用自动剖析器 (Profiler) 扫描数据并生成期望...") # 创建一个请求,告诉GX我们要处理哪个数据资产 batch_request = data_asset.build_batch_request() # 定义期望套件的名称 expectation_suite_name = "ecommerce_profiling_suite" # 创建或获取期望套件 try: suite = context.get_expectation_suite(expectation_suite_name=expectation_suite_name) print(f" - 已找到现有的期望套件 '{expectation_suite_name}'。") except gx.exceptions.DataContextError: suite = context.add_expectation_suite(expectation_suite_name=expectation_suite_name) print(f" - 已创建新的期望套件 '{expectation_suite_name}'。") # 构建一个 Validator,供剖析器消费 validator = context.get_validator( batch_request=batch_request, expectation_suite_name=expectation_suite_name, ) # 这是核心步骤:使用 UserConfigurableProfiler 自动分析数据并创建期望 profiler = UserConfigurableProfiler(profile_dataset=validator) suite = profiler.build_suite() # 保存由剖析器生成的期望套件 context.save_expectation_suite(expectation_suite=suite, expectation_suite_name=expectation_suite_name) print("✅ 自动剖析完成,期望已生成并保存。") # --- 5. 创建并运行检查点(Checkpoint)以验证数据 --- print("\n🛡️ 5. 创建并运行检查点 (Checkpoint) 以验证数据...") checkpoint_name = "ecommerce_profiling_checkpoint" try: # 检查检查点是否已存在 checkpoint = context.get_checkpoint(name=checkpoint_name) print(f" - 已加载现有的检查点 '{checkpoint_name}'。") except gx.exceptions.CheckpointNotFoundError: # 如果不存在,则创建一个新的 checkpoint_config = { "name": checkpoint_name, "validations": [ { "batch_request": batch_request, "expectation_suite_name": expectation_suite_name, } ], } context.add_or_update_checkpoint(**checkpoint_config) checkpoint = context.get_checkpoint(name=checkpoint_name) print(f" - 已创建新的检查点 '{checkpoint_name}'。") # 运行检查点,它会将数据与我们刚刚生成的期望套件进行对比 checkpoint_result = checkpoint.run() print("✅ 检查点运行完毕,数据验证完成。") # --- 6. 构建并打开数据文档(Data Docs)查看结果 --- print("\n📊 6. 构建并打开数据文档 (Data Docs) 查看剖析报告...") # 这会生成一个HTML报告 context.build_data_docs() # 获取Data Docs的路径并自动在浏览器中打开 docs_path = os.path.join(context.root_directory, "uncommitted", "data_docs", "local_site", "index.html") print(f"\n🎉 剖析报告已生成!请在浏览器中查看:\nfile://{os.path.abspath(docs_path)}") webbrowser.open(f"file://{os.path.abspath(docs_path)}")