Узнайте, как оптимизировать API для миллионов записей и внедрить механизмы отказоустойчивости. Практические примеры кода на Python.
Вы когда-нибудь задумывались, как заставить API обрабатывать миллионы запросов за секунды и при этом не падать при сбоях внешних сервисов? В этой статье я делюсь опытом стажировки, где мы решали именно эти задачи. Вы узнаете, как оптимизировать пакетную вставку CSV, реализовать повторные попытки с экспоненциальной задержкой и организовать graceful degradation — всё с рабочим кодом на Python.
Представьте: пользователи загружают CSV-файлы с миллионами профилей. Нельзя вставлять строки по одной — это 500 000 отдельных запросов к базе. Нельзя загружать весь файл в память — не хватит RAM. Загрузка не должна блокировать другие запросы. И одна плохая строка не должна валить всю загрузку.
Решение — потоковая обработка с пакетной вставкой.
import csv
import io
from typing import Set, List, Dict
BATCH_SIZE = 10000 # оптимальный размер пакета
CHUNK_SIZE = 262144 # 256 КБ
async def ingest_csv(file_content: bytes):
"""Потоковая загрузка CSV с пакетной вставкой"""
reader = csv.DictReader(io.StringIO(file_content.decode('utf-8', errors='replace')))
batch: List[Dict] = []
seen_names: Set[str] = set()
errors = []
for row in reader:
# Валидация строки
if not validate_row(row):
errors.append(f"Строка {reader.line_num}: неверный формат")
continue
# Дедупликация внутри пакета
name = row['name'].strip().lower()
if name in seen_names:
errors.append(f"Строка {reader.line_num}: дубликат имени {name}")
continue
seen_names.add(name)
batch.append(row)
# Вставка пакета
if len(batch) >= BATCH_SIZE:
await insert_batch(batch)
batch.clear()
# Вставка остатка
if batch:
await insert_batch(batch)
return {"imported": len(seen_names), "errors": errors}
async def insert_batch(rows: List[Dict]):
"""Пакетная вставка с игнорированием конфликтов"""
# Здесь выполняется INSERT ... ON CONFLICT DO NOTHING
pass
Ключевые моменты: читаем файл чанками по 256 КБ, валидируем каждую строку, накапливаем 10 000 валидных строк и вставляем одним запросом. Дедупликация по имени в рамках сессии предотвращает конфликты. Каждый пакет вставляется независимо — ошибка в одном не откатывает другие.
Первая попытка — загрузить весь CSV в память — провалилась на 500 000 строках. Перешли на потоковую обработку. Вторая проблема — дубликаты внутри одного файла: два одинаковых имени проходили валидацию, но конфликтовали при вставке. Решение — глобальное множество seen_names. Третья — неожиданные форматы: UTF-16, лишние запятые, возраст "twenty-five". Добавили валидаторы для каждого поля.
Платформа MeetMind зависела от Gemini, Resend и LiveKit. Любой из них мог упасть. Нужно было внедрить повторные попытки с экспоненциальной задержкой и механизм fallback для стенограмм.
import asyncio
import logging
from typing import Callable, Tuple, Type, Any
async def retry_async(
func: Callable[..., Any],
*args,
max_retries: int = 3,
initial_delay: float = 2.0,
backoff_factor: float = 2.0,
exceptions: Tuple[Type[BaseException], ...] = (Exception,),
task_name: str = "Task",
**kwargs
) -> Any:
"""Универсальная функция повторных попыток"""
delay = initial_delay
for attempt in range(1, max_retries + 1):
try:
return await func(*args, **kwargs)
except exceptions as e:
if attempt == max_retries:
logging.error(f"{task_name}: все попытки исчерпаны", exc_info=True)
raise
logging.warning(f"{task_name}: попытка {attempt}/{max_retries} не удалась: {e}. Повтор через {delay:.2f}с")
await asyncio.sleep(delay)
delay *= backoff_factor # экспоненциальное увеличение
Функция принимает любую асинхронную функцию, количество попыток, начальную задержку и типы исключений, которые нужно повторять. Это позволяет не ретраить, например, ValueError от некорректного ввода.
Когда отдельные фрагменты стенограммы потеряны, система восстанавливает их из JSON-поля сессии. Fallback прозрачен для вызывающего кода:
async def get_transcript(session_id: str) -> List[dict]:
turns = await get_turns_from_db(session_id)
if turns:
return turns
# Fallback: извлекаем из transcript_json
session = await get_session(session_id)
if session.transcript_json:
return reconstruct_turns(session.transcript_json)
return []
def reconstruct_turns(transcript_json: dict) -> List[dict]:
"""Восстанавливает фрагменты из JSON"""
turns = []
for item in transcript_json.get('turns', []):
turns.append({
'id': str(uuid.uuid5(uuid.NAMESPACE_DNS, str(item))),
'timestamp': format_elapsed(item['start_time']),
'speaker': item['speaker'],
'text': item['text']
})
return turns
Первая ошибка — задержка умножалась до сна, а не после. Исправили: delay *= backoff_factor теперь после asyncio.sleep. Вторая — ретраились все исключения, включая программные ошибки. Сделали параметр exceptions. Третья — несоответствие формата времени: в JSON были Unix-секунды, а API ожидал "HH:MM:SS". Вынесли общую функцию форматирования.
Что делать прямо сейчас: проанализируйте свой код на предмет узких мест. Если вы работаете с большими объёмами данных — внедрите потоковую пакетную обработку. Если используете внешние API — оберните вызовы в retry_async с экспоненциальной задержкой. И всегда предусматривайте fallback для критических данных. Отказоустойчивость — это не фича, а подход к проектированию.
Хочешь закрепить знания на практике?
Решай задачи на Algolit — интерактивная платформа для обучения
Начать бесплатно →