import os import csv import logging from datetime import datetime from pathlib import Path from typing import Dict, Any from sitemanagement.models import Multiplexor, Channel, Sensor, Alert, Metric logger = logging.getLogger(__name__) def parse_multiplexor_data(csv_file_path: str) -> Dict[str, Any]: """ Парсит данные из CSV файла с показаниями мультиплексора. Args: csv_file_path (str): Путь к CSV файлу с данными Returns: Dict[str, Any]: Словарь с результатами парсинга: - success (bool): Успешно ли выполнен парсинг - errors (List[str]): Список ошибок, если они есть - metrics_count (int): Количество созданных метрик - alerts_count (int): Количество созданных алертов """ if not csv_file_path: logger.error("Не указан путь к файлу") return { "success": False, "errors": ["Не указан путь к файлу"], "metrics_count": 0, "alerts_count": 0 } # проверяем существование файла if not os.path.exists(csv_file_path): logger.error(f"Файл не найден: {csv_file_path}") return { "success": False, "errors": ["Файл не найден"], "metrics_count": 0, "alerts_count": 0 } #! шаг 1: извлекаем имя мультиплексора из имени файла file_name = Path(csv_file_path).stem # получаем имя файла без расширения # ищем мультиплексор в базе данных по имени try: multiplexor = Multiplexor.objects.get(name=file_name) except Multiplexor.DoesNotExist: logger.error(f"Мультиплексор с именем {file_name} не найден") return { "success": False, "errors": [f"Мультиплексор с именем {file_name} не найден"], "metrics_count": 0, "alerts_count": 0 } #!TODO -- читать Vmux (напряжение мультиплексора) Tmux (температура мультиплексора) и записывать в базу #! шаг 2: проверяем каналы try: metrics_count = 0 alerts_count = 0 errors = [] with open(csv_file_path, 'r') as csvfile: csv_reader = csv.DictReader(csvfile) # получаем заголовки CSV - они должны содержать номера каналов headers = csv_reader.fieldnames if not headers: logger.error("CSV файл пуст или неверного формата") return { "success": False, "errors": ["CSV файл пуст или неверного формата"], "metrics_count": 0, "alerts_count": 0 } # получаем список каналов из CSV csv_channels = [h for h in headers if h.startswith('CH-')] # получаем каналы мультиплексора из базы db_channels = Channel.objects.filter(multiplexor=multiplexor) db_channel_numbers = set(ch.number for ch in db_channels) # проверяем соответствие каналов csv_channel_numbers = set(int(ch.split('-')[1]) for ch in csv_channels) missing_channels = db_channel_numbers - csv_channel_numbers extra_channels = csv_channel_numbers - db_channel_numbers if missing_channels or extra_channels: error_msg = [] if missing_channels: error_msg.append(f"Отсутствуют каналы: {', '.join(f'CH-{num}' for num in missing_channels)}") if extra_channels: error_msg.append(f"Лишние каналы: {', '.join(f'CH-{num}' for num in extra_channels)}") logger.error(". ".join(error_msg)) return { "success": False, "errors": error_msg, "metrics_count": 0, "alerts_count": 0 } #! шаг 3: проверяем датчики в каналах for row in csv_reader: # если нет timestamp в CSV, используем текущее время timestamp = row.get('timestamp', datetime.now().isoformat()) for channel_header in csv_channels: channel_number = int(channel_header.split('-')[1]) value = row[channel_header] # получаем канал и его датчики из базы try: channel = db_channels.get(number=channel_number) sensors = Sensor.objects.filter(channel=channel) if not sensors.exists(): errors.append(f"Канал CH-{channel_number} не имеет настроенных датчиков") continue # проверяем значение для каждого датчика в канале ++ TODO полуканальность for sensor in sensors: try: float_value = float(value) #!TODO -- математические формулы для rowdata # создаем метрику для каждого значения metric = Metric.objects.create( sensor=sensor, value=float_value, timestamp=timestamp ) metrics_count += 1 # проверяем пороговые значения, если они заданы if sensor.sensor_type.min_value is not None and float_value < float(sensor.sensor_type.min_value): message = f"Значение {value} в канале CH-{channel_number} ниже минимального порога {sensor.sensor_type.min_value}" errors.append(f"{message} для датчика {sensor.name or sensor.sensor_type.code}") # создаем алерт для нижнего порога Alert.objects.create( sensor=sensor, metric=metric, sensor_type=sensor.sensor_type, message=message, severity="critical" if float_value < float(sensor.sensor_type.min_value) * 0.9 else "warning" ) alerts_count += 1 if sensor.sensor_type.max_value is not None and float_value > float(sensor.sensor_type.max_value): message = f"Значение {value} в канале CH-{channel_number} выше максимального порога {sensor.sensor_type.max_value}" errors.append(f"{message} для датчика {sensor.name or sensor.sensor_type.code}") # создаем алерт для верхнего порога Alert.objects.create( sensor=sensor, metric=metric, sensor_type=sensor.sensor_type, message=message, severity="critical" if float_value > float(sensor.sensor_type.max_value) * 1.1 else "warning" ) alerts_count += 1 except ValueError: errors.append(f"Некорректное значение {value} в канале CH-{channel_number}") except Channel.DoesNotExist: errors.append(f"Канал CH-{channel_number} не найден в базе данных") return { "success": True, "errors": errors, "metrics_count": metrics_count, "alerts_count": alerts_count } except Exception as e: logger.error(f"Ошибка при чтении CSV файла: {str(e)}") return { "success": False, "errors": [f"Ошибка при чтении CSV файла: {str(e)}"], "metrics_count": metrics_count, "alerts_count": alerts_count }