Files
2026-05-18 01:33:23 +05:00

270 lines
11 KiB
Python

"""Сервис мастер-пароля и шифрования секретов устройств (envelope encryption).
Архитектура:
• DEK — случайные 32 байта, шифруют все секреты устройств AES-256-GCM.
• KEK — производный ключ из мастер-пароля (PBKDF2-HMAC-SHA256, по умолчанию
200_000 итераций, соль 16 B). KEK существует только в момент init/unlock/rotate.
• В таблице `vault` хранится: соль, число итераций, verifier (короткий
AES-GCM-токен от KEK для проверки пароля) и dek_wrapped (DEK, завёрнутый KEK).
• После unlock'а DEK кешируется в памяти процесса; после рестарта vault
автоматически locked, фоновые задачи и API устройств получают VaultLocked.
Мастер-пароль в БД НЕ хранится — только производные. Забыл — данные потеряны
безвозвратно (это by design). См. /api/v1/vault/rotate для смены пароля.
"""
from __future__ import annotations
import base64
import os
import secrets
import threading
from dataclasses import dataclass
from typing import Optional
from cryptography.hazmat.primitives import hashes
from cryptography.hazmat.primitives.ciphers.aead import AESGCM
from cryptography.hazmat.primitives.kdf.pbkdf2 import PBKDF2HMAC
from sqlalchemy.orm import Session
from ..models.vault import Vault
DEFAULT_KDF_ITERATIONS = 200_000
DEK_BYTES = 32 # AES-256
SALT_BYTES = 16
NONCE_BYTES = 12 # стандарт AES-GCM
VERIFIER_PLAINTEXT = b"roszetta-vault-v2"
SECRET_PREFIX_V2 = "v2:" # формат: v2:<base64(nonce|ct|tag)>
class VaultError(Exception):
"""Общий класс ошибок vault."""
class VaultLocked(VaultError):
"""Vault заблокирован — нужно ввести мастер-пароль через /api/v1/vault/unlock."""
class VaultNotInitialized(VaultError):
"""Мастер-пароль ещё не задан — нужен /api/v1/vault/init."""
class VaultAlreadyInitialized(VaultError):
"""Попытка повторного init — нужно использовать /rotate."""
class InvalidMasterPassword(VaultError):
"""Мастер-пароль не подходит."""
# --- helpers --------------------------------------------------------------
def _b64e(data: bytes) -> str:
return base64.b64encode(data).decode("ascii")
def _b64d(text: str) -> bytes:
return base64.b64decode(text.encode("ascii"))
def _derive_kek(master_password: str, salt: bytes, iterations: int) -> bytes:
kdf = PBKDF2HMAC(
algorithm=hashes.SHA256(),
length=32,
salt=salt,
iterations=iterations,
)
return kdf.derive(master_password.encode("utf-8"))
def _aead_encrypt(key: bytes, plaintext: bytes) -> str:
"""AES-256-GCM. Возвращает base64(nonce|ct|tag)."""
aead = AESGCM(key)
nonce = os.urandom(NONCE_BYTES)
ct_with_tag = aead.encrypt(nonce, plaintext, associated_data=None)
return _b64e(nonce + ct_with_tag)
def _aead_decrypt(key: bytes, token_b64: str) -> bytes:
raw = _b64d(token_b64)
if len(raw) < NONCE_BYTES + 16:
raise InvalidMasterPassword("слишком короткий ciphertext")
nonce, ct_with_tag = raw[:NONCE_BYTES], raw[NONCE_BYTES:]
aead = AESGCM(key)
return aead.decrypt(nonce, ct_with_tag, associated_data=None)
# --- основной сервис -------------------------------------------------------
@dataclass
class VaultStatus:
initialized: bool
unlocked: bool
def as_dict(self) -> dict:
return {"initialized": self.initialized, "unlocked": self.unlocked}
class VaultService:
"""Singleton-сервис. Держит DEK в памяти процесса."""
def __init__(self) -> None:
self._dek: Optional[bytes] = None
self._lock = threading.RLock()
# Кеш «инициализирован ли vault» — обновляется при init/status, чтобы
# encrypt_secret() мог решить legacy-fallback без передачи db-сессии.
self._initialized_cache: Optional[bool] = None
def refresh_initialized_cache(self, db: Session) -> bool:
with self._lock:
self._initialized_cache = db.query(Vault).first() is not None
return self._initialized_cache
# ---- состояние ----
def status(self, db: Session) -> VaultStatus:
row = db.query(Vault).first()
self._initialized_cache = row is not None
return VaultStatus(initialized=row is not None, unlocked=self._dek is not None)
def is_unlocked(self) -> bool:
return self._dek is not None
def is_initialized_cached(self) -> Optional[bool]:
"""None — кеш ещё не заполнен; True/False — последнее состояние."""
return self._initialized_cache
def _require_unlocked(self) -> bytes:
if self._dek is None:
raise VaultLocked("vault locked: введите мастер-пароль в Настройках → Безопасность")
return self._dek
# ---- init / unlock / lock / rotate ----
def init_master_password(self, db: Session, master_password: str) -> None:
if not master_password or len(master_password) < 8:
raise VaultError("мастер-пароль должен быть не короче 8 символов")
with self._lock:
existing = db.query(Vault).first()
if existing is not None:
raise VaultAlreadyInitialized("vault уже инициализирован — используйте rotate")
salt = os.urandom(SALT_BYTES)
kek = _derive_kek(master_password, salt, DEFAULT_KDF_ITERATIONS)
dek = secrets.token_bytes(DEK_BYTES)
row = Vault(
kdf_salt=_b64e(salt),
kdf_iterations=DEFAULT_KDF_ITERATIONS,
verifier=_aead_encrypt(kek, VERIFIER_PLAINTEXT),
dek_wrapped=_aead_encrypt(kek, dek),
)
db.add(row)
db.commit()
self._dek = dek # сразу разблокирован после init
self._initialized_cache = True
def unlock(self, db: Session, master_password: str) -> None:
with self._lock:
row = db.query(Vault).first()
if row is None:
raise VaultNotInitialized("сначала установите мастер-пароль")
salt = _b64d(row.kdf_salt)
kek = _derive_kek(master_password, salt, row.kdf_iterations)
# 1) проверяем пароль через verifier
try:
check = _aead_decrypt(kek, row.verifier)
except Exception as exc: # noqa: BLE001 — любая ошибка дешифровки = неверный пароль
raise InvalidMasterPassword("неверный мастер-пароль") from exc
if check != VERIFIER_PLAINTEXT:
raise InvalidMasterPassword("неверный мастер-пароль")
# 2) разворачиваем DEK
dek = _aead_decrypt(kek, row.dek_wrapped)
if len(dek) != DEK_BYTES:
raise VaultError("повреждённый dek_wrapped")
self._dek = dek
self._initialized_cache = True
def lock(self) -> None:
with self._lock:
self._dek = None
def rotate_master_password(self, db: Session, old_password: str, new_password: str) -> None:
if not new_password or len(new_password) < 8:
raise VaultError("новый мастер-пароль должен быть не короче 8 символов")
with self._lock:
row = db.query(Vault).first()
if row is None:
raise VaultNotInitialized("vault не инициализирован")
# Сначала проверяем старый пароль и достаём DEK
salt_old = _b64d(row.kdf_salt)
kek_old = _derive_kek(old_password, salt_old, row.kdf_iterations)
try:
_ = _aead_decrypt(kek_old, row.verifier)
dek = _aead_decrypt(kek_old, row.dek_wrapped)
except Exception as exc: # noqa: BLE001
raise InvalidMasterPassword("текущий мастер-пароль неверен") from exc
# Генерим новую соль и перешифровываем verifier/DEK новым KEK
new_salt = os.urandom(SALT_BYTES)
kek_new = _derive_kek(new_password, new_salt, DEFAULT_KDF_ITERATIONS)
row.kdf_salt = _b64e(new_salt)
row.kdf_iterations = DEFAULT_KDF_ITERATIONS
row.verifier = _aead_encrypt(kek_new, VERIFIER_PLAINTEXT)
row.dek_wrapped = _aead_encrypt(kek_new, dek)
db.commit()
self._dek = dek # остаётся разблокированным с тем же DEK
# ---- шифрование секретов устройств ----
def encrypt_secret(self, value: str) -> str:
dek = self._require_unlocked()
token = _aead_encrypt(dek, value.encode("utf-8"))
return SECRET_PREFIX_V2 + token
def decrypt_secret_v2(self, token: str) -> str:
dek = self._require_unlocked()
if not token.startswith(SECRET_PREFIX_V2):
raise VaultError("not a v2 ciphertext")
payload = token[len(SECRET_PREFIX_V2):]
try:
return _aead_decrypt(dek, payload).decode("utf-8")
except Exception as exc: # noqa: BLE001
raise VaultError(f"не удалось расшифровать секрет: {exc}") from exc
# Глобальный экземпляр (живёт всё время uvicorn-процесса)
vault_service = VaultService()
def migrate_legacy_device_secrets(db: Session) -> dict:
"""Перешифровывает password_enc у всех устройств с v1 (Fernet от SECRET_KEY)
в v2 (AES-GCM от DEK). Безопасно вызывать многократно — уже v2 пропускаются.
Возвращает {migrated, failed, skipped} для логов/UI.
"""
from ..core.security import _legacy_fernet, is_legacy_secret
from ..models.device import Device
if not vault_service.is_unlocked():
raise VaultLocked("нельзя мигрировать при заблокированном vault")
migrated = 0
failed = 0
skipped = 0
legacy = _legacy_fernet()
for d in db.query(Device).all():
if not is_legacy_secret(d.password_enc):
skipped += 1
continue
try:
plaintext = legacy.decrypt(d.password_enc.encode()).decode()
except Exception: # noqa: BLE001 — старый ключ не подошёл, пропустим, чтобы не терять данные
failed += 1
continue
d.password_enc = vault_service.encrypt_secret(plaintext)
migrated += 1
db.commit()
return {"migrated": migrated, "failed": failed, "skipped": skipped}