Files
ROSzetta/backend/app/core/security.py
T
2026-05-18 01:33:23 +05:00

109 lines
4.1 KiB
Python

from __future__ import annotations
from datetime import datetime, timedelta, timezone
from typing import Any
from cryptography.fernet import Fernet
from jose import JWTError, jwt
from passlib.context import CryptContext
from .config import get_settings
settings = get_settings()
pwd_context = CryptContext(schemes=["bcrypt"], deprecated="auto")
ALGORITHM = "HS256"
def hash_password(password: str) -> str:
return pwd_context.hash(password)
def verify_password(plain: str, hashed: str) -> bool:
return pwd_context.verify(plain, hashed)
def _now() -> datetime:
return datetime.now(timezone.utc)
def create_access_token(subject: str | int, extra: dict[str, Any] | None = None) -> str:
payload: dict[str, Any] = {
"sub": str(subject),
"type": "access",
"iat": _now(),
"exp": _now() + timedelta(minutes=settings.access_token_expire_minutes),
}
if extra:
payload.update(extra)
return jwt.encode(payload, settings.secret_key, algorithm=ALGORITHM)
def create_refresh_token(subject: str | int) -> str:
payload = {
"sub": str(subject),
"type": "refresh",
"iat": _now(),
"exp": _now() + timedelta(days=settings.refresh_token_expire_days),
}
return jwt.encode(payload, settings.secret_key, algorithm=ALGORITHM)
def decode_token(token: str) -> dict[str, Any]:
try:
return jwt.decode(token, settings.secret_key, algorithms=[ALGORITHM])
except JWTError as exc: # pragma: no cover
raise ValueError(f"invalid token: {exc}") from exc
# --- Симметричное шифрование секретов устройств -----------------------------
# Двухслойная схема:
# v1: Fernet от SHA256(SECRET_KEY) — устаревший формат (без префикса для
# обратной совместимости). Только дешифрация. Запись новых v1 запрещена.
# v2: AES-256-GCM от DEK из services.vault, префикс "v2:".
#
# encrypt_secret() требует, чтобы vault был инициализирован и разблокирован.
# decrypt_secret() умеет читать оба формата.
# Подробнее см. services/vault.py.
SECRET_PREFIX_V2 = "v2:"
def _legacy_fernet() -> Fernet:
import base64
import hashlib
digest = hashlib.sha256(settings.secret_key.encode()).digest()
key = base64.urlsafe_b64encode(digest)
return Fernet(key)
def encrypt_secret(value: str) -> str:
"""Шифрует секрет DEK'ом из vault. Если vault ещё не инициализирован
(свежий апгрейд с 0.6.x) — fallback на legacy Fernet, чтобы существующие
сценарии не ломались. После /api/v1/vault/init все секреты пишутся как v2.
"""
from ..services.vault import vault_service # локальный импорт против цикла
initialized = vault_service.is_initialized_cached()
if initialized is False:
# legacy v1: пишем Fernet-токен без префикса
return _legacy_fernet().encrypt(value.encode()).decode()
# initialized is True (или None — тогда поверим, что инициализирован, и
# если на самом деле нет — VaultNotInitialized всплывёт; админу всё равно
# пора создавать мастер-пароль).
return vault_service.encrypt_secret(value)
def decrypt_secret(token: str) -> str:
"""Дешифрует секрет. Поддерживает v2 (vault) и legacy v1 (SECRET_KEY)."""
if token.startswith(SECRET_PREFIX_V2):
from ..services.vault import vault_service
return vault_service.decrypt_secret_v2(token)
# Legacy: Fernet-токен без префикса.
return _legacy_fernet().decrypt(token.encode()).decode()
def is_legacy_secret(token: str) -> bool:
"""True, если секрет ещё не мигрирован на v2 (Fernet от SECRET_KEY)."""
return not token.startswith(SECRET_PREFIX_V2)