Files
2026-05-18 01:33:23 +05:00

269 lines
11 KiB
Python

from __future__ import annotations
from contextlib import asynccontextmanager
from apscheduler.schedulers.asyncio import AsyncIOScheduler
from fastapi import FastAPI, Request
from fastapi.middleware.cors import CORSMiddleware
from fastapi.responses import JSONResponse
from loguru import logger
from .api.router import api_router
from .core.bootstrap import init_db
from .core.config import get_settings
from .core.db import SessionLocal
from .services.vault import VaultLocked, VaultNotInitialized, vault_service
def _job_firmware_check() -> None:
from .services.firmware_check import check_and_alert
db = SessionLocal()
try:
check_and_alert(db)
except Exception as exc: # pragma: no cover
logger.warning("firmware check job failed: {}", exc)
finally:
db.close()
def _job_probe_devices() -> None:
"""Периодически опрашивает все устройства, обновляет метрики/алерты."""
from .models.device import Device
from .models.metric import DeviceMetric
from .models.interface_stat import InterfaceStat
from .core.security import decrypt_secret
from .services.events import add_alert
from .services.routeros.client import (
RouterOSCredentials, RouterOSError, check_internet,
fetch_identity, fetch_interface_stats, fetch_resource, parse_uptime,
)
from datetime import datetime, timedelta, timezone
# Если vault уже инициализирован, но заблокирован — пропускаем итерацию:
# без DEK не получится расшифровать password_enc устройств в формате v2.
# До инициализации (legacy-режим) опрос продолжается со старым ключом из SECRET_KEY.
if vault_service.is_initialized_cached() is True and not vault_service.is_unlocked():
logger.info("probe_devices: vault locked, пропускаем итерацию")
return
db = SessionLocal()
try:
for d in db.query(Device).all():
creds = RouterOSCredentials(
host=d.host, username=d.username,
password=decrypt_secret(d.password_enc),
port=d.port, use_tls=d.use_tls, timeout=5.0,
)
try:
res = fetch_resource(creds)
ident = fetch_identity(creds)
except RouterOSError as exc:
if d.status != "down":
add_alert(db, severity="error", category="device",
source=f"device:{d.id}",
title=f"Устройство недоступно: {d.identity or d.name}",
message=str(exc))
d.status = "down"
d.last_error = str(exc)
db.commit()
continue
d.identity = ident or d.identity
d.model = res.get("board-name") or d.model
d.ros_version = res.get("version") or d.ros_version
d.architecture = res.get("architecture-name") or d.architecture
prev_status = d.status
d.status = "up"
d.last_error = None
d.last_seen = datetime.now(timezone.utc)
uptime_s = parse_uptime(res.get("uptime"))
if uptime_s is not None and d.last_uptime_seconds is not None:
if uptime_s < d.last_uptime_seconds - 60:
d.abnormal_reboot = True
add_alert(db, severity="warning", category="abnormal_reboot",
source=f"device:{d.id}",
title=f"Аварийный перезапуск: {d.identity or d.name}",
message=f"uptime {d.last_uptime_seconds}s → {uptime_s}s")
else:
d.abnormal_reboot = False
d.last_uptime_seconds = uptime_s
try:
d.internet_ok = check_internet(creds)
except Exception:
d.internet_ok = None
if prev_status == "down":
add_alert(db, severity="info", category="device",
source=f"device:{d.id}",
title=f"Устройство снова онлайн: {d.identity or d.name}")
def _i(v):
try: return int(v) if v is not None else None
except: return None # noqa: E722
cpu = _i(res.get("cpu-load"))
free_mem = _i(res.get("free-memory"))
total_mem = _i(res.get("total-memory"))
mem_pct = None
if free_mem is not None and total_mem and total_mem > 0:
mem_pct = round(100 - (free_mem / total_mem) * 100, 1)
db.add(DeviceMetric(
device_id=d.id,
cpu_load=float(cpu) if cpu is not None else None,
mem_used_pct=mem_pct,
free_memory=free_mem, total_memory=total_mem,
uptime_seconds=uptime_s, internet_ok=d.internet_ok,
))
# ---- Sprint 09: счётчики выбранных интерфейсов ----
mon = (d.monitored_interfaces or "").strip()
up = (d.uplink_interfaces or "").strip()
wanted = {x.strip() for x in mon.split(",") if x.strip()}
wanted |= {x.strip() for x in up.split(",") if x.strip()}
if wanted:
try:
iface_rows = fetch_interface_stats(creds)
now_ts = datetime.now(timezone.utc)
for r in iface_rows:
if r["name"] in wanted:
db.add(InterfaceStat(
device_id=d.id, name=r["name"],
rx_bytes=r["rx_bytes"], tx_bytes=r["tx_bytes"],
running=r["running"], ts=now_ts,
))
# ретенция: глубина в часах
keep_hours = int(d.interface_history_hours or 24)
cutoff = now_ts - timedelta(hours=keep_hours)
db.query(InterfaceStat).filter(
InterfaceStat.device_id == d.id,
InterfaceStat.ts < cutoff,
).delete(synchronize_session=False)
except RouterOSError as exc:
logger.debug("iface stats failed for {}: {}", d.host, exc)
db.commit()
except Exception as exc: # pragma: no cover
logger.warning("probe job failed: {}", exc)
finally:
db.close()
_scheduler: AsyncIOScheduler | None = None
# Допустимые интервалы автоопроса (мин), используются для clamp/валидации.
ALLOWED_PROBE_MINUTES: tuple[int, ...] = (1, 2, 3, 5, 10)
def reschedule_probe_job(minutes: int) -> int:
"""Изменяет интервал джобы probe_devices на лету. Возвращает применённое значение."""
global _scheduler
if minutes not in ALLOWED_PROBE_MINUTES:
# ближайшее снизу из разрешённых
minutes = max((m for m in ALLOWED_PROBE_MINUTES if m <= minutes), default=ALLOWED_PROBE_MINUTES[0])
if _scheduler is None:
return minutes
_scheduler.reschedule_job("probe_devices", trigger="interval", minutes=minutes)
logger.info("probe_devices job rescheduled: every {}m", minutes)
return minutes
@asynccontextmanager
async def lifespan(_: FastAPI):
global _scheduler
settings = get_settings()
logger.info("Starting ROSzetta API ({} env)", settings.app_env)
init_db()
# Прогреваем кеш «vault initialized?» — нужен encrypt_secret() для legacy-fallback
# и probe-джобе, чтобы решать skip/run без обращения к БД.
try:
_db = SessionLocal()
try:
initialized = vault_service.refresh_initialized_cache(_db)
logger.info("Vault initialized={}, unlocked={}", initialized, vault_service.is_unlocked())
finally:
_db.close()
except Exception as exc: # pragma: no cover
logger.warning("vault init-cache refresh failed: {}", exc)
# FTP-сервер для приёма push-бэкапов от MikroTik
try:
from .services.backup_ftp_server import start_server
start_server(host=settings.backup_ftp_host, port=settings.backup_ftp_port)
except Exception as exc: # pragma: no cover
logger.warning("Backup FTP server failed to start: {}", exc)
# Стартовый интервал берём из настроек БД (если уже сохранены), иначе из env.
probe_minutes = settings.device_probe_interval_minutes
try:
from .services.settings import get_settings_dict
db = SessionLocal()
try:
s = get_settings_dict(db)
ui_pm = (s.get("ui") or {}).get("probe_interval_minutes")
if isinstance(ui_pm, int) and ui_pm in ALLOWED_PROBE_MINUTES:
probe_minutes = ui_pm
finally:
db.close()
except Exception as exc: # pragma: no cover
logger.warning("could not load probe interval from settings: {}", exc)
_scheduler = AsyncIOScheduler(timezone="UTC")
from datetime import datetime, timedelta, timezone
now = datetime.now(timezone.utc)
_scheduler.add_job(
_job_firmware_check, "interval",
hours=max(1, settings.firmware_check_interval_hours),
id="firmware_check",
next_run_time=now + timedelta(seconds=30),
)
_scheduler.add_job(
_job_probe_devices, "interval",
minutes=max(1, probe_minutes),
id="probe_devices",
next_run_time=now + timedelta(seconds=10),
)
_scheduler.start()
logger.info("Scheduler started: firmware/{}h, probe/{}m",
settings.firmware_check_interval_hours, probe_minutes)
yield
if _scheduler:
_scheduler.shutdown(wait=False)
try:
from .services.backup_ftp_server import stop_server
stop_server()
except Exception: # pragma: no cover
pass
logger.info("Shutting down")
def create_app() -> FastAPI:
settings = get_settings()
app = FastAPI(
title="ROSzetta API",
version="0.1.0",
lifespan=lifespan,
)
app.add_middleware(
CORSMiddleware,
allow_origins=settings.cors_origins_list,
allow_credentials=True,
allow_methods=["*"],
allow_headers=["*"],
)
@app.exception_handler(VaultLocked)
async def _vault_locked(_: Request, exc: VaultLocked) -> JSONResponse:
# 423 Locked — стандартный HTTP-код для «ресурс заперт»; фронт ловит его и
# показывает форму ввода мастер-пароля.
return JSONResponse(status_code=423, content={"detail": str(exc), "code": "vault_locked"})
@app.exception_handler(VaultNotInitialized)
async def _vault_uninit(_: Request, exc: VaultNotInitialized) -> JSONResponse:
return JSONResponse(
status_code=412,
content={"detail": str(exc), "code": "vault_not_initialized"},
)
app.include_router(api_router)
return app
app = create_app()