Language:Chinese VersionEnglish Version

当系统优雅地而非灾难性地失效时

分布式系统会失效,网络会分区,依赖会变慢,第三方 API 会返回 503。问题不在于你的系统是否会遇到故障,而在于这些故障是否会级联导致完全中断,或者被系统优雅地吸收。熔断器、舱壁和重试模式是每个后端开发者需要掌握的三种基础弹性模式。本指南涵盖了每种模式的工作原理、实现方法,以及关键的是,如何将它们组合使用而不产生新问题。

熔断器模式

熔断器位于你的服务和依赖之间。当依赖开始反复失效时,熔断器”打开”,立即拒绝请求而不是等待超时。这可以防止缓慢或失效的依赖消耗所有线程并导致整个应用程序性能下降。

熔断器有三种状态:

  • 关闭:正常运行状态。请求正常通过。故障被计数。
  • 打开:依赖正在失效。请求立即被拒绝并返回回退响应。不会向失效的服务发起调用。
  • 半开:在恢复超时后,允许少量测试请求通过。如果它们成功,熔断器关闭。如果它们失败,熔断器再次打开。

在 Python 中实现熔断器

import time
import threading
from enum import Enum
from dataclasses import dataclass, field
from typing import Callable, Optional

class State(Enum):
    CLOSED = "closed"
    OPEN = "open"
    HALF_OPEN = "half_open"

@dataclass
class CircuitBreaker:
    failure_threshold: int = 5
    success_threshold: int = 2
    timeout: float = 30.0  # seconds before trying half-open

    _state: State = field(default=State.CLOSED, init=False)
    _failure_count: int = field(default=0, init=False)
    _success_count: int = field(default=0, init=False)
    _last_failure_time: Optional[float] = field(default=None, init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)

    def call(self, func: Callable, *args, fallback=None, **kwargs):
        with self._lock:
            if self._state == State.OPEN:
                if time.time() - self._last_failure_time >= self.timeout:
                    self._state = State.HALF_OPEN
                    self._success_count = 0
                else:
                    if fallback is not None:
                        return fallback()
                    raise CircuitOpenError("Circuit is open")

        try:
            result = func(*args, **kwargs)
            self._on_success()
            return result
        except Exception as e:
            self._on_failure()
            if fallback is not None:
                return fallback()
            raise

    def _on_success(self):
        with self._lock:
            if self._state == State.HALF_OPEN:
                self._success_count += 1
                if self._success_count >= self.success_threshold:
                    self._state = State.CLOSED
                    self._failure_count = 0
            elif self._state == State.CLOSED:
                self._failure_count = 0

    def _on_failure(self):
        with self._lock:
            self._failure_count += 1
            self._last_failure_time = time.time()
            if (self._state == State.CLOSED and
                    self._failure_count >= self.failure_threshold):
                self._state = State.OPEN
            elif self._state == State.HALF_OPEN:
                self._state = State.OPEN

class CircuitOpenError(Exception):
    pass

# Usage
payment_circuit = CircuitBreaker(failure_threshold=5, timeout=30.0)

def charge_card(amount, card_token):
    return payment_circuit.call(
        payment_gateway.charge,
        amount,
        card_token,
        fallback=lambda: {"status": "queued", "message": "Payment queued for retry"}
    )

在 Java/Kotlin 中使用 Resilience4j

在 JVM 生态系统中,Resilience4j 是事实上的标准:

CircuitBreakerConfig config = CircuitBreakerConfig.custom()
    .slidingWindowType(SlidingWindowType.COUNT_BASED)
    .slidingWindowSize(10)
    .failureRateThreshold(50)        // 当 50% 的请求失败时打开
    .waitDurationInOpenState(Duration.ofSeconds(30))
    .permittedNumberOfCallsInHalfOpenState(3)
    .recordExceptions(IOException.class, TimeoutException.class)
    .ignoreExceptions(BusinessException.class)  // 不计算业务逻辑错误
    .build();

CircuitBreaker circuitBreaker = CircuitBreaker.of("payment-service", config);

// 装饰你的函数
Supplier<PaymentResult> decoratedSupplier =
    CircuitBreaker.decorateSupplier(circuitBreaker, () -> paymentService.charge(amount));

// 执行并提供回退
Try.ofSupplier(decoratedSupplier)
    .recover(CallNotPermittedException.class, ex -> PaymentResult.queued())
    .recover(IOException.class, ex -> PaymentResult.failed(ex.getMessage()));

