269 lines
11 KiB
Python
269 lines
11 KiB
Python
from __future__ import annotations
|
|
|
|
from contextlib import asynccontextmanager
|
|
|
|
from apscheduler.schedulers.asyncio import AsyncIOScheduler
|
|
from fastapi import FastAPI, Request
|
|
from fastapi.middleware.cors import CORSMiddleware
|
|
from fastapi.responses import JSONResponse
|
|
from loguru import logger
|
|
|
|
from .api.router import api_router
|
|
from .core.bootstrap import init_db
|
|
from .core.config import get_settings
|
|
from .core.db import SessionLocal
|
|
from .services.vault import VaultLocked, VaultNotInitialized, vault_service
|
|
|
|
|
|
def _job_firmware_check() -> None:
    """Scheduler job: run the firmware check against a fresh DB session.

    Any exception from the check is logged at WARNING level and not re-raised,
    so a failed run does not propagate out of the job. The session is closed
    in every case.
    """
    from .services.firmware_check import check_and_alert

    session = SessionLocal()
    try:
        try:
            check_and_alert(session)
        except Exception as err:  # pragma: no cover
            logger.warning("firmware check job failed: {}", err)
    finally:
        session.close()
|
|
|
|
|
|
def _to_int(value):
    """Return ``int(value)``, or None when *value* is None or not int-convertible."""
    if value is None:
        return None
    try:
        return int(value)
    except (TypeError, ValueError, OverflowError):
        return None


def _split_iface_names(raw):
    """Parse a comma-separated interface list into a set of trimmed, non-empty names."""
    return {name.strip() for name in (raw or "").split(",") if name.strip()}


def _probe_device(db, d) -> None:
    """Poll one device and persist its status, metrics, alerts and interface stats.

    If reading resource/identity raises RouterOSError, the device is marked
    "down" (an alert is raised only on the transition into "down") and the
    change is committed. Otherwise the device is marked "up", a DeviceMetric row
    is added, counters of the monitored/uplink interfaces are stored, and the
    session is committed. Any other exception (e.g. from decrypt_secret)
    propagates to the caller with changes left uncommitted.
    """
    from datetime import datetime, timedelta, timezone

    from .core.security import decrypt_secret
    from .models.interface_stat import InterfaceStat
    from .models.metric import DeviceMetric
    from .services.events import add_alert
    from .services.routeros.client import (
        RouterOSCredentials, RouterOSError, check_internet,
        fetch_identity, fetch_interface_stats, fetch_resource, parse_uptime,
    )

    creds = RouterOSCredentials(
        host=d.host, username=d.username,
        password=decrypt_secret(d.password_enc),
        port=d.port, use_tls=d.use_tls, timeout=5.0,
    )
    try:
        res = fetch_resource(creds)
        ident = fetch_identity(creds)
    except RouterOSError as exc:
        # Alert only on the up -> down transition, not on every failed poll.
        if d.status != "down":
            add_alert(db, severity="error", category="device",
                      source=f"device:{d.id}",
                      title=f"Устройство недоступно: {d.identity or d.name}",
                      message=str(exc))
        d.status = "down"
        d.last_error = str(exc)
        db.commit()
        return

    d.identity = ident or d.identity
    d.model = res.get("board-name") or d.model
    d.ros_version = res.get("version") or d.ros_version
    d.architecture = res.get("architecture-name") or d.architecture
    prev_status = d.status
    d.status = "up"
    d.last_error = None
    d.last_seen = datetime.now(timezone.utc)

    uptime_s = parse_uptime(res.get("uptime"))
    if uptime_s is not None and d.last_uptime_seconds is not None:
        # Uptime dropping by more than 60 s since the last poll means the device
        # restarted in between; flag it until the next normal poll.
        if uptime_s < d.last_uptime_seconds - 60:
            d.abnormal_reboot = True
            add_alert(db, severity="warning", category="abnormal_reboot",
                      source=f"device:{d.id}",
                      title=f"Аварийный перезапуск: {d.identity or d.name}",
                      message=f"uptime {d.last_uptime_seconds}s → {uptime_s}s")
        else:
            d.abnormal_reboot = False
    d.last_uptime_seconds = uptime_s

    # Internet reachability is best effort: any failure means "unknown".
    try:
        d.internet_ok = check_internet(creds)
    except Exception:
        d.internet_ok = None

    if prev_status == "down":
        add_alert(db, severity="info", category="device",
                  source=f"device:{d.id}",
                  title=f"Устройство снова онлайн: {d.identity or d.name}")

    cpu = _to_int(res.get("cpu-load"))
    free_mem = _to_int(res.get("free-memory"))
    total_mem = _to_int(res.get("total-memory"))
    mem_pct = None
    if free_mem is not None and total_mem and total_mem > 0:
        mem_pct = round(100 - (free_mem / total_mem) * 100, 1)
    db.add(DeviceMetric(
        device_id=d.id,
        cpu_load=float(cpu) if cpu is not None else None,
        mem_used_pct=mem_pct,
        free_memory=free_mem, total_memory=total_mem,
        uptime_seconds=uptime_s, internet_ok=d.internet_ok,
    ))

    # Sprint 09: counters of the selected (monitored + uplink) interfaces.
    wanted = (_split_iface_names(d.monitored_interfaces)
              | _split_iface_names(d.uplink_interfaces))
    if wanted:
        try:
            iface_rows = fetch_interface_stats(creds)
            now_ts = datetime.now(timezone.utc)
            for r in iface_rows:
                if r["name"] in wanted:
                    db.add(InterfaceStat(
                        device_id=d.id, name=r["name"],
                        rx_bytes=r["rx_bytes"], tx_bytes=r["tx_bytes"],
                        running=r["running"], ts=now_ts,
                    ))
            # Retention: keep interface_history_hours of history (default 24 h).
            keep_hours = int(d.interface_history_hours or 24)
            cutoff = now_ts - timedelta(hours=keep_hours)
            db.query(InterfaceStat).filter(
                InterfaceStat.device_id == d.id,
                InterfaceStat.ts < cutoff,
            ).delete(synchronize_session=False)
        except RouterOSError as exc:
            logger.debug("iface stats failed for {}: {}", d.host, exc)

    db.commit()


