我们团队的数据仓库构建在Snowflake之上,ETL流程由dbt驱动。一个持续困扰我们的问题是数据质量的验证。业务分析师在Confluence上用自然语言描述数据规则,数据工程师再将其手动翻译成dbt的测试或独立的SQL脚本。这个过程不仅效率低下,而且极易出错。需求文档和实际测试代码之间的鸿沟,是敏捷流程中的一个明显断点。当数据出现问题时,追溯是哪个环节的沟通或翻译出了错,总是一场耗时的争论。
这个痛点促使我思考一个问题:能否构建一个框架,让业务分析师用他们熟悉的语言定义的规则,可以直接被系统理解并自动化执行?这自然地指向了行为驱动开发(BDD)和Gherkin语法。如果业务规则能以Given-When-Then的形式写下来,并直接触发对Snowflake中数据的校验,那么沟通壁垒将被彻底打破。
初步构想是利用Python作为胶水语言,结合一个BDD框架(如behave)来解析Gherkin,然后动态生成并执行Snowflake SQL查询。这个框架的核心任务必须是:将业务语义精准地转换为可执行的数据质量断言。
技术选型决策
- BDD框架: Python生态中有
behave和pytest-bdd。我选择了behave,因为它更纯粹地专注于BDD,配置和目录结构约定清晰,没有pytest那么多额外的复杂性,对于构建一个专用工具来说更直接。 - Snowflake连接: 官方的
snowflake-connector-python是首选。它提供了完整的API支持、连接池管理以及对各种认证方式的支持,是生产环境中最稳妥的选择。 - 配置管理: 为了在不同环境(开发、测试、生产)中切换Snowflake连接信息,使用
configparser读取.ini文件,并通过环境变量覆盖,这是标准的工程实践。绝不能将敏感信息硬编码在代码中。
步骤化实现
我们的目标是创建一个可执行的规范。首先,规划项目结构,使其清晰地分离关注点:
data_quality_bdd/
├── features/
│ ├── environment.py
│ ├── steps/
│ │ └── validation_steps.py
│ └── sales_data.feature
├── config/
│ └── snowflake.ini
└── requirements.txt
-
features/:behave框架约定的目录,存放所有.feature文件和步骤定义。 -
sales_data.feature: Gherkin文件,用业务语言描述数据质量规则。 -
environment.py:behave的钩子文件,用于管理上下文、数据库连接等全局资源。 -
steps/validation_steps.py: 实现Gherkin语句到Python代码和SQL查询的映射。 -
config/snowflake.ini: 存放非敏感的Snowflake连接配置。
1. 定义业务规则:sales_data.feature
我们从一个具体的业务场景开始。假设我们有一个ORDERS表和一个CUSTOMERS表,需要验证它们的数据质量。
# features/sales_data.feature
Feature: 销售数据质量验证
为了确保下游报表的准确性,我们需要对核心销售数据表的完整性和一致性进行验证。
Background:
Given 我连接到 "DEV" 环境的Snowflake数据库
Scenario: 验证ORDERS表的核心字段非空
Given 我选择表 "PUBLIC"."ORDERS"
When 我检查列 "O_ORDERKEY"
Then 该列不应包含任何NULL值
When 我检查列 "O_CUSTKEY"
Then 该列不应包含任何NULL值
Scenario: 验证ORDERS表的外键完整性
Given 我选择表 "PUBLIC"."ORDERS"
Then 列 "O_CUSTKEY" 的值必须全部存在于表 "PUBLIC"."CUSTOMERS" 的 "C_CUSTKEY" 列中
Scenario: 验证订单总价的业务规则
# 订单总价 O_TOTALPRICE 必须大于等于0
Given 我选择表 "PUBLIC"."ORDERS"
Then 列 "O_TOTALPRICE" 中的所有值必须满足条件 ">= 0"
这个文件清晰、无歧义,业务人员和数据分析师完全可以理解并编写。这里的关键在于,这些语句是高度结构化的,为我们后续的解析和SQL生成提供了可能。
2. 管理环境与连接:environment.py
这是框架的支柱。我们需要在所有测试开始前建立Snowflake连接,并在结束后安全地关闭它。
# features/environment.py
import os
import configparser
import logging
from snowflake import connector
from snowflake.connector import SnowflakeConnection
# 配置日志记录
logging.basicConfig(level=logging.INFO, format='%(asctime)s - %(levelname)s - %(message)s')
def before_all(context):
"""
在所有feature执行前运行。
负责读取配置并建立Snowflake连接。
"""
config = configparser.ConfigParser()
# 优先从环境变量读取配置文件路径,增强灵活性
config_path = os.getenv('SNOWFLAKE_CONFIG_PATH', 'config/snowflake.ini')
if not os.path.exists(config_path):
raise FileNotFoundError(f"Snowflake配置文件未找到: {config_path}")
config.read(config_path)
context.config = config
context.connections = {}
logging.info("配置加载完成。")
def get_snowflake_connection(context, env):
"""
根据环境名称获取或创建一个Snowflake连接。
连接被缓存以避免重复创建。
"""
if env in context.connections:
conn = context.connections[env]
if not conn.is_closed():
return conn
logging.info(f"正在为环境 '{env}' 创建新的Snowflake连接...")
# 凭证优先从环境变量获取,这是生产环境的最佳实践
user = os.getenv(f'SNOWFLAKE_USER_{env}')
password = os.getenv(f'SNOWFLAKE_PASSWORD_{env}')
account = os.getenv(f'SNOWFLAKE_ACCOUNT_{env}')
# 如果环境变量不存在,则从配置文件回退
if not all([user, password, account]):
logging.warning("部分连接信息未在环境变量中找到,将从配置文件读取。")
try:
user = user or context.config.get(env, 'user')
password = password or context.config.get(env, 'password')
account = account or context.config.get(env, 'account')
except (configparser.NoSectionError, configparser.NoOptionError) as e:
raise ValueError(f"环境 '{env}' 的配置不完整,请检查环境变量或配置文件。错误: {e}")
try:
conn = connector.connect(
user=user,
password=password,
account=account,
warehouse=context.config.get(env, 'warehouse'),
database=context.config.get(env, 'database'),
schema=context.config.get(env, 'schema'),
role=context.config.get(env, 'role')
)
context.connections[env] = conn
logging.info(f"环境 '{env}' 的Snowflake连接已成功建立。")
return conn
except Exception as e:
logging.error(f"连接到Snowflake环境 '{env}' 失败: {e}")
raise
def before_scenario(context, scenario):
"""
在每个Scenario执行前,重置上下文状态
"""
context.table_name = None
context.column_name = None
context.sql_query = None
def after_all(context):
"""
在所有feature执行后运行。
负责关闭所有打开的数据库连接。
"""
for env, conn in context.connections.items():
if isinstance(conn, SnowflakeConnection) and not conn.is_closed():
logging.info(f"正在关闭环境 '{env}' 的Snowflake连接...")
conn.close()
logging.info(f"环境 '{env}' 的连接已关闭。")
这份代码考虑了生产环境的几个关键点:
- 配置分离: 将连接参数放在
snowflake.ini中。 - 凭证安全: 通过环境变量(
SNOWFLAKE_USER_DEV,SNOWFLAKE_PASSWORD_DEV等)注入敏感信息,避免硬编码。 - 连接管理: 使用
context对象在behave的生命周期内传递连接,并确保在测试结束后关闭。 - 日志: 详尽的日志记录了连接的创建和关闭过程,便于问题排查。
示例snowflake.ini文件:
# config/snowflake.ini
[DEV]
warehouse = COMPUTE_WH
database = MY_DATABASE
schema = PUBLIC
role = SYSADMIN
3. 核心逻辑:validation_steps.py
这是框架的大脑,它将Gherkin的自然语言步骤翻译成SQL。
# features/steps/validation_steps.py
import logging
from behave import given, when, then
from environment import get_snowflake_connection
@given('我连接到 "{env}" 环境的Snowflake数据库')
def step_impl_connect_to_snowflake(context, env):
"""
获取指定环境的数据库连接,并将其设置到上下文中。
"""
try:
context.conn = get_snowflake_connection(context, env)
context.cursor = context.conn.cursor()
except Exception as e:
# 捕获连接错误并使测试失败
assert False, f"无法连接到Snowflake环境 '{env}': {e}"
@given('我选择表 "{table_name}"')
def step_impl_select_table(context, table_name):
"""
将当前操作的目标表名存入上下文。
"""
context.table_name = table_name
logging.info(f"当前操作表: {context.table_name}")
@when('我检查列 "{column_name}"')
def step_impl_select_column(context, column_name):
"""
将当前操作的目标列名存入上下文。
"""
context.column_name = column_name
logging.info(f"当前检查列: {context.column_name}")
@then('该列不应包含任何NULL值')
def step_impl_check_not_null(context):
"""
生成并执行SQL,检查指定列是否有NULL值。
"""
if not all([context.table_name, context.column_name]):
assert False, "步骤依赖错误:必须先指定表和列。"
sql = f"""
SELECT COUNT(*)
FROM {context.table_name}
WHERE {context.column_name} IS NULL;
"""
logging.info(f"执行SQL: {sql.strip()}")
try:
context.cursor.execute(sql)
result = context.cursor.fetchone()
null_count = result[0]
assert null_count == 0, f"数据质量校验失败:表 '{context.table_name}' 的列 '{context.column_name}' 发现 {null_count} 个NULL值。"
logging.info(f"校验通过:表 '{context.table_name}' 的列 '{context.column_name}' 没有NULL值。")
except Exception as e:
assert False, f"执行SQL校验时出错: {e}"
@then('列 "{child_col}" 的值必须全部存在于表 "{parent_table}" 的 "{parent_col}" 列中')
def step_impl_check_foreign_key(context, child_col, parent_table, parent_col):
"""
检查外键完整性。
使用LEFT JOIN来查找在子表中存在但在父表中不存在的键。
"""
if not context.table_name:
assert False, "步骤依赖错误:必须先指定主操作表。"
sql = f"""
SELECT COUNT(t1.{child_col})
FROM {context.table_name} AS t1
LEFT JOIN {parent_table} AS t2 ON t1.{child_col} = t2.{parent_col}
WHERE t2.{parent_col} IS NULL AND t1.{child_col} IS NOT NULL;
"""
logging.info(f"执行SQL: {sql.strip()}")
try:
context.cursor.execute(sql)
result = context.cursor.fetchone()
orphan_count = result[0]
assert orphan_count == 0, f"外键校验失败:表 '{context.table_name}' 的列 '{child_col}' 中有 {orphan_count} 个值在父表 '{parent_table}' 的 '{parent_col}' 中不存在。"
logging.info(f"外键校验通过:列 '{child_col}' 引用完整。")
except Exception as e:
assert False, f"执行外键校验SQL时出错: {e}"
@then('列 "{column_name}" 中的所有值必须满足条件 "{condition}"')
def step_impl_check_custom_condition(context, column_name, condition):
"""
检查列值是否满足一个自定义的SQL条件。
这是一个非常灵活的步骤,可以扩展以支持复杂的业务规则。
"""
if not context.table_name:
assert False, "步骤依赖错误:必须先指定表。"
# 简单的安全检查,防止SQL注入。在真实项目中,这里需要更复杂的解析和白名单机制。
allowed_operators = ['>', '<', '>=', '<=', '=', '!=', 'LIKE', 'NOT LIKE']
operator_found = any(op in condition for op in allowed_operators)
if not operator_found:
assert False, f"不安全的或不支持的条件: '{condition}'。只允许基本的比较运算符。"
sql = f"""
SELECT COUNT(*)
FROM {context.table_name}
WHERE NOT ({column_name} {condition});
"""
logging.info(f"执行SQL: {sql.strip()}")
try:
context.cursor.execute(sql)
result = context.cursor.fetchone()
failed_count = result[0]
assert failed_count == 0, f"业务规则校验失败:表 '{context.table_name}' 的列 '{column_name}' 中有 {failed_count} 个值不满足条件 '{condition}'。"
logging.info(f"业务规则校验通过:列 '{column_name}' 所有值均满足 '{condition}'。")
except Exception as e:
assert False, f"执行自定义条件校验SQL时出错: {e}"
这里的代码将Gherkin的声明性描述转化为了命令式的SQL执行。每个@then装饰的函数都是一个独立的、可复用的验证逻辑。它接收Gherkin语句中的参数,动态构建SQL,执行并断言结果。一个常见的错误是在这里构建过于复杂的SQL生成逻辑,正确的做法是保持每个步骤函数的原子性,让它们只做一件事。
4. 执行流程与结果
现在,所有组件都已就绪。在运行前,需要设置环境变量:
export SNOWFLAKE_CONFIG_PATH='config/snowflake.ini'
export SNOWFLAKE_USER_DEV='your_user'
export SNOWFLAKE_PASSWORD_DEV='your_password'
export SNOWFLAKE_ACCOUNT_DEV='your_account_identifier'
然后,在项目根目录运行behave命令:
behave
如果所有数据质量规则都满足,输出会是这样的:
Feature: 销售数据质量验证 # features/sales_data.feature:3
Background: # features/sales_data.feature:6
Given 我连接到 "DEV" 环境的Snowflake数据库 # features/steps/validation_steps.py:10
Scenario: 验证ORDERS表的核心字段非空 # features/sales_data.feature:9
Given 我选择表 "PUBLIC"."ORDERS" # features/steps/validation_steps.py:21
When 我检查列 "O_ORDERKEY" # features/steps/validation_steps.py:29
Then 该列不应包含任何NULL值 # features/steps/validation_steps.py:37
When 我检查列 "O_CUSTKEY" # features/steps/validation_steps.py:29
Then 该列不应包含任何NULL值 # features/steps/validation_steps.py:37
Scenario: 验证ORDERS表的外键完整性 # features/sales_data.feature:16
Given 我选择表 "PUBLIC"."ORDERS" # features/steps/validation_steps.py:21
Then 列 "O_CUSTKEY" 的值必须全部存在于表 "PUBLIC"."CUSTOMERS" 的 "C_CUSTKEY" 列中 # features/steps/validation_steps.py:59
... (其他场景)
1 feature passed, 0 failed, 0 skipped
3 scenarios passed, 0 failed, 0 skipped
...
如果存在数据问题,例如O_CUSTKEY中有NULL值,behave会立即失败并给出明确的错误信息:
...
Scenario: 验证ORDERS表的核心字段非空 # features/sales_data.feature:9
...
Then 该列不应包含任何NULL值 # features/steps/validation_steps.py:37 ... failed
Assertion Failed: 数据质量校验失败:表 'PUBLIC.ORDERS' 的列 'O_CUSTKEY' 发现 15 个NULL值。
Failing scenarios:
features/sales_data.feature:9 验证ORDERS表的核心字段非空
0 features passed, 1 failed, 0 skipped
1 scenario passed, 1 failed, 1 skipped
...
这个输出是 actionable 的。它精确地指出了哪个规则(Gherkin场景)、哪个表、哪个列出了问题,以及问题的严重程度(15个NULL值)。
架构流程图
整个工作流可以用一个Mermaid图清晰地表示:
graph TD
subgraph "业务与分析团队"
A[编写 Gherkin .feature 文件]
end
subgraph "CI/CD Pipeline 或本地执行"
B{behave 命令行工具}
end
subgraph "Python BDD 框架"
C[解析 .feature 文件]
D[environment.py: 建立Snowflake连接]
E[steps.py: 匹配Gherkin步骤]
F[动态生成SQL查询]
end
subgraph "Snowflake 数据仓库"
G[Snowflake Python Connector]
H[(Snowflake DB)]
end
I[输出测试结果: 成功/失败]
A --> B
B --> C
C -- 触发执行 --> D
D -- 提供连接 --> E
C -- 匹配语句 --> E
E --> F
F -- 通过Connector --> G
G -- 执行SQL --> H
H -- 返回结果 --> G
G -- 返回数据 --> E
E -- 断言结果 --> I
当前方案的局限性与未来迭代
这个框架有效地解决了沟通鸿沟,实现了数据质量规则的自动化。但在真实项目中,它还有几个可以改进的地方。
首先,性能。当前的实现为每个Then步骤执行一次独立的SQL查询。对于一个包含上百个规则的feature文件,这会产生大量的网络往返和查询开销。一个重要的优化方向是设计一种机制,将针对同一张表的多个校验合并成一个单一的、更复杂的SQL查询。例如,使用CASE语句或CTE(公共表表达式)在一个查询中计算多个指标,然后在Python端对结果进行拆解和断言。
其次,SQL生成的健壮性。step_impl_check_custom_condition中的条件检查非常初级,容易被滥用或产生语法错误的SQL。引入一个更专业的SQL查询构建库(如pypika)可以更安全、更结构化地生成SQL,有效防止SQL注入,并能处理更复杂的查询逻辑。
最后,扩展性。目前的步骤定义还比较有限。框架需要能轻松扩展以支持更复杂的数据质量维度,例如:
- 唯一性检查 (
COUNT(DISTINCT col) = COUNT(col)) - 数值分布检查(均值、标准差、百分位数在预期范围内)
- 字符串格式校验(使用正则表达式)
- 跨表一致性检查(例如,订单明细总和应等于订单头表的总价)
这些都需要定义新的Gherkin语句,并实现对应的、可复用的步骤函数。这为框架的长期演进指明了清晰的路径。