周日凌晨3点,数据管道决定给你上一课
我被叫醒处理的周日次数多到数不清,而根本原因几乎从来不是硬件故障或代码错误。问题出在数据管道。具体来说,是一个ETL作业,它在周一到周五运行完美,然后在周日悄无声息地崩溃了,因为它在周日预期的数据与一周中其他时间获取的数据完全不同。
生产环境的ETL管道在周日(以及节假日、月末边界、夏令时转换时)会失败,因为它们建立在关于数据的隐式假设上,而这些假设只在大多数情况下成立。最危险的假设是未来会与近期相似。数据管道正是这个假设死亡的地方。
管道在周末失效的七种方式
1. 数据量下降触发误报(或掩盖真实问题)
大多数数据管道监控都包含行数检查:如果今天的批次比昨天的行数显著减少,就触发警报。在周日,对于许多B2B应用来说,交易量确实会合法地下降40-70%。警报每周日都会触发。几周后,值班工程师会将其静音。然后某个周日,管道真的出问题了,但没有人注意到,因为警报已经被静音了。
# 天真的方法:静态阈值
def check_row_count(table, expected_min=10000):
count = db.query(f"SELECT COUNT(*) FROM {table} WHERE date = CURRENT_DATE")
if count < expected_min:
alert(f"Low row count in {table}: {count} < {expected_min}")
# 更好的方法:考虑星期几的阈值
def check_row_count_smart(table):
current_day = datetime.now().strftime("%A")
# 计算过去4个同一天的预期范围
stats = db.query("""
SELECT
AVG(row_count) as avg_count,
STDDEV(row_count) as std_count
FROM pipeline_metrics
WHERE table_name = %s
AND day_of_week = %s
AND run_date > CURRENT_DATE - INTERVAL '28 days'
""", [table, current_day])
lower_bound = max(0, stats.avg_count - 2 * stats.std_count)
upper_bound = stats.avg_count + 2 * stats.std_count
count = get_today_count(table)
if count < lower_bound or count > upper_bound:
alert(
f"Anomalous row count in {table}: {count}. "
f"Expected {lower_bound:.0f}-{upper_bound:.0f} for {current_day}."
)
2. 源系统维护窗口
许多上游数据源会在周末安排维护。您的管道会在常规时间尝试连接,但会收到连接被拒绝或超时错误,然后要么大声失败,要么更糟糕的是,成功获取空数据集或部分数据集。
# 一个健壮的提取步骤应该处理维护窗口
import tenacity
import httpx
@tenacity.retry(
stop=tenacity.stop_after_attempt(5),
wait=tenacity.wait_exponential(multiplier=1, min=60, max=3600),
retry=tenacity.retry_if_exception_type((httpx.ConnectError, httpx.ReadTimeout)),
before_sleep=lambda retry_state: logger.warning(
"source_unavailable",
attempt=retry_state.attempt_number,
wait=retry_state.next_action.sleep,
source="billing-api",
),
)
async def extract_billing_data(date: str):
response = await httpx.AsyncClient().get(
f"https://billing-api.internal/export?date={date}",
timeout=300,
)
if response.status_code == 503:
raise httpx.ConnectError("源系统处于维护模式")
data = response.json()
# 关键:验证我们获取的是完整数据集
if data.get("is_partial"):
raise PartialDataError(
f"源系统为 {date} 返回了部分数据。 "
f"预期约 {data.get('expected_count', 'unknown')} 条记录, "
f"实际得到 {len(data['records'])} 条。可能是维护窗口期间。"
)
return data
3. 时区和夏令时转换
时钟每年会变化两次。而每年两次,假设一天有24小时的ETL管道就会崩溃。一个处理从午夜到午夜每日批次数据的管道,在夏令时转换期间要么会处理23小时的数据,要么会处理25小时的数据,导致计数不足或重复处理。
# 不好的:简单的日期算术
from datetime import datetime, timedelta
def get_date_range(date):
start = datetime(date.year, date.month, date.day)
end = start + timedelta(days=1) # 不总是24小时!
return start, end
# 好的:时区感知的日期边界
from zoneinfo import ZoneInfo
def get_date_range_safe(date, tz_name="America/New_York"):
tz = ZoneInfo(tz_name)
start = datetime(date.year, date.month, date.day, tzinfo=tz)
end = datetime(date.year, date.month, date.day, tzinfo=tz) + timedelta(days=1)
# 在夏令时转换期间:
# 春季向前:end - start = 23小时(这是正确的!)
# 秋季向后:end - start = 25小时(这也正确!)
# 转换为UTC用于数据库查询
return start.astimezone(ZoneInfo("UTC")), end.astimezone(ZoneInfo("UTC"))
# 更好的方法:全部以UTC存储和处理
# 只在表示层转换为本地时间
4. 迟到的数据
周末数据常常延迟到达。在早上6点处理”昨日数据”的批处理系统通常假设数据在那时已经完整。在工作日,这通常是正确的。但在周日,周六晚上的数据可能要到中午才能到达,因为源系统的批处理导出是在周末计划运行的。
# 基于水印的完整性检查
async def wait_for_data_completeness(
source: str,
date: str,
expected_watermark: str = "23:59",
timeout_hours: int = 6,
check_interval_minutes: int = 15,
):
"""等待源数据包含直到预期水印的事件。"""
start_time = datetime.now()
timeout = timedelta(hours=timeout_hours)
while datetime.now() - start_time < timeout:
latest_event = await db.fetchval(
"SELECT MAX(event_timestamp) FROM raw_events "
"WHERE source = $1 AND event_date = $2",
source, date,
)
if latest_event and latest_event.strftime("%H:%M") >= expected_watermark:
logger.info("data_complete", source=source, date=date,
latest_event=str(latest_event))
return True
logger.info("waiting_for_data", source=source, date=date,
latest_event=str(latest_event), expected=expected_watermark)
await asyncio.sleep(check_interval_minutes * 60)
raise DataIncompleteError(
f"{source}/{date}的数据在{timeout_hours}小时后仍未完整。 "
f"最新事件: {latest_event}。预期水印: {expected_watermark}。"
)
5. 在周五实施的架构变更
有人在周五下午部署了一个架构变更(完全违背常理)。应用程序运行良好,因为它能够处理新的架构。但是,从副本或事件流读取的ETL管道并不知道新列或重命名字段。它在周五晚上和周六运行良好,因为新字段是可空的,管道会忽略额外的字段。到了周日,第一条包含必需新字段的记录到达,管道就崩溃了。
# 转换步骤中的防御性架构处理
import pandas as pd
def transform_orders(raw_df: pd.DataFrame) -> pd.DataFrame:
# 定义带有默认值的预期架构
expected_columns = {
"order_id": {"type": "str", "required": True},
"user_id": {"type": "str", "required": True},
"amount": {"type": "float", "required": True},
"currency": {"type": "str", "default": "USD"},
"discount_code": {"type": "str", "default": None},
"fulfillment_type": {"type": "str", "default": "standard"},
}
# 检查缺失的必需列
missing_required = [
col for col, spec in expected_columns.items()
if spec.get("required") and col not in raw_df.columns
]
if missing_required:
raise SchemaError(
f"缺少必需列: {missing_required}. "
f"可用列: {list(raw_df.columns)}. "
f"这可能表明上游架构发生了变更。"
)
# 添加缺失的可选列并设置默认值
for col, spec in expected_columns.items():
if col not in raw_df.columns and "default" in spec:
raw_df[col] = spec["default"]
logger.warning("schema_drift", column=col, action="added_default",
default=spec["default"])
# 警告意外的列
known_columns = set(expected_columns.keys())
unknown_columns = set(raw_df.columns) - known_columns
if unknown_columns:
logger.warning("schema_drift", new_columns=list(unknown_columns),
action="ignored")
return raw_df[list(expected_columns.keys())]
6. 备份作业导致的资源竞争
许多组织会在周末安排数据库备份、索引重建和 VACUUM 操作。这些维护任务与在同一时间运行的 ETL 管道竞争 I/O 和 CPU 资源。原本在周二只需 20 分钟的管道,在周日需要 3 小时,因为数据库同时还在运行完整备份。
# 在运行重查询前检查是否有竞争操作
async def check_database_load(max_active_queries: int = 20):
active = await db.fetchval("""
SELECT COUNT(*) FROM pg_stat_activity
WHERE state = 'active'
AND query NOT LIKE '%pg_stat_activity%'
AND backend_type = 'client backend'
""")
if active > max_active_queries:
logger.warning("high_database_load", active_queries=active,
threshold=max_active_queries)
return False
return True
# 同时检查长时间运行的维护操作
async def check_for_maintenance():
maintenance = await db.fetch("""
SELECT pid, query, state, now() - query_start as duration
FROM pg_stat_activity
WHERE query ILIKE '%vacuum%'
OR query ILIKE '%reindex%'
OR query ILIKE '%pg_dump%'
ORDER BY duration DESC
""")
if maintenance:
logger.info("maintenance_detected",
operations=[{
"pid": m["pid"],
"type": m["query"][:50],
"duration": str(m["duration"])
} for m in maintenance])
return maintenance
7. “没有有效数据”的问题
在某些星期日,确实可能没有数据。一个 B2B SaaS 产品在星期日可能没有任何交易。数据管道需要区分”因为没有数据所以是星期日”和”因为提取失败所以没有数据”这两种情况。这听起来比实际要难。
# 跟踪提取元数据以区分空数据与提取失败
@dataclass
class ExtractionResult:
source: str
date: str
record_count: int
extraction_started: datetime
extraction_completed: datetime
source_responded: bool
source_reported_complete: bool
@property
def is_empty_but_valid(self) -> bool:
return (
self.record_count == 0
and self.source_responded
and self.source_reported_complete
)
@property
def is_suspicious_empty(self) -> bool:
return (
self.record_count == 0
and (not self.source_responded or not self.source_reported_complete)
)
async def extract_and_validate(source: str, date: str):
result = await extract_data(source, date)
if result.is_suspicious_empty:
alert(
f"从 {source} 提取 {date} 的数据返回了 0 条记录 "
f"但数据源未确认完整性。 "
f"数据源响应: {result.source_responded}。 "
f"在将此运行标记为成功前需要调查。"
)
elif result.is_empty_but_valid:
logger.info("empty_extraction_valid", source=source, date=date,
note="数据源确认该期间无数据")
构建弹性管道:检查清单
- 使用星期感知的阈值进行容量检查和异常检测。
- 处理维护窗口,使用重试和指数退避策略。
- 以 UTC 格式存储和处理时间戳。仅在展示层转换为本地时间。
- 实现基于水印的完整性检查,而不是假设数据在固定时间已准备好。
- 防御性地验证模式,包括已知列检查和默认值处理。
- 监控资源竞争,避免在备份窗口期间运行重型 ETL。
- 使用数据源元数据区分有效空数据和提取失败。
- 使管道具有幂等性。应该能够重新运行任何日期的任何管道而不会重复数据。
- 使用周末数据进行测试。您的 CI 管道应包含测试装置,模拟星期日的数据量、夏令时转换和延迟到达的数据。
- 维护管道日历,将已知的不规则事件(节假日、夏令时、月末、季末)映射到预期的管道行为变化。
运营成熟度模型
| 级别 | 特征 | 周日行为 |
|---|---|---|
| 1: 临时性 | 无监控,手动修复 | 直到周一才知道出问题 |
| 2: 响应式 | 基本警报,手动修复 | 值班人员收到通知,手动重新运行 |
| 3: 防御性 | 重试机制,验证,感知日期的阈值 | 大多数问题自动修复,真正故障会通知 |
| 4: 主动性 | 异常检测,完整性检查,自动修复 | 流水线自动调整到周末模式 |
大多数团队处于第2级。达到第3级需要一周的专注工程工作。达到第4级需要持续投入,但通过减少值班负担和提高数据质量而获得回报。
数据流水线在周日崩溃,因为工程师是在周二构建它们的。周二是干净、完整且准时到达的数据。周日的数据则是混乱、不完整、延迟,有时甚至完全缺失。如果你的流水线无法优雅地处理周日数据,它就不是生产流水线——它只是一个碰巧每天运行的周二流水线。为周日而构建,其他所有日子都会自行解决。
