ГлавнаяБлогТокен бакет: пишем rate limiter на Python с нуля
Python

Токен бакет: пишем rate limiter на Python с нуля

Разберитесь, как работают rate limiters Stripe и Twilio. Напишите токен бакет на Python с нуля и встройте его в HTTP-сервер. Попробуйте сами.

Al
Редакция Algolitalgolit.ru
12 мин чтения13 июня 2026 г.

Вы когда-нибудь получали 429 от Stripe или Twilio и просто ждали, не понимая, как именно считается этот Retry-After? Большинство разработчиков так и делают. Но когда вам нужно построить систему, выдерживающую пиковый трафик, без понимания алгоритма не обойтись. Сегодня мы с нуля напишем токен бакет на чистом Python — именно то, что лежит в основе продакшен-решений.

Почему токен бакет, а не фиксированное окно или лог?

У rate limiting есть три основных подхода:

  • Фиксированное окно: считаем запросы в 60-секундном интервале, сбрасываем на границе. Просто, но если кто-то отправит 100 запросов в 11:59 и ещё 100 в 12:00, вы пропустите 200 за две секунды. Граница — дыра.
  • Скользящий лог: храним временные метки запросов и считаем количество в окне. Точный, но хранить метку на каждый запрос дорого.
  • Токен бакет: токены накапливаются с постоянной скоростью до максимума. Каждый запрос тратит один токен. Если токенов нет — ждёте. Пиковый трафик разрешён до размера бакета.

Токен бакет — это то, что используют в реальных rate limiters. Он естественно обрабатывает всплески: вы копите токены, когда трафик низкий, и тратите их, когда он резко растёт.

Как это работает: ментальная модель

Представьте физическое ведро. Оно вмещает до N токенов. Токены капают со скоростью R в секунду. Каждый API-вызов забирает один токен.

Если отправлять запросы медленно, токены накапливаются. Когда нужно сделать всплеск, вы их тратите. Если ведро пусто — ждёте.

Формула проста:

tokens_now = min(capacity, tokens_last + rate * (now - last_refill_time))

Никакого фонового потока для долива — вы просто вычисляете, сколько токенов должно было накопиться, исходя из прошедшего времени.

Пишем токен бакет на Python

Вот минимальная, но рабочая реализация:

import time
import threading
from dataclasses import dataclass, field

@dataclass
class TokenBucket:
    capacity: float          # максимум токенов
    refill_rate: float       # токенов в секунду
    _tokens: float = field(init=False)
    _last_refill: float = field(init=False)
    _lock: threading.Lock = field(default_factory=threading.Lock, init=False)

    def __post_init__(self):
        self._tokens = self.capacity  # начинаем полным
        self._last_refill = time.monotonic()

    def _refill(self):
        now = time.monotonic()
        elapsed = now - self._last_refill
        gained = elapsed * self.refill_rate
        self._tokens = min(self.capacity, self._tokens + gained)
        self._last_refill = now

    def consume(self, tokens: float = 1.0) -> bool:
        with self._lock:
            self._refill()
            if self._tokens >= tokens:
                self._tokens -= tokens
                return True
            return False

    def wait_and_consume(self, tokens: float = 1.0) -> float:
        """Блокируется, пока токены не появятся. Возвращает время ожидания."""
        while True:
            with self._lock:
                self._refill()
                if self._tokens >= tokens:
                    self._tokens -= tokens
                    return time.monotonic() - self._last_refill
            time.sleep(0.01)

Обратите внимание на ключевые моменты:

  • Блокировка на каждый бакет. В распределённой системе вы бы использовали Redis с WATCH/MULTI/EXEC или Lua-скрипт, но для одного процесса threading.Lock достаточно.
  • Ленивый долив. Токены накапливаются по времени, а не через фоновый поток. Это дёшево и не даёт дрейфа.
  • Стартуем полными. Новый клиент сразу может сделать всплеск, а не ждать заполнения.

Тестируем

import time

def test_burst_then_throttle():
    bucket = TokenBucket(capacity=10, refill_rate=2)  # 2 запроса/сек, всплеск до 10

    # Всплеск: сжигаем все 10 токенов
    burst_results = [bucket.consume() for _ in range(10)]
    assert all(burst_results), "Должен пропустить весь всплеск"

    # Следующий запрос должен упасть (бакет пуст)
    assert not bucket.consume(), "Должен отклонить, когда пусто"

    # Ждём полсекунды — должен появиться ~1 токен
    time.sleep(0.6)
    assert bucket.consume(), "Должен пропустить после частичного долива"

    print("Все тесты всплеска пройдены.")

test_burst_then_throttle()

Вывод:

Все тесты всплеска пройдены.

Подключаем к HTTP-серверу

Вот как встроить это в простой HTTP-сервер без фреймворков:

from http.server import HTTPServer, BaseHTTPRequestHandler

# Один бакет на API-ключ (в реальности — по пользователю/организации)
buckets: dict[str, TokenBucket] = {}

