# coding=utf-8 from rest_framework import generics from rest_framework.authentication import BasicAuthentication, SessionAuthentication from rest_framework.permissions import IsAuthenticated, DjangoObjectPermissions from rest_framework.views import APIView from rest_framework import viewsets from rest_framework.renderers import JSONRenderer from rest_framework.permissions import AllowAny from rest_framework.response import Response from rest_framework.schemas import SchemaGenerator from rest_framework_swagger import renderers from BaseModels.api.base_api_permissions import * from datetime import datetime from GeneralApp.temp_data_funcs import add_element_in_tmp_data_list, check_exists_element_in_tmp_data_list, add_element_list_to_tmp_data from rest_framework.utils.serializer_helpers import ReturnList from rest_framework.decorators import action from rest_framework import status from django.contrib.contenttypes.models import ContentType from BaseModels.mailSender import techSendMail # from BaseModels.api.api_middlewares import APILogMiddleware class SwaggerSchemaView(APIView): permission_classes = [AllowAny] renderer_classes = [ renderers.OpenAPIRenderer, renderers.SwaggerUIRenderer ] def get(self, request): generator = SchemaGenerator() schema = generator.get_schema(request=request) return Response(schema) JSONCustomRenderer = JSONRenderer JSONCustomRenderer.charset = 'utf-8' class APIBasePublicClass(APIView): # authentication_classes = (SessionAuthentication, BasicAuthentication) permission_classes = (AllowAny,) # renderer_classes = [JSONCustomRenderer] pagination_class = None # def finalize_response(self, request, response, *args, **kwargs): # # res = super(APIBasePublicClass, self).finalize_response(request, response, *args, **kwargs) # # from CompaniesApp.models import Region # regions = Region.objects.filter().values_list( # 'id', 'domain' # ).order_by('id') # res.data.update({'regions': tuple(regions)}) # # return res # def get(self, request, *args, **kwargs): # # if not 'region_id' in request.headers: # request.headers['region_id'] = '1' # # return super(APIBasePublicClass, self).get(request, *args, **kwargs) class APIListBaseClass(generics.ListAPIView): # authentication_classes = (SessionAuthentication, BasicAuthentication, )# permission_classes = (IsAuthenticated,) pagination_class = None class APIBaseClass(generics.RetrieveAPIView): # authentication_classes = (SessionAuthentication, BasicAuthentication, )# permission_classes = (IsAuthenticated, ) # renderer_classes = [JSONCustomRenderer] pagination_class = None class APIBaseSimplaClass(generics.GenericAPIView): # authentication_classes = (SessionAuthentication, BasicAuthentication) permission_classes = (IsAuthenticated,) # renderer_classes = [JSONCustomRenderer] pagination_class = None # ---------------------------- class APIViewSet_ModelReadOnlyClass(viewsets.ReadOnlyModelViewSet): pass # authentication_classes = (SessionAuthentication, BasicAuthentication, )# permission_classes = (IsAuthenticated, ) # renderer_classes = [JSONCustomRenderer] pagination_class = None exclude_actions_for_logging = [] create_kwargs = [ 'create', 'create_short', 'create_item', 'copy_item', 'create_short', 'create_reminder' ] exclude_actions_for_logging.extend(create_kwargs) exclude_actions_for_logging.extend([ 'update', 'partial_update', 'destroy', 'update_items', 'update_item' ]) def log_save_cur_state_obj(query_data, response=None, init=False): if query_data.basename == 'alert' or not query_data.action in exclude_actions_for_logging: return None if response and response.status_code > 299: return None data_Dict = {} data_target = 'log_{0}'.format(str(query_data.basename)) obj_id = None try: if type(query_data.request.data) == list and query_data.request.data and len(query_data.request.data) > 0 and \ 'id' in query_data.request.data[0]: objs_list_ids = [obj['id'] for obj in query_data.request.data] elif response and response.data and type(response.data) == dict and 'id' in response.data: objs_list_ids = [response.data['id']] elif response and response.data and getattr(response.data.serializer, 'instance', None): objs_list_ids = [response.data.serializer.instance.id] elif response and response.data and 'id' in response.data and response.data['id']: objs_list_ids = [response.data['id']] elif query_data.request.data and 'id' in query_data.request.data: objs_list_ids = [query_data.request.data['id']] elif 'pk' in query_data.kwargs: objs_list_ids = [query_data.kwargs['pk']] elif query_data.queryset: objs_list_ids = query_data.queryset.values_list('id') else: return None # if not objs_list_ids: # # serializer = query_data.serializer_class() # data = serializer(data=query_data.request.data) # # data_Dict = { # 'data': data, # 'DT': str(datetime.now()), # 'user': str(query_data.request.user), # 'oper_type': query_data.action, # 'init': init # } # # add_element_in_tmp_data_list('log', data_target, obj_id, data_Dict) objs_list = query_data.queryset.filter(id__in=objs_list_ids) cur_action = query_data.action query_data.action = 'retrieve' serializer = query_data.get_serializer_class() query_data.action = cur_action obj_data_list = serializer(objs_list, many=True) elements_list_for_add_to_tmp_data = [] for obj_data in obj_data_list.data: obj_id = obj_data['id'] # фиксим json-неподходящие поля for item_data in obj_data.keys(): if type(obj_data[item_data]) not in (str, int, float, dict, list, bool): obj_data[item_data] = str(obj_data[item_data]) # if init: # if check_exists_element_in_tmp_data_list('log', data_target, obj_id): # continue data_Dict = { 'id': obj_id, 'data': obj_data, 'DT': str(datetime.now()), 'user': str(query_data.request.user), 'oper_type': query_data.action, 'init': init } # add_element_in_tmp_data_list('log', data_target, obj_id, data_Dict) elements_list_for_add_to_tmp_data.append(data_Dict) add_element_list_to_tmp_data('log', data_target, init, elements_list_for_add_to_tmp_data) except Exception as e: response_data = '' if response and response.data: response_data = str(response.data) msg = 'log_save_cur_state_obj fail save to log w data = {0}
{1}
{2}
response_data={3}'.format( str(e), 'log - ' + str(data_target) + ' - ' + str(obj_id), str(data_Dict), response_data ) techSendMail(msg) return 'OK' class APIViewSet_ModelClass(viewsets.ModelViewSet): # pass # # authentication_classes = (SessionAuthentication, BasicAuthentication, )# # permission_classes = (IsAuthenticated, ) # # renderer_classes = [JSONCustomRenderer] # pagination_class = None def initial(self, request, *args, **kwargs): res = super(APIViewSet_ModelClass, self).initial(request, *args, **kwargs) if self.basename == 'alert' or not self.action in exclude_actions_for_logging: return res if not self.action in create_kwargs: log_save_cur_state_obj(self, init=True) return res def finalize_response(self, request, response, *args, **kwargs): res = super(APIViewSet_ModelClass, self).finalize_response(request, response, *args, **kwargs) if self.basename == 'alert' or not self.action in exclude_actions_for_logging: return res log_save_cur_state_obj(self, response=response) return res def create(self, request, *args, **kwargs): obj = super(APIViewSet_ModelClass, self).create(request, *args, **kwargs) # data_Dict = {} # try: # data_Dict = { # 'data': prepare_data_for_json(vars(obj)), # 'DT': str(datetime.now()), # 'user': str(request.user), # 'oper_type': 'create' # } # # add_element_in_tmp_data_list('log', 'properties_log', obj.id, data_Dict) # except Exception as e: # msg = 'fail save to log w data = {0}
{1}'.format( # 'log - properties_log - ' + str(obj.id), # str(data_Dict) # ) # techSendMail(msg) return obj def partial_update(self, request, *args, **kwargs): if request.data: request.data['modifiedDT'] = datetime.now() obj = super(APIViewSet_ModelClass, self).partial_update(request, *args, **kwargs) # data_Dict = {} # try: # data_Dict = { # 'data': prepare_data_for_json(vars(obj)), # 'DT': str(datetime.now()), # 'user': str(request.user), # 'oper_type': 'create' # } # # add_element_in_tmp_data_list('log', 'properties_log', obj.id, data_Dict) # except Exception as e: # msg = 'fail save to log w data = {0}
{1}'.format( # 'log - properties_log - ' + str(obj.id), # str(data_Dict) # ) # techSendMail(msg) return obj class APIViewSet_ModelClass_w_Expenses(APIViewSet_ModelClass): @action(methods=['GET'], detail=True) def expenses_rates(self, request, *args, **kwargs): from ExpensesApp.api.v1.expenses_rate.expenses_rate_api_serializers import ExpensesRate_get_Serializer model = self.serializer_class.Meta.model try: obj = model.objects.get(id=kwargs['pk']) except model.DoesNotExist: return Response({'error': u'ошибка получения expenses_rates'}, status=status.HTTP_400_BAD_REQUEST) expenses_rates = obj.expenses_rates.all() serializer = ExpensesRate_get_Serializer(expenses_rates, many=True) # if serializer.data: # return Response(serializer.data) return Response(serializer.data) @action(methods=['GET'], detail=True) def expenses_data(self, request, *args, **kwargs): from ExpensesApp.api.v1.expenses_data.expenses_data_api_serializers import ExpensesData_get_Serializer model = self.serializer_class.Meta.model try: obj = model.objects.get(id=kwargs['pk']) except model.DoesNotExist: return Response({'error': u'ошибка получения expenses_rates'}, status=status.HTTP_400_BAD_REQUEST) expenses_data = obj.expenses_data.all() serializer = ExpensesData_get_Serializer(expenses_data, many=True) # if serializer.data: # return Response(serializer.data) return Response(serializer.data) class APIViewSet_BaseClass(viewsets.ViewSet): pass # authentication_classes = (SessionAuthentication, BasicAuthentication,) # permission_classes = (IsAuthenticated, ) # renderer_classes = [JSONCustomRenderer] pagination_class = None # class APIBaseClass(generics.RetrieveAPIView): # authentication_classes = (SessionAuthentication, BasicAuthentication, )# # permission_classes = (IsAuthenticated,) # # renderer_classes = [JSONCustomRenderer] # pagination_class = None # # # class APIBaseSimplaClass(APIView): # authentication_classes = (SessionAuthentication, BasicAuthentication) # permission_classes = (IsAuthenticated,) # # renderer_classes = [JSONCustomRenderer] # pagination_class = None