Разбираем архитектуру ZoClone — AI-агента на Python из 7 файлов. Планер, память, пул вычислений и навыки. Код и торговые офферты внутри.
Вы когда-нибудь задумывались, сколько в современных AI-платформах реальной архитектуры, а сколько — обвязки из Docker, Postgres и микросервисов? Я восстановил семь ключевых подсистем Zo Computer в 800 строках Python без единого облачного сервиса. В этой статье — архитектура ZoClone, tradeoffs каждого компонента и то, что пришлось вырезать, чтобы уместиться в один репозиторий.
ZoClone — это AI-агент, который запускается на вашем ноутбуке. Он умеет параллельно опрашивать модели, загружать навыки из SKILL.md, сжимать контекст через TF-IDF, запускать задачи по расписанию, распределять работу между машинами и управлять ключами API. Всё это — 775 строк Python в семи файлах:
ZoClone/
├── src/
│ ├── zo.py # главный оркестратор + цикл ask()
│ ├── agent_manager.py # параллельные AI-агенты
│ ├── skills.py # автозагрузка SKILL.md
│ ├── memory.py # TF-IDF эмбеддинги + поиск контекста
│ ├── automation.py # планировщик по rrule
│ ├── compute_pool.py # пул вычислительных узлов
│ ├── browser.py # headless-браузер на Playwright
│ ├── byok.py # хранилище ключей (Groq, OpenAI, Anthropic)
│ ├── zo_client.py # OpenAI-совместимый клиент
│ └── services.py # супервизор процессовНикаких __init__.py, метаклассов или плагинов — только функции и классы с тремя методами. Каждый файл решает ровно одну задачу.
Всё замыкается на класс ZoClone, который владеет подключением к SQLite, пулом потоков и AI-клиентом. Клиент создаётся лениво — при первом вызове ask().
class ZoClone:
def __init__(self):
self.db = init_db()
self.executor = ThreadPoolExecutor(max_workers=10)
self.ai_client = None
self.pool = pool # синглтон модуля
self.hosting = hosting # синглтон модуля
self.memory = memory
self.scheduler = scheduler
def ask(self, conv_id: str, message: str,
provider: str = "groq", model: str = "",
tools: list[dict] = None) -> dict:
if not self.ai_client:
key = get_key(provider)
m = model or PROVIDERS[provider]["models"][0]
self.ai_client = AIClient(provider, m, key)
messages = self.memory.get_context(conv_id)
messages.append({"role": "user", "content": message})
system = f"Вы — AI-агент. Рабочая директория: {os.getcwd()}."
resp = self.ai_client.chat(
[{"role": "system", "content": system}] +
messages[-20:],
tools or [],
)
# сохраняем ответ в БД
return respКлючевой трюк — AIClient. Он единственный, кто должен быть совместим с OpenAI, потому что все современные провайдеры (Groq, Together, OpenRouter, Ollama, LM Studio) сошлись на схеме chat completions. Anthropic потребовал крошечную прослойку, но Groq работает из коробки.
Это моя гордость. Сканирование директории занимает шесть строк:
def load_all_skills():
global SKILLS
SKILLS = {}
if not SKILL_DIR.exists():
return
for item in SKILL_DIR.iterdir():
if item.is_dir() and (item / "SKILL.md").exists():
skill = load_skill(item.name, item / "SKILL.md")
if skill:
SKILLS[skill.name] = skillРазбор SKILL.md принимает те же поля frontmatter, что и Agent Skills: name, description, triggers. Затем ищет scripts/<name>.py с функцией run() или execute(). Это весь API плагина — никакой регистрации, декораторов или манифестов. Просто положите папку в skills/, и следующий import подхватит её.
Цена такой простоты: нет версионирования, декларации зависимостей и песочницы для навыков. Для одного пользователя на ноутбуке — нормально. Для мультитенантной платформы — нет.
Я схитрил и не стыжусь этого. Примитив «запустить параллельного агента» — это удалённый вызов модели, а эндпоинт /zo/ask открыт для любого с токеном:
async def spawn(self, agent_id: str, prompt: str, callback=None):
async with aiohttp.ClientSession() as session:
async with session.post(
"https://api.zo.computer/zo/ask",
headers={
"authorization": self.api_token,
"content-type": "application/json"
},
json={
"input": prompt,
"model_name": "vercel:minimax/minimax-m2.7"
},
) as resp:
return {
"agent_id": agent_id,
"output": (await resp.json())["output"]
}
async def spawn_all(self, agents: list) -> list:
return await asyncio.gather(
*[self.spawn(a["id"], a["prompt"]) for a in agents]
)spawn_all запускает N конкурентных запросов, asyncio.gather ждёт самый медленный, и вы получаете список ответов. Для синхронных вызовов — ThreadPoolExecutor(max_workers=10). На практике узкое место — модель, а не сеть: 10 параллельных вызовов упираются в rate limiter задолго до насыщения asyncio.
Честно: это самая слабая подсистема. embed_tfidf хэширует токены в 512-мерный вектор, cosine считает косинусное расстояние, а recall() возвращает top-k узлов с наибольшей похожестью. Это работает для коротких запросов и маленьких корпусов, но это не семантический поиск — слова «база данных» и «SQL» не кластеризуются так, как с настоящей моделью эмбеддингов.
Почему я всё равно это выкатил: настоящая модель (sentence-transformers или удалённый вызов) заменяется одним свапом, а интерфейс — memorize(content, meta) -> nid, recall(query, top_k) -> [{id, content, meta}] — не меняется. Когда я доберусь до подключения nomic-embed-text через Ollama, в zo.py не придётся трогать ни строчки. Трюк в том, чтобы сначала определить правильную форму, а потом честно признать, какие поля заглушка подделывает.
Спецификация rrule — это 50-страничный документ. Мне было нужно три частоты и счётчик:
def parse_rrule(rrule: str) -> dict:
result = {"interval": 86400, "count": 0} # по умолчанию ежедневно
if "FREQ=DAILY" in rrule:
result["interval"] = 86400
elif "FREQ=HOURLY" in rrule:
result["interval"] = 3600
elif "FREQ=MINUTELY" in rrule:
result["interval"] = 60
if "COUNT=" in rrule:
m = re.search(r"COUNT=(\d+)", rrule)
if m:
result["count"] = int(m.group(1))
return resultDaemon-поток просыпается раз в минуту, спрашивает SQLite WHERE enabled=1 AND next_run <= now, запускает обработчик каждой задачи и сдвигает next_run на интервал. Это вся система автоматизации. В ней нет часовых поясов, исключений и обработки перехода на летнее время, но для «запускать каждый час» — корректно и надёжно.
ComputePool хранит self.jobs и self.nodes как словари в памяти, защищённые threading.Lock. Heartbeats обновляют last_heartbeat, диспетчер сортирует ожидающие задачи по убыванию приоритета и назначает верхнюю следующему опрашивающему узлу. Никакого leader election, Raft или gossip-протокола.
def assign_job(self, node_id: str) -> dict | None:
with self.lock:
pending = [j for j in self.jobs.values()
if j["status"] == "pending"]
if not pending:
return None
pending.sort(key=lambda x: -x["priority"])
job = pending[0]
job["status"] = "assigned"
job["assigned_node"] = node_id
if node_id in self.nodes:
self.nodes[node_id]["status"] = "busy"
return jobЭто настоящая грабля: состояние в процессе означает, что перезапуск теряет все ожидающие задачи. Для реального грида нужно Postgres с блокировками на уровне строк. Но для «запустить задачу на втором ноутбуке» — pip install это весь онбординг.
Три вещи не попали в пакет и, вероятно, никогда не попадут:
zo и вызывайте zo.ask(...) из Flask-роута, Tk-окна, Discord-бота или cron-задачи.whoami() возвращает локальное имя. Если нужен командный план — форкайте репозиторий.nomic-embed-text через Ollama (приватно, бесплатно, на той же машине), а интерфейс останется тем же.Склонируйте репозиторий и запустите:
git clone https://github.com/AmSach/ZoClone
cd ZoClone && pip install aiohttp playwright
python -m playwright install chromium
python -c "from src.zo import zo; print(zo.ask('test-conv', 'hi'))"Хотите добавить навык? Положите папку в skills/ с SKILL.md и scripts/foo.py — и откройте PR. Я мержу в течение 24 часов. Нашли баг в одной из семи подсистем? Откройте issue с минимальным воспроизведением — там всего 775 строк для поиска.
Семь файлов, один Python-процесс, никаких облачных зависимостей. Форма важнее масштаба.
Хочешь закрепить знания на практике?
Решай задачи на Algolit — интерактивная платформа для обучения
Начать бесплатно →