Key Encryption

This commit is contained in:
2026-05-18 01:33:23 +05:00
parent a552e55c29
commit c3422f7c67
11 changed files with 831 additions and 16 deletions
+2
View File
@@ -12,6 +12,7 @@ from .v1 import firmware as firmware_router
from .v1 import health as health_router
from .v1 import metrics as metrics_router
from .v1 import settings as settings_router
from .v1 import vault as vault_router
api_router = APIRouter(prefix="/api/v1")
api_router.include_router(health_router.router, tags=["health"])
@@ -24,3 +25,4 @@ api_router.include_router(metrics_router.router, tags=["metrics"])
api_router.include_router(cli_router.router, prefix="/cli", tags=["cli"])
api_router.include_router(controller_backup_router.router, prefix="/controller/backup", tags=["controller"])
api_router.include_router(settings_router.router, prefix="/settings", tags=["settings"])
api_router.include_router(vault_router.router, prefix="/vault", tags=["vault"])
+1 -1
View File
@@ -1,7 +1,7 @@
from fastapi import APIRouter
APP_NAME = "ROSzetta"
APP_VERSION = "0.6.0"
APP_VERSION = "0.7.0"
router = APIRouter()
+118
View File
@@ -0,0 +1,118 @@
"""API мастер-пароля / vault.
Эндпоинты:
GET /api/v1/vault/status — состояние (initialized, unlocked); доступно всем авторизованным.
POST /api/v1/vault/init — установить первичный мастер-пароль (admin, только если не init).
POST /api/v1/vault/unlock — разблокировать DEK мастер-паролем (admin).
POST /api/v1/vault/lock — забыть DEK (admin).
POST /api/v1/vault/rotate — сменить мастер-пароль (admin).
После init/unlock автоматически выполняется миграция legacy v1-секретов в v2.
"""
from __future__ import annotations
from fastapi import APIRouter, Body, Depends, HTTPException, status
from pydantic import BaseModel, Field
from sqlalchemy.orm import Session
from ...core.db import get_db
from ...models.user import User
from ...services.vault import (
InvalidMasterPassword,
VaultAlreadyInitialized,
VaultError,
VaultLocked,
VaultNotInitialized,
migrate_legacy_device_secrets,
vault_service,
)
from ..deps import get_current_user, require_role
router = APIRouter()
class InitPayload(BaseModel):
master_password: str = Field(min_length=8, max_length=256)
class UnlockPayload(BaseModel):
master_password: str = Field(min_length=1, max_length=256)
class RotatePayload(BaseModel):
old_password: str = Field(min_length=1, max_length=256)
new_password: str = Field(min_length=8, max_length=256)
@router.get("/status")
def vault_status(
db: Session = Depends(get_db),
_: User = Depends(get_current_user),
) -> dict:
return vault_service.status(db).as_dict()
@router.post("/init", status_code=status.HTTP_201_CREATED)
def vault_init(
payload: InitPayload,
db: Session = Depends(get_db),
_: User = Depends(require_role("admin")),
) -> dict:
try:
vault_service.init_master_password(db, payload.master_password)
except VaultAlreadyInitialized as exc:
raise HTTPException(status.HTTP_409_CONFLICT, str(exc))
except VaultError as exc:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(exc))
# После init обычно паролей ещё нет, но всё равно прогоним миграцию на всякий случай.
migration = migrate_legacy_device_secrets(db)
return {
"status": vault_service.status(db).as_dict(),
"migration": migration,
}
@router.post("/unlock")
def vault_unlock(
payload: UnlockPayload,
db: Session = Depends(get_db),
_: User = Depends(require_role("admin")),
) -> dict:
try:
vault_service.unlock(db, payload.master_password)
except VaultNotInitialized as exc:
raise HTTPException(status.HTTP_409_CONFLICT, str(exc))
except InvalidMasterPassword as exc:
raise HTTPException(status.HTTP_403_FORBIDDEN, str(exc))
except VaultError as exc:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(exc))
migration = migrate_legacy_device_secrets(db)
return {
"status": vault_service.status(db).as_dict(),
"migration": migration,
}
@router.post("/lock")
def vault_lock(
_: User = Depends(require_role("admin")),
) -> dict:
vault_service.lock()
return {"unlocked": False}
@router.post("/rotate")
def vault_rotate(
payload: RotatePayload,
db: Session = Depends(get_db),
_: User = Depends(require_role("admin")),
) -> dict:
try:
vault_service.rotate_master_password(db, payload.old_password, payload.new_password)
except VaultNotInitialized as exc:
raise HTTPException(status.HTTP_409_CONFLICT, str(exc))
except InvalidMasterPassword as exc:
raise HTTPException(status.HTTP_403_FORBIDDEN, str(exc))
except VaultError as exc:
raise HTTPException(status.HTTP_400_BAD_REQUEST, str(exc))
return {"status": vault_service.status(db).as_dict()}