舱壁模式

舱壁通过为不同操作分配独立的资源池来隔离故障。这个名称来源于船体中的防水隔舱——如果一个隔舱进水,其他隔舱仍然完好。

没有舱壁时,缓慢的外部 API 可能会耗尽整个线程池,阻塞不相关的操作。有了舱壁,每个下游依赖都有自己的有限资源池。

线程池舱壁

import concurrent.futures
from contextlib import contextmanager

class BulkheadPool:
    def __init__(self, name: str, max_workers: int, max_queue: int = 10):
        self.name = name
        self.executor = concurrent.futures.ThreadPoolExecutor(
            max_workers=max_workers,
            thread_name_prefix=f"bulkhead-{name}"
        )
        self.semaphore = threading.Semaphore(max_workers + max_queue)

    def submit(self, fn, *args, **kwargs):
        if not self.semaphore.acquire(blocking=False):
            raise BulkheadFullError(
                f"舱壁 '{self.name}' 已满 — 拒绝请求"
            )
        def release_and_call():
            try:
                return fn(*args, **kwargs)
            finally:
                self.semaphore.release()

        return self.executor.submit(release_and_call)

class BulkheadFullError(Exception):
    pass

# 为不同的外部服务使用独立的线程池
email_pool = BulkheadPool("email-service", max_workers=5, max_queue=20)
payment_pool = BulkheadPool("payment-service", max_workers=10, max_queue=5)
analytics_pool = BulkheadPool("analytics", max_workers=3, max_queue=50)

# 现在慢速邮件服务不会阻塞支付处理
def send_confirmation_email(user_id, order_id):
    try:
        future = email_pool.submit(email_service.send, user_id, order_id)
        return future.result(timeout=5.0)
    except BulkheadFullError:
        # 优雅降级 — 稍后排队处理
        queue_email_task(user_id, order_id)
    except concurrent.futures.TimeoutError:
        queue_email_task(user_id, order_id)

Resilience4j 中的信号量舱壁

BulkheadConfig bulkheadConfig = BulkheadConfig.custom()
    .maxConcurrentCalls(10)
    .maxWaitDuration(Duration.ofMillis(100))  // 如果已满不要等待太久
    .build();

Bulkhead bulkhead = Bulkhead.of("database-reads", bulkheadConfig);

Supplier<List<User>> decoratedSupplier =
    Bulkhead.decorateSupplier(bulkhead, () -> userRepository.findActive());

Try.ofSupplier(decoratedSupplier)
    .recover(BulkheadFullException.class, ex -> getCachedActiveUsers());

重试模式

重试处理瞬时故障 — 网络波动、短暂资源耗尽、临时数据库锁定。简单实现会导致惊群问题。正确实现则使系统具有自愈能力。

带抖动的指数退避

最重要的规则:永远不要立即重试,也不要使用固定的重试间隔。使用带抖动的指数退避:

import random
import time

def exponential_backoff_with_jitter(
    func,
    max_retries: int = 5,
    base_delay: float = 0.1,    # 100ms base
    max_delay: float = 30.0,    # 30 second cap
    jitter_factor: float = 0.5,
    retryable_exceptions: tuple = (IOError, TimeoutError)
):
    last_exception = None

    for attempt in range(max_retries + 1):
        try:
            return func()
        except retryable_exceptions as e:
            last_exception = e
            if attempt == max_retries:
                break

            # Exponential backoff: 0.1, 0.2, 0.4, 0.8, 1.6...
            delay = min(base_delay * (2 ** attempt), max_delay)

            # Full jitter: random value between 0 and delay
            # This spreads retries across time, preventing thundering herd
            jittered_delay = random.uniform(0, delay * jitter_factor) + delay * (1 - jitter_factor)

            time.sleep(jittered_delay)

    raise last_exception

# Usage
result = exponential_backoff_with_jitter(
    lambda: requests.post("https://api.example.com/charge", json=payload, timeout=5),
    max_retries=3,
    retryable_exceptions=(requests.exceptions.Timeout, requests.exceptions.ConnectionError)
)

重试预算:防止重试放大

一种危险的故障模式:服务A调用服务B,服务B调用服务C。每一层都重试3次。在故障情况下,每个原始请求会产生 3 x 3 x 3 = 27 个请求。使用重试预算来限制整个系统的总重试次数:

