Files
aerbim-ht-monitor/backend/api/account/serializers/sensor_serializers.py

93 lines
3.9 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
from rest_framework import serializers
from drf_spectacular.utils import extend_schema_field
from drf_spectacular.types import OpenApiTypes
from typing import Dict, Any
from sitemanagement.models import Sensor, Alert
class NotificationSerializer(serializers.ModelSerializer):
    """Serialize an ``Alert`` into the notification shape used by detectors.

    Output fields: ``id``, ``type``, ``timestamp`` (from ``created_at``),
    ``acknowledged`` (from ``resolved``) and ``priority``.
    """

    type = serializers.SerializerMethodField()
    priority = serializers.SerializerMethodField()
    timestamp = serializers.DateTimeField(source='created_at')
    acknowledged = serializers.BooleanField(source='resolved', default=False)

    class Meta:
        model = Alert
        fields = ('id', 'type', 'timestamp', 'acknowledged', 'priority')

    @staticmethod
    def _is_critical(alert):
        """Return True when the alert's severity is exactly ``'critical'``."""
        return alert.severity == 'critical'

    def get_type(self, obj):
        """Return ``'critical'`` for critical alerts, ``'warning'`` otherwise."""
        if self._is_critical(obj):
            return 'critical'
        return 'warning'

    def get_priority(self, obj):
        """Return ``'high'`` for critical alerts, ``'medium'`` otherwise."""
        if self._is_critical(obj):
            return 'high'
        return 'medium'
class DetectorSerializer(serializers.ModelSerializer):
    """Serialize a ``Sensor`` as a detector record together with its alerts.

    Most fields are computed by the ``get_*`` methods below.
    ``serial_number`` is taken from the model as is, and ``notifications``
    nests the sensor's ``alerts`` relation through ``NotificationSerializer``.
    """

    # Display labels for known sensor type codes. Codes that are not listed
    # fall back to the sensor type's own ``name`` in ``get_type``.
    _SENSOR_TYPE_LABELS = {
        'GA': 'Инклинометр',
        'PE': 'Тензометр',
        'GLE': 'Гидроуровень',
    }

    detector_id = serializers.SerializerMethodField()
    type = serializers.SerializerMethodField()
    detector_type = serializers.SerializerMethodField()
    name = serializers.SerializerMethodField()
    object = serializers.SerializerMethodField()
    status = serializers.SerializerMethodField()
    zone = serializers.SerializerMethodField()
    floor = serializers.SerializerMethodField()
    notifications = NotificationSerializer(source='alerts', many=True)

    class Meta:
        model = Sensor
        fields = (
            'detector_id',
            'type',
            'detector_type',
            'serial_number',
            'name',
            'object',
            'status',
            'zone',
            'floor',
            'notifications',
        )

    @staticmethod
    def _sensor_type_code(obj):
        """Return the sensor type code as stored, or ``''`` when unavailable.

        Handles both a missing ``sensor_type`` and a ``None``/empty ``code``.
        """
        sensor_type = getattr(obj, 'sensor_type', None)
        return getattr(sensor_type, 'code', '') or ''

    @staticmethod
    def _first_zone(obj):
        """Return the first zone linked to the sensor, or ``None``."""
        return obj.zones.first()

    def get_detector_id(self, obj):
        """Return the identifier used to match the sensor with the 3D model.

        The serial number is used for compatibility with the 3D model; when
        it is empty, the database id with a ``sensor_`` prefix is used.
        """
        return obj.serial_number or f"sensor_{obj.id}"

    def get_type(self, obj):
        """Return the display label of the sensor type.

        Known codes are mapped through ``_SENSOR_TYPE_LABELS``
        (case-insensitively); any other code yields the sensor type's
        ``name``, or ``''`` when that is missing too.
        """
        code = self._sensor_type_code(obj).upper()
        fallback = getattr(getattr(obj, 'sensor_type', None), 'name', '') or ''
        return self._SENSOR_TYPE_LABELS.get(code, fallback)

    def get_detector_type(self, obj):
        """Return the upper-cased sensor type code, or ``''`` if absent."""
        return self._sensor_type_code(obj).upper()

    def get_name(self, obj):
        """Return the sensor's name, or ``"<code>-<serial>"`` as a fallback.

        Missing parts of the fallback are dropped together with the dangling
        hyphen. A ``None`` type code is treated as empty, so it never leaks
        into the name as the literal text ``"None"``.
        """
        base_name = getattr(obj, 'name', '') or ''
        if base_name:
            return base_name
        serial = getattr(obj, 'serial_number', '') or ''
        code = self._sensor_type_code(obj)
        return f"{code}-{serial}".strip('-')

    def get_object(self, obj):
        """Return the title of the object owning the sensor's first zone.

        Returns ``None`` when the sensor has no zone or the zone has no
        object assigned.
        """
        zone = self._first_zone(obj)
        if zone is None:
            return None
        site = zone.object
        return site.title if site is not None else None

    def get_status(self, obj):
        """Return the severity of the newest unresolved alert, else ``'normal'``.

        Only alerts with ``resolved=False`` are considered; the most recent
        one by ``created_at`` determines the status.
        """
        latest_alert = (
            obj.alerts.filter(resolved=False).order_by('-created_at').first()
        )
        if latest_alert:
            return latest_alert.severity
        return 'normal'

    def get_zone(self, obj):
        """Return the name of the sensor's first zone, or ``None``."""
        first_zone = self._first_zone(obj)
        return first_zone.name if first_zone else None

    def get_floor(self, obj):
        """Return the ``floor`` of the sensor's first zone, or ``None``."""
        first_zone = self._first_zone(obj)
        return getattr(first_zone, 'floor', None)
class DetectorsResponseSerializer(serializers.Serializer):
    """Wrap a collection of sensors into a ``detectors`` mapping."""

    detectors = serializers.SerializerMethodField()

    @extend_schema_field(OpenApiTypes.OBJECT)
    def get_detectors(self, sensors) -> Dict[str, Any]:
        """Return serialized detectors keyed by their ``detector_id``.

        Each sensor is rendered with ``DetectorSerializer``. If two entries
        share a ``detector_id``, the later one replaces the earlier one.
        """
        serialized = DetectorSerializer(sensors, many=True).data
        by_detector_id: Dict[str, Any] = {}
        for entry in serialized:
            by_detector_id[entry['detector_id']] = entry
        return by_detector_id