GeneralApp add
This commit is contained in:
SDE
2023-06-19 17:19:18 +03:00
parent b0ad8e41d2
commit b2bd830b6e
69 changed files with 5337 additions and 16 deletions

View File

View File

@@ -0,0 +1,43 @@
from openpyxl import Workbook
from django.http import HttpResponse
from openpyxl.writer.excel import save_virtual_workbook
def xls_export(data, filename):
print('xls_export')
# wb = Workbook()
# ws = wb.active
#
# r = 1
# for row in data:
# c = 1
# for val in row.values():
# try:
# ws.cell(row=r, column=c).value = val
# except:
# ws.cell(row=r, column=c).value = str(val)
# c += 1
#
# r += 1
#
# dims = {}
# for row in ws.rows:
# for cell in row:
# if cell.value:
# dims[cell.column] = max((dims.get(cell.column, 0), len(str(cell.value))))
# for col, value in dims.items():
# ws.column_dimensions[col].width = value
# filepath = "/demo.xlsx"
# wb.save(filepath)
# output = BytesIO()
# wb.save(output)
from ..office_documents_utils import get_xls_file_by_data_list
xls_file = get_xls_file_by_data_list(data)
response = HttpResponse(xls_file, content_type='application/ms-excel')
response['Content-Disposition'] = 'attachment; filename="{0}"'.format(filename)
return response

View File

@@ -0,0 +1,23 @@
def check_and_get_specific_output_format(obj, data=None, filename=None):
if obj.request.query_params and 'output_format' in obj.request.query_params and obj.request.query_params['output_format'] == 'xlsx':
if not data:
serializer = obj.get_serializer(obj.get_queryset(), many=True)
data = serializer.data
from .api_export_xls import xls_export
return xls_export(data, filename)
return None
def fix_txt_for_use_in_interlinks(txt):
txt = txt.replace('/', ' ')
txt = txt.replace('?', ' ')
txt = txt.replace(';', ' ')
txt = txt.replace(',', ' ')
txt = txt.replace('+', ' ')
txt = txt.replace(':', ' ')
return txt

View File

@@ -0,0 +1,19 @@
# from rest_framework import viewsets
#
# class APILogMiddleware(viewsets.ModelViewSet):
# # def __init__(self, get_response):
# # self.get_response = get_response
# # One-time configuration and initialization.
#
# def __call__(self, request):
# # Code to be executed for each request before
# # the view (and later middleware) are called.
#
# response = self.get_response(request)
#
# self
#
# # Code to be executed for each request/response after
# # the view is called.
#
# return response

View File

@@ -0,0 +1,22 @@
import codecs
from django.conf import settings
from rest_framework.exceptions import ParseError
from rest_framework.parsers import BaseParser
class PlainTextParser(BaseParser):
media_type = "text/plain"
def parse(self, stream, media_type=None, parser_context=None):
"""
Parses the incoming bytestream as Plain Text and returns the resulting data.
"""
parser_context = parser_context or {}
encoding = parser_context.get('encoding', settings.DEFAULT_CHARSET)
try:
decoded_stream = codecs.getreader(encoding)(stream)
text_content = decoded_stream.read()
return text_content
except ValueError as exc:
raise ParseError('Plain text parse error - %s' % str(exc))

View File

@@ -0,0 +1,36 @@
from rest_framework.permissions import BasePermission
class StaffOnly_perm(BasePermission):
"""
Allows access only to staff users.
"""
def has_permission(self, request, view):
return request.user and request.user.is_staff
class api_1C_perm(BasePermission):
"""
Allows access only 1C users.
"""
# def has_object_permission(self, request, view, obj):
def has_permission(self, request, view):
if request.user.id == 8751:
try:
if request.req_type == 'warehouse_import':
return True
else:
return False
except:
return False
perm = request.user.has_perm('AuthApp.1c_api')
return perm
class full_api_perm(BasePermission):
"""
Allows access only users w full access.
"""
def has_permission(self, request, view):
return request.user.has_perm('AuthApp.full_api')

View File