class RetryBudget:
    """
    仅当重试率低于阈值时才允许重试。
    受Google的SRE重试预算方法启发。
    """
    def __init__(self, budget_percent: float = 10.0, window_seconds: float = 60.0):
        self.budget_percent = budget_percent
        self.window_seconds = window_seconds
        self._requests = []
        self._retries = []
        self._lock = threading.Lock()

    def can_retry(self) -> bool:
        now = time.time()
        cutoff = now - self.window_seconds

        with self._lock:
            self._requests = [t for t in self._requests if t > cutoff]
            self._retries = [t for t in self._retries if t > cutoff]

            if not self._requests:
                return True

            retry_rate = len(self._retries) / len(self._requests) * 100
            return retry_rate < self.budget_percent

    def record_request(self):
        with self._lock:
            self._requests.append(time.time())

    def record_retry(self):
        with self._lock:
            self._retries.append(time.time())

payment_retry_budget = RetryBudget(budget_percent=10.0)

结合模式:正确的顺序

这些模式最好一起使用,顺序也很重要。从外到内的标准组合方式:

  1. 舱壁模式 — 最外层,限制并发调用
  2. 断路器模式 — 包装操作,在断路器打开时快速失败
  3. 重试模式 — 最内层,在断路器看到之前重试瞬时故障
  4. 超时模式 — 应用于实际的网络调用
# Python示例组合所有三种模式
def resilient_payment_charge(amount, card_token):
    def charge():
        return payment_circuit.call(
            lambda: exponential_backoff_with_jitter(
                lambda: payment_api.charge(amount, card_token, timeout=3.0),
                max_retries=2
            ),
            fallback=lambda: {"status": "queued"}
        )

    try:
        future = payment_pool.submit(charge)
        return future.result(timeout=10.0)
    except BulkheadFullError:
        return {"status": "rejected", "reason": "system_busy"}

超时:每个人都忘记的模式

没有超时,以上任何模式都无法正常工作。一个无限挂起的操作会使断路器失效(失败计数永远不会增加)和舱壁失效(线程永远保持占用)。

import requests
from requests.adapters import HTTPAdapter
from urllib3.util.retry import Retry

# 配置连接级别和读取超时
session = requests.Session()
session.mount('https://', HTTPAdapter(
    max_retries=Retry(total=0)  # 让我们的重试逻辑处理这个
))

# 总是同时设置连接超时和读取超时
response = session.post(
    "https://api.payment.com/charge",
    json=payload,
    timeout=(3.05, 10)  # (connect_timeout, read_timeout)
)

弹性模式的可观测性

如果你无法观察这些模式,它们就毫无用处。为每个状态转换添加监控:

from prometheus_client import Counter, Histogram, Gauge

circuit_state = Gauge('circuit_breaker_state',
    '断路器状态 (0=关闭, 1=打开, 2=半开)',
    ['service'])
retry_attempts = Counter('retry_attempts_total',
    '总重试次数', ['service', 'outcome'])
bulkhead_rejected = Counter('bulkhead_rejected_total',
    '被舱壁拒绝的请求数', ['pool'])

设置警报:断路器打开超过2分钟意味着你的依赖确实已关闭。舱壁拒绝率超过1%意味着你的配置不足。重试率超过5%意味着某些问题持续存在。

实际实施清单

  • 每个外部 HTTP 调用都有连接超时和读取超时
  • 不进行没有指数退避和抖动的重试
  • 每个外部服务依赖都配置了断路器
  • 为不同的外部服务使用独立的线程池(隔板)
  • 为每个断路操作定义了降级响应
  • 记录并警报所有断路状态变化
  • 对你的弹性模式进行负载测试 — 在生产环境前进行混沌工程

这些模式的投入会在依赖项凌晨 2 点降级时立即得到回报,你的服务会继续提供缓存或降级响应,而不是抛出 500 错误。从一开始就构建弹性 — 将其重构到脆弱系统中要困难得多。

By Michael Sun

Founder and Editor-in-Chief of NovVista. Software engineer with hands-on experience in cloud infrastructure, full-stack development, and DevOps. Writes about AI tools, developer workflows, server architecture, and the practical side of technology. Based in China.

Leave a Reply

Your email address will not be published. Required fields are marked *

You missed