def _job_probe_devices() -> None:
    """Scheduler job: poll every device and update its status, metrics and alerts.

    The iteration is skipped when the vault is initialized but locked: without
    the DEK the v2-format ``password_enc`` values cannot be decrypted. Before
    the vault is initialized (legacy mode), polling continues with the old key
    from SECRET_KEY.

    Each device is processed and committed independently. An unexpected error
    on one device is rolled back and logged, and polling moves on to the next
    device instead of aborting the whole run.
    """
    from .models.device import Device

    if vault_service.is_initialized_cached() is True and not vault_service.is_unlocked():
        logger.info("probe_devices: vault locked, пропускаем итерацию")
        return

    db = SessionLocal()
    try:
        for d in db.query(Device).all():
            # Capture before probing: after a rollback the instance is expired.
            device_id = d.id
            try:
                _probe_device(db, d)
            except Exception as exc:
                db.rollback()
                logger.opt(exception=exc).warning(
                    "probe failed for device {}: {}", device_id, exc)
    except Exception as exc:  # pragma: no cover
        logger.opt(exception=exc).warning("probe job failed: {}", exc)
    finally:
        db.close()
|
|
|
|
|
|
# Interval choices (in minutes) permitted for automatic device polling; used to
# validate the stored UI setting and to clamp requested intervals.
ALLOWED_PROBE_MINUTES: tuple[int, ...] = (1, 2, 3, 5, 10)

# Scheduler instance shared with reschedule_probe_job(); assigned in lifespan(),
# None before startup.
_scheduler: AsyncIOScheduler | None = None
|
|
|
|
|
|
def reschedule_probe_job(minutes: int) -> int:
    """Change the interval of the ``probe_devices`` job at runtime.

    A value not listed in ALLOWED_PROBE_MINUTES is replaced by the largest
    allowed value that does not exceed it, or by the first allowed value when
    none does. If ``_scheduler`` is None, nothing is rescheduled and only the
    normalised value is returned.

    Returns:
        The interval (in minutes) that was, or would have been, applied.
    """
    applied = minutes
    if applied not in ALLOWED_PROBE_MINUTES:
        # Round down to the nearest permitted interval.
        not_above = [m for m in ALLOWED_PROBE_MINUTES if m <= applied]
        applied = max(not_above) if not_above else ALLOWED_PROBE_MINUTES[0]

    if _scheduler is not None:
        _scheduler.reschedule_job("probe_devices", trigger="interval", minutes=applied)
        logger.info("probe_devices job rescheduled: every {}m", applied)
    return applied
|
|
|
|
|
|
def _warm_vault_cache() -> None:
    """Prime vault_service's "initialized?" cache at startup.

    encrypt_secret() (legacy fallback) and the probe job rely on this cache to
    decide skip/run without querying the DB. Failures are logged, not raised.
    """
    try:
        db = SessionLocal()
        try:
            initialized = vault_service.refresh_initialized_cache(db)
            logger.info("Vault initialized={}, unlocked={}", initialized, vault_service.is_unlocked())
        finally:
            db.close()
    except Exception as exc:  # pragma: no cover
        logger.warning("vault init-cache refresh failed: {}", exc)