@@ -0,0 +1,44 @@
from rest_framework import serializers
from django.contrib.contenttypes.models import ContentType
from BaseModels.mailSender import techSendMail
class Import_Element_Srializer(serializers.Serializer):
element = serializers.JSONField()
class Meta:
fields = (
'element',
)
class Import_Pocket_Srializer(serializers.Serializer):
timestamp = serializers.IntegerField()
warehouse = serializers.CharField()
data_list = Import_Element_Srializer(many=True)
class Meta:
fields = (
'timestamp', 'warehouse', 'data_list'
)
class Generic_base_Serializer(serializers.ModelSerializer):
linked_object_type = serializers.CharField(required=False)
def create(self, validated_data):
if 'linked_object_type' in validated_data:
try:
validated_data['content_type'] = ContentType.objects.get(model=validated_data['linked_object_type'])
del validated_data['linked_object_type']
except Exception as e:
msg = 'Ошибка создания generic объекта<br>{0}({1})<br>{2}'.format(
str(e),
str(e.args),
str(validated_data)
)
print(msg)
title = 'ОШИБКА tE Generic_base_Serializer create'
techSendMail(msg, title)
return super(Generic_base_Serializer, self).create(validated_data)

View File

@@ -0,0 +1,356 @@
# coding=utf-8
from rest_framework import generics
from rest_framework.authentication import BasicAuthentication, SessionAuthentication
from rest_framework.permissions import IsAuthenticated, DjangoObjectPermissions
from rest_framework.views import APIView
from rest_framework import viewsets
from rest_framework.renderers import JSONRenderer
from rest_framework.permissions import AllowAny
from rest_framework.response import Response
from rest_framework.schemas import SchemaGenerator
from rest_framework_swagger import renderers
from BaseModels.api.base_api_permissions import *
from datetime import datetime
from GeneralApp.temp_data_funcs import add_element_in_tmp_data_list, check_exists_element_in_tmp_data_list, add_element_list_to_tmp_data
from rest_framework.utils.serializer_helpers import ReturnList
from rest_framework.decorators import action
from rest_framework import status
from django.contrib.contenttypes.models import ContentType
from BaseModels.mailSender import techSendMail
# from BaseModels.api.api_middlewares import APILogMiddleware
class SwaggerSchemaView(APIView):
permission_classes = [AllowAny]
renderer_classes = [
renderers.OpenAPIRenderer,
renderers.SwaggerUIRenderer
]
def get(self, request):
generator = SchemaGenerator()
schema = generator.get_schema(request=request)
return Response(schema)
JSONCustomRenderer = JSONRenderer
JSONCustomRenderer.charset = 'utf-8'
class APIBasePublicClass(APIView):
# authentication_classes = (SessionAuthentication, BasicAuthentication)
permission_classes = (AllowAny,)
# renderer_classes = [JSONCustomRenderer]
pagination_class = None
# def finalize_response(self, request, response, *args, **kwargs):
#
# res = super(APIBasePublicClass, self).finalize_response(request, response, *args, **kwargs)
#
# from CompaniesApp.models import Region
# regions = Region.objects.filter().values_list(
# 'id', 'domain'
# ).order_by('id')
# res.data.update({'regions': tuple(regions)})
#
# return res
# def get(self, request, *args, **kwargs):
#
# if not 'region_id' in request.headers:
# request.headers['region_id'] = '1'
#
# return super(APIBasePublicClass, self).get(request, *args, **kwargs)
class APIListBaseClass(generics.ListAPIView):
# authentication_classes = (SessionAuthentication, BasicAuthentication, )#
permission_classes = (IsAuthenticated,)
pagination_class = None
class APIBaseClass(generics.RetrieveAPIView):
# authentication_classes = (SessionAuthentication, BasicAuthentication, )#
permission_classes = (IsAuthenticated, )
# renderer_classes = [JSONCustomRenderer]
pagination_class = None
class APIBaseSimplaClass(generics.GenericAPIView):
# authentication_classes = (SessionAuthentication, BasicAuthentication)
permission_classes = (IsAuthenticated,)
# renderer_classes = [JSONCustomRenderer]
pagination_class = None
# ----------------------------
class APIViewSet_ModelReadOnlyClass(viewsets.ReadOnlyModelViewSet):
pass
# authentication_classes = (SessionAuthentication, BasicAuthentication, )#
permission_classes = (IsAuthenticated, )
# renderer_classes = [JSONCustomRenderer]
pagination_class = None
exclude_actions_for_logging = []
create_kwargs = [
'create', 'create_short', 'create_item',
'copy_item', 'create_short', 'create_reminder'
]
exclude_actions_for_logging.extend(create_kwargs)
exclude_actions_for_logging.extend([
'update', 'partial_update', 'destroy', 'update_items', 'update_item'
])
def log_save_cur_state_obj(query_data, response=None, init=False):
if query_data.basename == 'alert' or not query_data.action in exclude_actions_for_logging:
return None
if response and response.status_code > 299:
return None
data_Dict = {}
data_target = 'log_{0}'.format(str(query_data.basename))
obj_id = None
try:
if type(query_data.request.data) == list and query_data.request.data and len(query_data.request.data) > 0 and \
'id' in query_data.request.data[0]:
objs_list_ids = [obj['id'] for obj in query_data.request.data]
elif response and response.data and type(response.data) == dict and 'id' in response.data:
objs_list_ids = [response.data['id']]
elif response and response.data and getattr(response.data.serializer, 'instance', None):
objs_list_ids = [response.data.serializer.instance.id]
elif response and response.data and 'id' in response.data and response.data['id']:
objs_list_ids = [response.data['id']]
elif query_data.request.data and 'id' in query_data.request.data:
objs_list_ids = [query_data.request.data['id']]
elif 'pk' in query_data.kwargs:
objs_list_ids = [query_data.kwargs['pk']]
elif query_data.queryset:
objs_list_ids = query_data.queryset.values_list('id')
else:
return None
# if not objs_list_ids:
#
# serializer = query_data.serializer_class()
# data = serializer(data=query_data.request.data)
#
# data_Dict = {
# 'data': data,
# 'DT': str(datetime.now()),
# 'user': str(query_data.request.user),
# 'oper_type': query_data.action,
# 'init': init
# }
#
# add_element_in_tmp_data_list('log', data_target, obj_id, data_Dict)
objs_list = query_data.queryset.filter(id__in=objs_list_ids)
cur_action = query_data.action
query_data.action = 'retrieve'
serializer = query_data.get_serializer_class()
query_data.action = cur_action
obj_data_list = serializer(objs_list, many=True)
elements_list_for_add_to_tmp_data = []
for obj_data in obj_data_list.data:
obj_id = obj_data['id']
# фиксим json-неподходящие поля
for item_data in obj_data.keys():
if type(obj_data[item_data]) not in (str, int, float, dict, list, bool):
obj_data[item_data] = str(obj_data[item_data])
# if init:
# if check_exists_element_in_tmp_data_list('log', data_target, obj_id):
# continue
data_Dict = {
'id': obj_id,
'data': obj_data,
'DT': str(datetime.now()),
'user': str(query_data.request.user),
'oper_type': query_data.action,
'init': init
}
# add_element_in_tmp_data_list('log', data_target, obj_id, data_Dict)
elements_list_for_add_to_tmp_data.append(data_Dict)
add_element_list_to_tmp_data('log', data_target, init, elements_list_for_add_to_tmp_data)
except Exception as e:
response_data = ''
if response and response.data:
response_data = str(response.data)
msg = 'log_save_cur_state_obj fail save to log w data = {0}<br>{1}<br>{2}<br>response_data={3}'.format(
str(e),
'log - ' + str(data_target) + ' - ' + str(obj_id),
str(data_Dict),
response_data
)
techSendMail(msg)
return 'OK'
class APIViewSet_ModelClass(viewsets.ModelViewSet):
# pass
# # authentication_classes = (SessionAuthentication, BasicAuthentication, )#
# permission_classes = (IsAuthenticated, )
# # renderer_classes = [JSONCustomRenderer]
# pagination_class = None
def initial(self, request, *args, **kwargs):
res = super(APIViewSet_ModelClass, self).initial(request, *args, **kwargs)
if self.basename == 'alert' or not self.action in exclude_actions_for_logging:
return res
if not self.action in create_kwargs:
log_save_cur_state_obj(self, init=True)
return res
def finalize_response(self, request, response, *args, **kwargs):
res = super(APIViewSet_ModelClass, self).finalize_response(request, response, *args, **kwargs)
if self.basename == 'alert' or not self.action in exclude_actions_for_logging:
return res
log_save_cur_state_obj(self, response=response)
return res
def create(self, request, *args, **kwargs):
obj = super(APIViewSet_ModelClass, self).create(request, *args, **kwargs)
# data_Dict = {}
# try:
# data_Dict = {
# 'data': prepare_data_for_json(vars(obj)),
# 'DT': str(datetime.now()),
# 'user': str(request.user),
# 'oper_type': 'create'
# }
#
# add_element_in_tmp_data_list('log', 'properties_log', obj.id, data_Dict)
# except Exception as e:
# msg = 'fail save to log w data = {0}<br>{1}'.format(
# 'log - properties_log - ' + str(obj.id),
# str(data_Dict)
# )
# techSendMail(msg)
return obj
def partial_update(self, request, *args, **kwargs):
if request.data:
request.data['modifiedDT'] = datetime.now()
obj = super(APIViewSet_ModelClass, self).partial_update(request, *args, **kwargs)
# data_Dict = {}
# try:
# data_Dict = {
# 'data': prepare_data_for_json(vars(obj)),
# 'DT': str(datetime.now()),
# 'user': str(request.user),
# 'oper_type': 'create'
# }
#
# add_element_in_tmp_data_list('log', 'properties_log', obj.id, data_Dict)
# except Exception as e:
# msg = 'fail save to log w data = {0}<br>{1}'.format(
# 'log - properties_log - ' + str(obj.id),
# str(data_Dict)
# )
# techSendMail(msg)
return obj
class APIViewSet_ModelClass_w_Expenses(APIViewSet_ModelClass):
@action(methods=['GET'], detail=True)
def expenses_rates(self, request, *args, **kwargs):
from ExpensesApp.api.v1.expenses_rate.expenses_rate_api_serializers import ExpensesRate_get_Serializer
model = self.serializer_class.Meta.model
try:
obj = model.objects.get(id=kwargs['pk'])
except model.DoesNotExist:
return Response({'error': u'ошибка получения expenses_rates'},
status=status.HTTP_400_BAD_REQUEST)
expenses_rates = obj.expenses_rates.all()
serializer = ExpensesRate_get_Serializer(expenses_rates, many=True)
# if serializer.data:
# return Response(serializer.data)
return Response(serializer.data)
@action(methods=['GET'], detail=True)
def expenses_data(self, request, *args, **kwargs):
from ExpensesApp.api.v1.expenses_data.expenses_data_api_serializers import ExpensesData_get_Serializer
model = self.serializer_class.Meta.model
try:
obj = model.objects.get(id=kwargs['pk'])
except model.DoesNotExist:
return Response({'error': u'ошибка получения expenses_rates'},
status=status.HTTP_400_BAD_REQUEST)
expenses_data = obj.expenses_data.all()
serializer = ExpensesData_get_Serializer(expenses_data, many=True)
# if serializer.data:
# return Response(serializer.data)
return Response(serializer.data)
class APIViewSet_BaseClass(viewsets.ViewSet):
pass
# authentication_classes = (SessionAuthentication, BasicAuthentication,) #
permission_classes = (IsAuthenticated, )
# renderer_classes = [JSONCustomRenderer]
pagination_class = None
# class APIBaseClass(generics.RetrieveAPIView):
# authentication_classes = (SessionAuthentication, BasicAuthentication, )#
# permission_classes = (IsAuthenticated,)
# # renderer_classes = [JSONCustomRenderer]
# pagination_class = None
#
#
# class APIBaseSimplaClass(APIView):
# authentication_classes = (SessionAuthentication, BasicAuthentication)
# permission_classes = (IsAuthenticated,)
# # renderer_classes = [JSONCustomRenderer]
# pagination_class = None