Files
aerbim-ht-monitor/backend/api/parser/parse_csv.py
2025-09-04 18:37:23 +03:00

192 lines
9.8 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import csv
import logging
from datetime import datetime
from pathlib import Path
from typing import Dict, Any
from sitemanagement.models import Multiplexor, Channel, Sensor, Alert, Metric
logger = logging.getLogger(__name__)
def parse_multiplexor_data(csv_file_path: str) -> Dict[str, Any]:
"""
Парсит данные из CSV файла с показаниями мультиплексора.
Args:
csv_file_path (str): Путь к CSV файлу с данными
Returns:
Dict[str, Any]: Словарь с результатами парсинга:
- success (bool): Успешно ли выполнен парсинг
- errors (List[str]): Список ошибок, если они есть
- metrics_count (int): Количество созданных метрик
- alerts_count (int): Количество созданных алертов
"""
if not csv_file_path:
logger.error("Не указан путь к файлу")
return {
"success": False,
"errors": ["Не указан путь к файлу"],
"metrics_count": 0,
"alerts_count": 0
}
# проверяем существование файла
if not os.path.exists(csv_file_path):
logger.error(f"Файл не найден: {csv_file_path}")
return {
"success": False,
"errors": ["Файл не найден"],
"metrics_count": 0,
"alerts_count": 0
}
#! шаг 1: извлекаем имя мультиплексора из имени файла
file_name = Path(csv_file_path).stem # получаем имя файла без расширения
# ищем мультиплексор в базе данных по имени
try:
multiplexor = Multiplexor.objects.get(name=file_name)
except Multiplexor.DoesNotExist:
logger.error(f"Мультиплексор с именем {file_name} не найден")
return {
"success": False,
"errors": [f"Мультиплексор с именем {file_name} не найден"],
"metrics_count": 0,
"alerts_count": 0
}
#!TODO -- читать Vmux (напряжение мультиплексора) Tmux (температура мультиплексора) и записывать в базу
#! шаг 2: проверяем каналы
try:
metrics_count = 0
alerts_count = 0
errors = []
with open(csv_file_path, 'r') as csvfile:
csv_reader = csv.DictReader(csvfile)
# получаем заголовки CSV - они должны содержать номера каналов
headers = csv_reader.fieldnames
if not headers:
logger.error("CSV файл пуст или неверного формата")
return {
"success": False,
"errors": ["CSV файл пуст или неверного формата"],
"metrics_count": 0,
"alerts_count": 0
}
# получаем список каналов из CSV
csv_channels = [h for h in headers if h.startswith('CH-')]
# получаем каналы мультиплексора из базы
db_channels = Channel.objects.filter(multiplexor=multiplexor)
db_channel_numbers = set(ch.number for ch in db_channels)
# проверяем соответствие каналов
csv_channel_numbers = set(int(ch.split('-')[1]) for ch in csv_channels)
missing_channels = db_channel_numbers - csv_channel_numbers
extra_channels = csv_channel_numbers - db_channel_numbers
if missing_channels or extra_channels:
error_msg = []
if missing_channels:
error_msg.append(f"Отсутствуют каналы: {', '.join(f'CH-{num}' for num in missing_channels)}")
if extra_channels:
error_msg.append(f"Лишние каналы: {', '.join(f'CH-{num}' for num in extra_channels)}")
logger.error(". ".join(error_msg))
return {
"success": False,
"errors": error_msg,
"metrics_count": 0,
"alerts_count": 0
}
#! шаг 3: проверяем датчики в каналах
for row in csv_reader:
# если нет timestamp в CSV, используем текущее время
timestamp = row.get('timestamp', datetime.now().isoformat())
for channel_header in csv_channels:
channel_number = int(channel_header.split('-')[1])
value = row[channel_header]
# получаем канал и его датчики из базы
try:
channel = db_channels.get(number=channel_number)
sensors = Sensor.objects.filter(channel=channel)
if not sensors.exists():
errors.append(f"Канал CH-{channel_number} не имеет настроенных датчиков")
continue
# проверяем значение для каждого датчика в канале ++ TODO полуканальность
for sensor in sensors:
try:
float_value = float(value)
#!TODO -- математические формулы для rowdata
# создаем метрику для каждого значения
metric = Metric.objects.create(
sensor=sensor,
value=float_value,
timestamp=timestamp
)
metrics_count += 1
# проверяем пороговые значения, если они заданы
if sensor.sensor_type.min_value is not None and float_value < float(sensor.sensor_type.min_value):
message = f"Значение {value} в канале CH-{channel_number} ниже минимального порога {sensor.sensor_type.min_value}"
errors.append(f"{message} для датчика {sensor.name or sensor.sensor_type.code}")
# создаем алерт для нижнего порога
Alert.objects.create(
sensor=sensor,
metric=metric,
sensor_type=sensor.sensor_type,
message=message,
severity="critical" if float_value < float(sensor.sensor_type.min_value) * 0.9 else "warning"
)
alerts_count += 1
if sensor.sensor_type.max_value is not None and float_value > float(sensor.sensor_type.max_value):
message = f"Значение {value} в канале CH-{channel_number} выше максимального порога {sensor.sensor_type.max_value}"
errors.append(f"{message} для датчика {sensor.name or sensor.sensor_type.code}")
# создаем алерт для верхнего порога
Alert.objects.create(
sensor=sensor,
metric=metric,
sensor_type=sensor.sensor_type,
message=message,
severity="critical" if float_value > float(sensor.sensor_type.max_value) * 1.1 else "warning"
)
alerts_count += 1
except ValueError:
errors.append(f"Некорректное значение {value} в канале CH-{channel_number}")
except Channel.DoesNotExist:
errors.append(f"Канал CH-{channel_number} не найден в базе данных")
return {
"success": True,
"errors": errors,
"metrics_count": metrics_count,
"alerts_count": alerts_count
}
except Exception as e:
logger.error(f"Ошибка при чтении CSV файла: {str(e)}")
return {
"success": False,
"errors": [f"Ошибка при чтении CSV файла: {str(e)}"],
"metrics_count": metrics_count,
"alerts_count": alerts_count
}