def _initial_probe_minutes(default: int) -> int:
    """Return the probe interval (minutes) to start the scheduler with.

    Uses ``ui.probe_interval_minutes`` from the settings stored in the DB when
    it is an int listed in ALLOWED_PROBE_MINUTES. Otherwise, or if the settings
    cannot be read (logged as a warning), returns *default*.
    """
    try:
        from .services.settings import get_settings_dict

        db = SessionLocal()
        try:
            ui_pm = (get_settings_dict(db).get("ui") or {}).get("probe_interval_minutes")
        finally:
            db.close()
    except Exception as exc:  # pragma: no cover
        logger.warning("could not load probe interval from settings: {}", exc)
        return default
    # bool is a subclass of int (True == 1), so reject it explicitly.
    if isinstance(ui_pm, int) and not isinstance(ui_pm, bool) and ui_pm in ALLOWED_PROBE_MINUTES:
        return ui_pm
    return default


@asynccontextmanager
async def lifespan(_: FastAPI):
    """FastAPI lifespan: start background services, serve, then tear them down.

    Startup steps:
    - initialize the DB;
    - prime the vault cache;
    - start the push-backup FTP server (best effort);
    - start an APScheduler with the firmware-check and device-probe jobs.

    Teardown runs in ``finally``, so the scheduler is shut down and the FTP
    server stopped even if scheduler setup fails or the app exits with an error.
    """
    global _scheduler
    from datetime import datetime, timedelta, timezone

    settings = get_settings()
    logger.info("Starting ROSzetta API ({} env)", settings.app_env)
    init_db()

    _warm_vault_cache()

    # FTP server that receives push backups from MikroTik devices.
    try:
        from .services.backup_ftp_server import start_server
        start_server(host=settings.backup_ftp_host, port=settings.backup_ftp_port)
    except Exception as exc:  # pragma: no cover
        logger.warning("Backup FTP server failed to start: {}", exc)

    try:
        # Initial interval: the value saved in DB settings if valid, else from env.
        probe_minutes = _initial_probe_minutes(settings.device_probe_interval_minutes)

        scheduler = AsyncIOScheduler(timezone="UTC")
        now = datetime.now(timezone.utc)
        scheduler.add_job(
            _job_firmware_check, "interval",
            hours=max(1, settings.firmware_check_interval_hours),
            id="firmware_check",
            next_run_time=now + timedelta(seconds=30),
        )
        scheduler.add_job(
            _job_probe_devices, "interval",
            minutes=max(1, probe_minutes),
            id="probe_devices",
            next_run_time=now + timedelta(seconds=10),
        )
        scheduler.start()
        # Publish the scheduler only once it has started, so
        # reschedule_probe_job() never touches one that is not running.
        _scheduler = scheduler
        logger.info("Scheduler started: firmware/{}h, probe/{}m",
                    settings.firmware_check_interval_hours, probe_minutes)

        yield
    finally:
        logger.info("Shutting down")
        if _scheduler is not None:
            _scheduler.shutdown(wait=False)
            # Clear the global so later reschedule calls become no-ops instead
            # of targeting a shut-down scheduler.
            _scheduler = None
        try:
            from .services.backup_ftp_server import stop_server
            stop_server()
        except Exception as exc:  # pragma: no cover
            logger.warning("Backup FTP server failed to stop: {}", exc)
|
|
|
|
|
|
def create_app() -> FastAPI:
    """Build and configure the ROSzetta FastAPI application.

    The app gets:
    - CORS middleware using the configured origins;
    - JSON error handlers for the vault exceptions;
    - the API router.

    Returns:
        A FastAPI instance whose lifespan is ``lifespan``.
    """
    cfg = get_settings()
    application = FastAPI(title="ROSzetta API", version="0.1.0", lifespan=lifespan)

    cors_options = dict(
        allow_origins=cfg.cors_origins_list,
        allow_credentials=True,
        allow_methods=["*"],
        allow_headers=["*"],
    )
    application.add_middleware(CORSMiddleware, **cors_options)

    async def _vault_locked(_: Request, exc: VaultLocked) -> JSONResponse:
        # 423 Locked is the standard HTTP code for "resource is locked"; the
        # frontend catches it and shows the master-password form.
        return JSONResponse(status_code=423, content={"detail": str(exc), "code": "vault_locked"})

    async def _vault_uninit(_: Request, exc: VaultNotInitialized) -> JSONResponse:
        # 412 Precondition Failed: the vault must be initialized first.
        payload = {"detail": str(exc), "code": "vault_not_initialized"}
        return JSONResponse(status_code=412, content=payload)

    # Same registration the @app.exception_handler decorator performs.
    application.add_exception_handler(VaultLocked, _vault_locked)
    application.add_exception_handler(VaultNotInitialized, _vault_uninit)

    application.include_router(api_router)
    return application
|
|
|
|
|
|
# Module-level ASGI application instance, built once at import time.
app: FastAPI = create_app()
|