def get_bucket(api_key: str) -> TokenBucket:
    if api_key not in buckets:
        buckets[api_key] = TokenBucket(capacity=20, refill_rate=5)
    return buckets[api_key]

class RateLimitedHandler(BaseHTTPRequestHandler):
    def do_GET(self):
        api_key = self.headers.get("X-API-Key", "anonymous")
        bucket = get_bucket(api_key)
        if bucket.consume():
            self.send_response(200)
            self.end_headers()
            self.wfile.write(b"OK\n")
        else:
            self.send_response(429)
            self.send_header("Retry-After", "1")
            self.end_headers()
            self.wfile.write(b"Too Many Requests\n")

    def log_message(self, fmt, *args):
        pass  # отключаем лог

if __name__ == "__main__":
    server = HTTPServer(("localhost", 8080), RateLimitedHandler)
    print("Сервер на :8080")
    server.serve_forever()

Запустите и протестируйте утилитой curl:

for i in $(seq 1 25); do curl -s -o /dev/null -w "%{http_code}\n" -H "X-API-Key: test123" http://localhost:8080/; done

Вы увидите 20 ответов с кодом 200, затем 429, пока токены не восстановятся.

Правильный Retry-After

При возврате 429 сообщите клиенту, сколько ждать. Вот простой хелпер:

def seconds_until_token(bucket: TokenBucket) -> float:
    deficit = 1.0 - bucket._tokens
    if deficit <= 0:
        return 0.0
    return deficit / bucket.refill_rate

Добавьте в ответ 429:

wait = seconds_until_token(bucket)
self.send_header("Retry-After", str(int(wait) + 1))

Именно так делает Stripe. Клиент читает заголовок, ждёт нужное время и повторяет запрос.

Per-user vs глобальные бакеты

Один бакет на API-ключ даёт изоляцию по пользователям. Но часто нужно и то, и другое:

class TwoTierBucket:
    def __init__(self, global_bucket: TokenBucket, user_bucket: TokenBucket):
        self.global_bucket = global_bucket
        self.user_bucket = user_bucket

    def consume(self) -> bool:
        # Оба должны сработать. Сначала проверяем пользователя (дешевле отклонить раньше).
        if not self.user_bucket.consume():
            return False
        if not self.global_bucket.consume():
            # У пользователя были токены, но глобальный пуст. Возвращаем токен пользователю.
            self.user_bucket._tokens = min(
                self.user_bucket.capacity,
                self.user_bucket._tokens + 1
            )
            return False
        return True

Это ближе к продакшену. Один злоумышленник не исчерпает глобальный бюджет, но даже добросовестный пользователь будет ограничен при пиковом трафике.

Что я узнал, что меня удивило

  • Бакет начинается полным намеренно. Новые интеграции должны иметь возможность сделать всплеск. Если бакет пуст, новые клиенты сразу получают 429 и считают, что API сломан.
  • Вместимость всплеска — продуктовое решение. Высокая ёмкость даёт больше толерантности к пиковым клиентам. Низкая — более жёсткие SLA. Скорость (токенов/сек) контролирует steady-state, а ёмкость — запас прочности.
  • Распределённые токен бакеты — это сложно. В одном процессе 30 строк. В распределённой системе с несколькими серверами нужен общий Redis. Lua-скрипт с INCR, expire и return — стандартный трюк.
  • 429 лучше, чем 500. Если пропустить весь трафик, база данных упадёт, и все получат 500. Хорошо настроенный rate limiter возвращает 429 меньшинству и сохраняет систему здоровой.

Практический вывод: сделайте игрушечную версию того, от чего зависите

Rate limiters стоят за каждым API, который вы вызываете: Stripe, Twilio, OpenAI, AWS, ваши внутренние сервисы. Большинство разработчиков просто воспринимают 429 как данность.

Построив токен бакет, вы поймёте, как числа связаны на самом деле. Ёмкость 20 и скорость 5 означают, что в steady-state вы можете делать 5 запросов в секунду вечно, но при этом поглотить всплеск в 20. Когда теперь увидите Retry-After: 12, вы будете знать, что кто-то вычислил дефицит.

Токен бакет — это 30 строк. Ментальная модель — одно предложение: токены капают, запросы их тратят, полный бакет — значит можно сделать всплеск.

Что делать прямо сейчас: скопируйте класс TokenBucket в свой проект, оберните в него любой эндпоинт и запустите нагрузочное тестирование. Вы увидите, как 429 защищает систему, и поймёте, почему Stripe и Twilio делают именно так.

#rate limiter#токен бакет#Python#429#алгоритмы
Al
Редакция Algolit

Пишем про алгоритмы, подготовку к собеседованиям и карьеру в IT — так, чтобы было понятно и полезно.

Хочешь закрепить знания на практике?

Решай задачи на Algolit — интерактивная платформа для обучения

Начать бесплатно →
Токен бакет: пишем rate limiter на Python с нуля | Algolit